In [None]:
# General Imports 
import warnings

import numpy as np
import pandas as pd 
import sksurv
from sklearn.feature_selection import SelectKBest
from sklearn.model_selection import GridSearchCV, KFold
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sksurv.ensemble import ComponentwiseGradientBoostingSurvivalAnalysis
from sksurv.ensemble import GradientBoostingSurvivalAnalysis
from sksurv.ensemble import RandomSurvivalForest
from sksurv.linear_model import CoxPHSurvivalAnalysis
from sksurv.metrics import concordance_index_censored

In [None]:
def fit_and_score_features(X, y):
    """Score each feature on its own with a single-variable Cox model.

    Every column of ``X`` is used, one at a time, to fit a
    ``CoxPHSurvivalAnalysis`` model against ``y``; the model is then scored
    on that same column and target.

    Parameters
    ----------
    X : array-like, 2-D
        Feature matrix; converted with ``np.array`` before use.
    y : object
        Survival target handed unchanged to the model's ``fit`` and ``score``.

    Returns
    -------
    numpy.ndarray
        Float array with one score per column of ``X``, in column order.
    """
    features = np.array(X)
    n_features = features.shape[1]
    model = CoxPHSurvivalAnalysis()

    def _score_column(j):
        # Slice with j:j+1 so the estimator receives a 2-D (n_samples, 1) array.
        single_feature = features[:, j:j + 1]
        model.fit(single_feature, y)
        return model.score(single_feature, y)

    return np.fromiter(
        (_score_column(j) for j in range(n_features)),
        dtype=float,
        count=n_features,
    )

In [None]:
def remove_high_correlated_features(_X_, _VALUE_):
    """Drop columns that are highly rank-correlated with an earlier column.

    The absolute Spearman correlation is computed for every pair of columns.
    A column is removed when its correlation with any column positioned
    before it is greater than or equal to ``_VALUE_``.

    Parameters
    ----------
    _X_ : pandas.DataFrame
        Input features.
    _VALUE_ : float
        Absolute-correlation threshold (inclusive).

    Returns
    -------
    pandas.DataFrame
        A new frame without the redundant columns; ``_X_`` is not modified.
    """
    # 'spearman' is used here; pandas also offers 'pearson' and 'kendall'.
    abs_corr = _X_.corr(method='spearman').abs()

    # Keep only the strict upper triangle so each pair is examined once and
    # the diagonal (self-correlation) is ignored; everything else becomes NaN.
    above_diagonal = np.triu(np.ones(abs_corr.shape), k=1).astype(bool)
    pairwise = abs_corr.where(above_diagonal)

    redundant = []
    for name in pairwise.columns:
        # NaN entries compare False, so masked cells never trigger a drop.
        if (pairwise[name] >= _VALUE_).any():
            redundant.append(name)

    return _X_.drop(columns=redundant)

In [None]:
def feature_selection(_X_, _Y_):
    """Pick the best-scoring features, with the count chosen by grid search.

    Steps:

    1. Rank every column of ``_X_`` by its univariate score from
       ``fit_and_score_features`` (computed on the full, unscaled data).
    2. Grid-search ``k`` for a ``SelectKBest -> CoxPHSurvivalAnalysis``
       pipeline on standardized data with 3-fold shuffled CV.
    3. Return the names of the top ``k`` features from the ranking in step 1.

    Parameters
    ----------
    _X_ : pandas.DataFrame
        Training features; column names are used as feature names.
    _Y_ : object
        Survival target passed through to the estimators.

    Returns
    -------
    list
        Column names of the selected features, best first.

    Notes
    -----
    Warnings are suppressed only while the grid search runs; the process-wide
    warning filters are restored afterwards.
    """
    scores = fit_and_score_features(_X_, _Y_)
    ranked = pd.Series(scores, index=_X_.columns).sort_values(ascending=False)

    # The initial k=3 is a placeholder; the grid search overrides it.
    # (RandomSurvivalForest can be swapped in as the final 'model' step.)
    pipe = Pipeline([('select', SelectKBest(fit_and_score_features, k=3)), ('model', CoxPHSurvivalAnalysis())])

    # Try k = 1..9, but never ask SelectKBest for more features than exist.
    max_k = min(9, _X_.shape[1])
    param_grid = {'select__k': np.arange(1, max_k + 1)}
    cv = KFold(n_splits=3, random_state=1, shuffle=True)
    gcv = GridSearchCV(pipe, param_grid, return_train_score=True, cv=cv)

    train_x_SS = StandardScaler().fit_transform(_X_)

    # Silence fitting noise locally instead of muting warnings for the whole session.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        gcv.fit(train_x_SS, _Y_)

    best_k = gcv.best_params_['select__k']
    selected_features = list(ranked.index[:best_k])
    return selected_features

