In [None]:
import numpy as np
import pandas as pd
import sklearn
from sklearn.metrics import precision_score, accuracy_score
from tqdm import trange
import pickle
import math
import torch

In [2]:
def same_seed(seed): 
    '''Fixes random number generator seeds for reproducibility.'''
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True
    

def amino_encode_table_6(): # key: Amino Acid, value: tensor
    df = pd.read_csv('./6-pc', sep=' ', index_col=0)
    H1 = (df['H1'] - np.mean(df['H1'])) / (np.std(df['H1'], ddof=1))
    V = (df['V'] - np.mean(df['V'])) / (np.std(df['V'], ddof=1))
    P1 = (df['P1'] - np.mean(df['P1'])) / (np.std(df['P1'], ddof=1))
    Pl = (df['Pl'] - np.mean(df['Pl'])) / (np.std(df['Pl'], ddof=1))
    PKa = (df['PKa'] - np.mean(df['PKa'])) / (np.std(df['PKa'], ddof=1))
    NCI = (df['NCI'] - np.mean(df['NCI'])) / (np.std(df['NCI'], ddof=1))
    c = np.array([H1,V,P1,Pl,PKa,NCI], dtype=np.float32)
    amino = ['A','C','D','E','F','G','H','I','K','L','M','N','P','Q','R','S','T','V','W','Y']
    table = {}
    for index,key in enumerate(amino):
        table[key] = list(c[0:6, index])
    table['X'] = [0,0,0,0,0,0]
    return table

table = amino_encode_table_6()
with open('pc6.pickle', 'wb') as handle:
    pickle.dump(table, handle, protocol=pickle.HIGHEST_PROTOCOL)

def padding_seq(original_seq, length=50, pad_value='X'):
    padded_seq = original_seq.ljust(length, pad_value)
    return padded_seq

def seq_to_features_ml(seq, conc):
    features_list = []
    for aa in seq:
        t = table[aa].copy()
        t.append(conc)
        features_list += t
    feature_tensor = np.array(features_list, dtype=np.float32)
    return feature_tensor

seed = 10902128+8403014
same_seed(seed)

In [None]:
from sklearn.model_selection import train_test_split
thresholds = [5,10,20,30,40]
for threshold in thresholds:
    df = pd.read_csv('./processed.csv',dtype={'id':int,
                                            'sequence':'string',
                                            'lysis_group':'category',
                                            'lysis_value':'float32',
                                            'concentration':'float32'})

    def pad(seq:str, length=50) -> str:
        return seq + 'X'*(length-len(seq))

    df['sequence'] = df['sequence'].apply(pad)
    df['concentration'] = df['concentration'] / 300
    df['label'] = df['lysis_value'] > threshold
    df['label'] = df['label'].astype('float32')
    
    df_pos = df[df['label'] == 1]
    df_neg = df[df['label'] == 0]
    if len(df_pos) > len(df_neg):
        df_pos = df_pos.sample(len(df_neg), random_state=seed).reset_index(drop=True)
    else:
        df_neg = df_neg.sample(len(df_pos), random_state=seed).reset_index(drop=True)

    df = pd.concat([df_pos, df_neg]).sample(frac=1, random_state=seed).reset_index(drop=True)

    train_df, test_df = train_test_split(df, test_size=0.2, shuffle=True, random_state=seed, stratify=df['label'])
    train_df_pos = train_df[train_df['label'] == 1]
    train_df_neg = train_df[train_df['label'] == 0]
    test_df_pos = test_df[test_df['label'] == 1]
    test_df_neg = test_df[test_df['label'] == 0]
    print(f'Threshold {threshold} Total:{len(df)}\n Train pos: {len(train_df_pos)}, neg: {len(train_df_neg)}\n Test pos: {len(test_df_pos)}, neg: {len(test_df_neg)}')
    Y = train_df['label'].to_numpy(dtype=int)
    X = np.array([]).reshape(0,350)
    for i, row in train_df.iterrows():
        X = np.vstack([X,seq_to_features_ml(row['sequence'], row['concentration'])])

    shuffled_test_df = test_df.copy(deep=True)
    shuffled_test_df['concentration'] = shuffled_test_df['concentration'].sample(frac=1, random_state=seed).values
    test_Y = test_df['label'].to_numpy(dtype=int)
    test_X = np.array([]).reshape(0,350)
    for i, row in test_df.iterrows():
        test_X = np.vstack([test_X,seq_to_features_ml(row['sequence'], row['concentration'])])
    shuffled_X = np.array([]).reshape(0,350)
    for i, row in shuffled_test_df.iterrows():
        shuffled_X = np.vstack([shuffled_X,seq_to_features_ml(row['sequence'], row['concentration'])])

    from sklearn import svm
    svc_clf = svm.SVC(random_state=seed, probability=True)

    from sklearn.ensemble import RandomForestClassifier
    rf_clf = RandomForestClassifier(random_state=seed)

    from sklearn.ensemble import AdaBoostClassifier
    ada_clf = AdaBoostClassifier(random_state=seed)

    from sklearn.neural_network import MLPClassifier
    mlp_clf = MLPClassifier(max_iter=5000, random_state=seed)

    from sklearn.neighbors import KNeighborsClassifier
    knn_clf = KNeighborsClassifier()

    import xgboost as xgb
    xgb_clf = xgb.XGBClassifier(random_state=seed)

    from sklearn.ensemble import VotingClassifier
    from sklearn import model_selection
    from itertools import chain, combinations
    def powerset(s :list):
        return chain.from_iterable(combinations(s, r) for r in range(len(s)+1))

    from sklearn.ensemble import StackingClassifier
    from tqdm import tqdm

    max_acc = -math.inf
    max_prec = -math.inf
    best_choices = []
    all_estimators = [('svc',svc_clf), ('rf',rf_clf), ('ada',ada_clf), ('mlp',mlp_clf), ('knn',knn_clf), ('xgb',xgb_clf)]
    for estimators in tqdm(list(powerset(all_estimators))[1:]):
        estimators = list(estimators)
        ensemble = VotingClassifier(estimators, voting='soft')
        ensemble.fit(X,Y)
        ensembled = ensemble.predict(test_X)
        if accuracy_score(test_Y, ensembled) > max_acc:
            max_acc = accuracy_score(test_Y, ensembled)
            max_prec = precision_score(test_Y, ensembled)
            best_choices = estimators
            pickle.dump(ensemble, open(f'b_{threshold}_clf.pickle', 'wb'))

    print(f'Threshold {threshold} Best acc: {max_acc}, prec: {max_prec}')
    print([x[0] for x in best_choices])
    
    ensemble = pickle.load(open(f'b_{threshold}_clf.pickle', 'rb'))
    ensemble.predict(shuffled_X)
    shuffled_acc = accuracy_score(test_Y, ensembled)
    shuffled_prec = precision_score(test_Y, ensembled)
    print(f'Shuffled: Threshold {threshold} Best acc: {shuffled_acc}, prec: {shuffled_prec}')