In [None]:
from functools import reduce
import pandas as pd

# Read each dataset's CSV from the local "datasets" folder, keyed by its name.
datasets = {}
for name in ("BLCA", "BRCA", "COAD"):
    datasets[name] = pd.read_csv(f'datasets/{name}.csv')

# Print the (rows, columns) shape of every loaded table as a quick sanity check.
for name, df in datasets.items():
    print(f"{name}\t: {df.shape}")


def omics_str(omics):
    """Build a label for an omics selection by joining its names with "_".

    Args:
        omics: A single omics name (str) or an iterable of omics names.

    Returns:
        The name itself when a single string is given; otherwise the names
        joined with "_". Sets are joined in sorted order so the label is the
        same on every run; other iterables keep their own order. An empty
        iterable yields "".
    """
    if isinstance(omics, str):
        return omics
    if isinstance(omics, (set, frozenset)):
        # Iteration order of a set of strings depends on string hashing, which
        # is randomised per interpreter run; sort to keep labels reproducible.
        omics = sorted(omics)
    return "_".join(omics)

def get_X(df, omics):
    """Return the values of ``df`` restricted to the requested omics type(s).

    A column is assigned to an omics type by the text that follows the last
    "_" in its name (the whole name when it contains no "_").

    Args:
        df: DataFrame holding only feature columns (string column names).
        omics: "__ALL__" to keep every column, a single omics name, or a
            collection of omics names. "__ALL__" is only special when passed
            as a string, not as a member of a collection.

    Returns:
        ``df.values`` when ``omics`` is "__ALL__"; otherwise a 2-D array with
        all rows and only the matching columns, in their original order.
    """
    # isinstance (rather than an exact type comparison) so that str subclasses
    # are treated as a single name instead of falling through to a substring
    # test against the column suffix.
    if isinstance(omics, str):
        if omics == "__ALL__":
            return df.values
        omics = {omics}
    indexes = [
        position
        for position, column in enumerate(df.columns)
        if column.rsplit("_", 1)[-1] in omics
    ]
    return df.values[:, indexes]

In [None]:
from typing import List, Set, Dict
from statistics import mean

from sklearn.metrics import accuracy_score
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC

from utils import stability_measure


# Registry of the linear classifiers compared in this notebook:
# short name -> (estimator class, parameter dict). Both entries use an L1
# penalty. The parameter dicts are handed to estimator_helpers together with
# the class — presumably forwarded as constructor keyword arguments; confirm
# in utils.estimator_helpers.
estimators = {
    "LR": (
        LogisticRegression,
        dict(penalty='l1', solver='liblinear', max_iter=100000),
    ),
    "SVM": (
        LinearSVC,
        dict(penalty="l1", dual=False, max_iter=100000),
    ),
}


class CVResult:
    """Accumulates metric scores and selected feature sets over repeated runs.

    One instance is meant to be fed once per train/test split: the selected
    feature indexes via ``add_feature_set`` and the predictions via
    ``evaluate``. ``summary`` then aggregates everything recorded so far.
    """

    def __init__(self, n_features:int):
        """Create an empty result container.

        Args:
            n_features: Total number of features available for selection.
        """
        self.n_features = n_features
        # One set of selected feature indexes per recorded run.
        self.features_sets = []
        # "<prefix>_<metric name>" -> list of scores, one per evaluate() call.
        self.results = {}
        # Metric name -> callable(y_real, y_predict) returning a score.
        self.metrics = {"accuracy": accuracy_score}

    def add_feature_set(self, feature_set:Set[int]):
        """Record the feature indexes selected in one run."""
        self.features_sets.append(feature_set)

    def evaluate(self, prefix:str, y_real:List[float], y_predict:List[float]):
        """Score one set of predictions with every registered metric.

        Args:
            prefix: Label prepended to the metric name (e.g. "train", "test").
            y_real: Ground-truth targets.
            y_predict: Predicted targets, aligned with ``y_real``.
        """
        for metric_name, metric in self.metrics.items():
            scores = self.results.setdefault(f"{prefix}_{metric_name}", [])
            scores.append(metric(y_real, y_predict))

    def summary(self)->Dict[str,float]:
        """Aggregate everything recorded so far into a flat dict.

        Returns:
            The mean of every recorded score list, followed by:
            "fss_nogueira" (value returned by ``stability_measure.nogueira``
            for the recorded feature sets — presumably the Nogueira stability
            index; confirm in utils.stability_measure), "avg_features" (mean
            size of the recorded feature sets), "all_features" (total feature
            count) and "runs" (number of recorded feature sets).
        """
        report = {}
        for key, scores in self.results.items():
            report[key] = mean(scores)
        # Only the Nogueira score is reported; a Lustgarten variant
        # (stability_measure.lustgarten) was previously disabled here.
        report["fss_nogueira"] = stability_measure.nogueira(self.features_sets, self.n_features)
        sizes = [len(feature_set) for feature_set in self.features_sets]
        report["avg_features"] = mean(sizes)
        report["all_features"] = self.n_features
        report["runs"] = len(sizes)
        return report