In [None]:
def CV_on_training_data(_X_, _Y_, _SELECTED_FEATURES_, _FOLDS_):
    """Cross-validate a gradient-boosting survival model on the training set.

    For each of ``_FOLDS_`` shuffled K-fold splits (``random_state=0``) the
    selected columns are standardized with statistics from the training part
    only, a ``GradientBoostingSurvivalAnalysis`` model is fitted, and its
    ``score`` on the held-out part is recorded. The mean and standard
    deviation of the fold scores are printed.

    Parameters
    ----------
    _X_ : pandas.DataFrame
        Feature table; rows are addressed by position.
    _Y_ : array-like
        Survival target, indexable by an array of row positions.
    _SELECTED_FEATURES_ : list
        Column names of ``_X_`` to use.
    _FOLDS_ : int
        Number of folds.

    Returns
    -------
    list
        One validation score per fold, in fold order.
    """
    splitter = KFold(n_splits=_FOLDS_, shuffle=True, random_state=0)
    row_positions = range(0, len(_X_))

    fold_scores = []
    for train_rows, validation_rows in splitter.split(row_positions):
        train_matrix = _X_.iloc[train_rows][_SELECTED_FEATURES_].to_numpy()
        validation_matrix = _X_.iloc[validation_rows][_SELECTED_FEATURES_].to_numpy()
        y_train, y_validation = _Y_[train_rows], _Y_[validation_rows]

        # Fit the scaler on the training part only and reuse it for validation.
        scaler = StandardScaler().fit(train_matrix)
        train_matrix = scaler.transform(train_matrix)
        validation_matrix = scaler.transform(validation_matrix)

        # (RandomSurvivalForest can be swapped in as the estimator.)
        model = GradientBoostingSurvivalAnalysis(
            n_estimators=100,
            learning_rate=0.001,
            max_depth=4,
            min_samples_split=10,
            min_samples_leaf=2,
            max_features=1,
            random_state=0,
        )
        model.fit(train_matrix, y_train)
        fold_scores.append(model.score(validation_matrix, y_validation))

    print("C-index mean:", np.mean(fold_scores))
    print("C-index standard deviation:", np.std(fold_scores))
    return fold_scores

In [None]:
def result_on_testing_data(_X_training_, _Y_training_, _X_testing_, _Y_testing_, _SELECTED_FEATURES_):
    """Train on the training set and evaluate once on the test set.

    The selected columns are standardized with statistics from the training
    data, a ``GradientBoostingSurvivalAnalysis`` model is fitted on them, and
    the model is scored on the equally scaled test columns. The test score is
    printed.

    Parameters
    ----------
    _X_training_, _X_testing_ : pandas.DataFrame
        Feature tables containing the selected columns.
    _Y_training_, _Y_testing_ : object
        Survival targets for the two sets.
    _SELECTED_FEATURES_ : list
        Column names to use.

    Returns
    -------
    tuple
        ``(test_score, test_predictions, fitted_estimator, scaled_test_features)``.
    """
    # (RandomSurvivalForest can be swapped in as the estimator.)
    estimator = GradientBoostingSurvivalAnalysis(n_estimators=100, learning_rate=0.001, max_depth=4, min_samples_split=10, min_samples_leaf=2, max_features=1, random_state=0)

    # Scaling statistics come from the training data only.
    SS = StandardScaler().fit(_X_training_[_SELECTED_FEATURES_])
    train_x_selected = SS.transform(_X_training_[_SELECTED_FEATURES_])
    test_x_selected = SS.transform(_X_testing_[_SELECTED_FEATURES_])

    estimator.fit(train_x_selected, _Y_training_)

    # Score once and reuse the value for both the printout and the return.
    test_score = estimator.score(test_x_selected, _Y_testing_)
    print(test_score)
    return test_score, estimator.predict(test_x_selected), estimator, test_x_selected

In [None]:
def CV_on_all_datasat(_X_, _Y_, _SELECTED_FEATURES_, _FOLDS_):
    """Cross-validate on the whole dataset and collect out-of-fold predictions.

    For each of ``_FOLDS_`` shuffled K-fold splits (``random_state=0``) the
    selected columns are standardized with training-part statistics, a
    ``GradientBoostingSurvivalAnalysis`` model is fitted, and its ``score``
    on the validation part is recorded. The validation rows of every fold
    (scaled features, target fields, treatment column and predicted
    ``Risk_Score``) are gathered into one table. The mean and standard
    deviation of the fold scores are printed.

    Parameters
    ----------
    _X_ : pandas.DataFrame
        Feature table. Must contain ``_SELECTED_FEATURES_`` and a
        ``'First_TYPE_Treatment'`` column, and must be mergeable on ``"ID"``
        (presumably the name of its index -- confirm against the caller).
    _Y_ : numpy structured array
        Survival target; its field names become columns of the result.
    _SELECTED_FEATURES_ : list
        Column names used as model inputs.
    _FOLDS_ : int
        Number of folds.

    Returns
    -------
    tuple
        ``(C_index_results_list, cv_dataframe)``: the per-fold validation
        scores and the combined out-of-fold table. Columns are ordered as
        selected features, target fields, ``'First_TYPE_Treatment'``,
        ``'Risk_Score'``.
    """
    treatment_column = 'First_TYPE_Treatment'
    kf = KFold(n_splits=_FOLDS_, shuffle=True, random_state=0)

    name_columns = list(_X_[_SELECTED_FEATURES_].columns) + list(_Y_.dtype.names) + [treatment_column]

    C_index_results_list = []
    fold_frames = []

    for train_rows, validation_rows in kf.split(range(0, len(_X_))):
        train_x_selected = _X_.iloc[train_rows][_SELECTED_FEATURES_]

        validation_fold = _X_.iloc[validation_rows][_SELECTED_FEATURES_ + [treatment_column]]
        validation_First_TYPE_Treatment = validation_fold[[treatment_column]]
        validation_x_selected = validation_fold[_SELECTED_FEATURES_]

        y_train = _Y_[train_rows]
        y_validation = _Y_[validation_rows]

        # (RandomSurvivalForest can be swapped in as the estimator.)
        estimator = GradientBoostingSurvivalAnalysis(n_estimators=100, learning_rate=0.001, max_depth=4, min_samples_split=10, min_samples_leaf=2, max_features=1, random_state=0)

        # Scale with training-part statistics; re-wrap as DataFrames so the
        # original index and column names survive the transform.
        SS = StandardScaler().fit(train_x_selected)
        train_x_selected = pd.DataFrame(SS.transform(train_x_selected), index=train_x_selected.index, columns=train_x_selected.columns)
        validation_x_selected = pd.DataFrame(SS.transform(validation_x_selected), index=validation_x_selected.index, columns=validation_x_selected.columns)

        estimator.fit(train_x_selected, y_train)
        C_index_results_list.append(estimator.score(validation_x_selected, y_validation))

        temp_dataframe = pd.concat([validation_x_selected, pd.DataFrame(y_validation, index=validation_x_selected.index, columns=y_validation.dtype.names)], axis=1)
        temp_dataframe['Risk_Score'] = estimator.predict(validation_x_selected)
        temp_dataframe = pd.merge(temp_dataframe, validation_First_TYPE_Treatment, on="ID", how="inner")
        fold_frames.append(temp_dataframe)

    # Concatenate once instead of growing a frame from an empty seed inside
    # the loop; the column order of the seeded version is reproduced explicitly.
    cv_dataframe = pd.concat(fold_frames)[name_columns + ['Risk_Score']]
    # The seeded accumulation combined an unnamed empty index with the fold
    # indexes, which left the result index unnamed; keep that for callers.
    cv_dataframe.index = cv_dataframe.index.rename(None)

    print("C-index mean CV:", np.mean(C_index_results_list))
    print("C-index standard deviation CV:", np.std(C_index_results_list))
    return C_index_results_list, cv_dataframe

In [None]:
def _cv_concordance_index(_DATASET_, event_column, time_column):
    """Run ``concordance_index_censored`` on one event/time column pair.

    The event column is cast to bool, the time column to float, and the
    ``'Risk_Score'`` column is passed through as the estimate.
    """
    # NOTE(review): the first argument of concordance_index_censored is the
    # event indicator; confirm the 'Censor_*' columns are coded that way.
    return concordance_index_censored(
        _DATASET_[event_column].astype('bool'),
        _DATASET_[time_column].astype(float),
        _DATASET_['Risk_Score'],
    )


def CV_concordance_index_OS(_DATASET_):
    """Concordance of ``Risk_Score`` against the ``Censor_OS`` / ``OS`` columns.

    Parameters
    ----------
    _DATASET_ : pandas.DataFrame
        Table with ``'Censor_OS'``, ``'OS'`` and ``'Risk_Score'`` columns.

    Returns
    -------
    The value returned by ``concordance_index_censored``, unchanged.
    """
    return _cv_concordance_index(_DATASET_, 'Censor_OS', 'OS')


def CV_concordance_index_PFS(_DATASET_):
    """Concordance of ``Risk_Score`` against the ``Censor_PFS`` / ``PFS`` columns.

    Parameters
    ----------
    _DATASET_ : pandas.DataFrame
        Table with ``'Censor_PFS'``, ``'PFS'`` and ``'Risk_Score'`` columns.

    Returns
    -------
    The value returned by ``concordance_index_censored``, unchanged.
    """
    return _cv_concordance_index(_DATASET_, 'Censor_PFS', 'PFS')