def append(data:Dict[str, List[float]], key:str, value:float):
    """Append ``value`` to the list stored under ``key``, creating it if absent.

    Mutates ``data`` in place and returns nothing.
    """
    data.setdefault(key, []).append(value)

In [None]:
from datetime import datetime

from sklearn.feature_selection import SelectFromModel
from sklearn.model_selection import StratifiedKFold

from utils import estimator_helpers


import os

# Column-oriented accumulator for the final results table: every
# (dataset, omics, n_splits, estimator) combination appends one value per key.
data = {}

N_SPLITS = [5,]
OMICS = ["__ALL__", "mutation", "cnv", "mirna", "rna"]
RANDOM_SEEDS = [0,42,21]
FEATURES_NUMS = [10,25,100]
# Progress counter (i) and total number of (dataset, omics, n_splits) jobs (I).
i = 0
I =  len(datasets) * len(OMICS) * len(N_SPLITS)

# Create the output directory before the expensive loops: to_csv does not
# create missing parent directories, so without this a fresh checkout would
# only fail (and lose every result) after the whole experiment has run.
os.makedirs("results", exist_ok=True)

for ds_name, df in datasets.items():
    # First column is the target; the remaining columns are features.
    y = df.values[:,0]

    for omics in OMICS:
        X = get_X(df.iloc[:,1:], omics)

        for n_splits in N_SPLITS:
            # train_size is the fraction of samples each fold trains on.
            # create_selectors is a project helper; from its use below it
            # returns a mapping name -> selector exposing fit / get_support /
            # estimator_ (presumably SelectFromModel instances — confirm).
            selectors = estimator_helpers.create_selectors(
                estimators, X, y, FEATURES_NUMS, train_size=(1.-1./n_splits)
            )
            results = { name: CVResult(X.shape[1]) for name in selectors }

            # Repeat the k-fold split once per seed so each selector is
            # evaluated on len(RANDOM_SEEDS) * n_splits train/test splits.
            for random_seed in RANDOM_SEEDS:
                skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=random_seed)

                for train_index, test_index in skf.split(X, y):
                    X_train, y_train = X[train_index], y[train_index]
                    X_test, y_test = X[test_index], y[test_index]

                    for name, selector in selectors.items():
                        # Record results on this split for every estimator/selector.
                        result = results[name]
                        selector.fit(X_train, y_train)
                        selected_features_indexes = selector.get_support(indices=True)
                        result.add_feature_set(set(selected_features_indexes))
                        # NOTE(review): accuracy is measured with the selector's
                        # underlying fitted estimator on the untransformed
                        # feature matrix, not on the selected columns only.
                        result.evaluate("train", y_train, selector.estimator_.predict(X_train))
                        result.evaluate("test", y_test, selector.estimator_.predict(X_test))
            i+=1
            print(f"Results {i}/{I} for {ds_name} {omics} and {n_splits}-fold cross validation:")

            # Flatten each selector's summary into one row of the results table.
            for name, result in results.items():
                r = result.summary()
                append(data, "cv-folds", n_splits)
                append(data, "dataset", ds_name)
                append(data, "estimator", name)
                append(data, "omics", omics_str(omics))
                for key, value in r.items():
                    append(data, key, value)

# Persist the table under a minute-resolution timestamp, then display it.
df = pd.DataFrame(data)
df.to_csv(f"results/r_{datetime.today().strftime('%Y%m%d_%H%M')}.csv")
df

In [None]:
import random
from sklearn.svm import LinearSVC
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import StratifiedKFold
from utils import estimator_helpers

# Probe, on the COAD dataset, which C value estimator_helpers picks for each
# omics subset, target feature count and estimator, and print one line per
# combination.
df = datasets["COAD"]
yy = df.values[:,0]

for omics in ("__ALL__", "mutation", "cnv", "mirna", "rna"):
    X = get_X(df.iloc[:,1:], omics)

    # Keep only the training part of the first of five stratified folds.
    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=0)
    selected_index, _ = next(skf.split(X, yy))
    X, y = X[selected_index], yy[selected_index]

    for n_features in (10, 25, 100):
        # The specs are rebuilt on every iteration so each call receives a
        # fresh parameter dict. "LR " carries a trailing space so both printed
        # labels have the same width.
        for label, estimator_class, params in (
            ("SVM", LinearSVC, {'penalty':'l1', 'dual':False, 'max_iter':100000}),
            ("LR ", LogisticRegression, {'penalty':'l1', 'solver':'liblinear', 'max_iter':100000}),
        ):
            # match_C_to_number_of_features is a project helper; presumably it
            # searches for the C (up to max_c) that yields n_features selected
            # features — confirm in utils.estimator_helpers.
            C = estimator_helpers.match_C_to_number_of_features(
                estimator_class,
                n_features,
                X, y,
                estimator_params = params,
                max_c = 5.0
            )
            print(f"{label} {omics:10} {n_features:4} {C}")