# Notebook dédiée au test du noyau gaussien avec les différents classifieurs





## 0.Import des fonctions, créations des fonctions utiles et des classifieurs

In [1]:
import numpy as np 
import pandas as pd 
import numexpr as ne
from scipy.stats import uniform
from scipy.linalg import solve,lstsq
from sklearn.pipeline import Pipeline,make_pipeline
from sklearn.model_selection import RandomizedSearchCV,GridSearchCV,StratifiedKFold
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.metrics import accuracy_score
from cvxopt import matrix
from cvxopt import solvers
from tqdm import tqdm
from itertools import product,compress

### 0.1 Kernel Ridge Regression

In [2]:
def Kernel_Ridge_Regression(X_train,y_train,lbd,weight,gamma,degree,c0,k,biais,kernel):
    if kernel=="rbf":
        K=rbf_kernel(X_train,gamma)
    elif kernel=="poly":
        K=poly_kernel(X_train,X_train,degree,c0)
    elif kernel=="spectrum":
        K=spectrum_kernel(X_train,k)
    elif kernel=="precomputed":
        K=X_train
    n=K.shape[0]
    w=weight
    if not(biais):
        if isinstance(weight,bool):
            A=(K+n*lbd*np.eye(n))
            alpha=solve(A,y_train,assume_a="sym")
            return alpha
        elif isinstance(weight,str):
                w1=(y_train==1).mean()
                w0=1-w1
                w=np.where(y_train==1,w1,w0)
        wi=(1/w)
        
        A=K+n*lbd*wi*np.eye(n)
        alpha=solve(A,y_train,assume_a="sym")
     
        return alpha
    else:
        
        Kb=addbiais(K)
        
        K0=addzeros(K)
    
        if isinstance(weight,bool):
            A=(Kb.T.dot(Kb)+lbd*n*K0)
            B=Kb.T.dot(y_train)
            alpha=solve(A,B,assume_a="sym")
            return alpha
        elif isinstance(weight,str):
                w1=(y_train==1).mean()
                w0=1-w1
                w=np.where(y_train==1,w1,w0)
        W=np.diag(w)
        A=(Kb.T.dot(W.dot(Kb))+lbd*n*K0)
        B=Kb.T.dot(W.dot(y_train),assume_a="sym")
        alpha=solve(A,B)
        return alpha
        

In [3]:
class KernelRR(BaseEstimator,ClassifierMixin):
    def __init__(self,lbd=1,weight=False,gamma="auto",degree=2,c0=1,k=3,biais=False,kernel="rbf"):
        self.lbd=lbd
        self.weight=weight
        self.gamma=gamma
        self.degree=degree
        self.c0=c0
        self.k=k
        self.biais=biais
        self.kernel=kernel
    def fit(self,X,y):
        self.classes_ = np.unique(y)
        self.Xtr=X
        if isinstance(self.gamma,str) and self.kernel=="rbf":
            self.gamma=1/self.Xtr.shape[1]
        self.alpha=Kernel_Ridge_Regression(X,y,self.lbd,self.weight,self.gamma,self.degree,self.c0,self.k,self.biais,self.kernel)
        return self
    def decision_function(self,X):
        if self.kernel=="precomputed":
            return X.dot(self.alpha)
        if not(self.biais):
            if self.kernel=="rbf":
                return K_rbf_kernel(X,self.Xtr,self.gamma).dot(self.alpha) 
            elif self.kernel=="poly":
                return poly_kernel(X,self.Xtr,self.degree,self.c0).dot(self.alpha) 
            elif self.kernel=="spectrum":
                return K_spectrum_kernel(X,self.Xtr,self.k).dot(self.alpha) 
        else:
            if self.kernel=="rbf":
                return addbiais(K_rbf_kernel(X,self.Xtr,self.gamma)).dot(self.alpha)
            elif self.kernel=="poly":
                return addbiais(poly_kernel(X,self.Xtr,self.degree,self.c0)).dot(self.alpha)
            elif self.kernel=="spectrum":
                return addbiais(K_spectrum_kernel(X,self.Xtr,self.k)).dot(self.alpha)

    def predict(self,X,y=None):
        scores=self.decision_function(X)
        if len(scores.shape) == 1:
            indices = (scores > 0).astype(np.int)
        else:
            indices = scores.argmax(axis=1)
        return self.classes_[indices]
   
    def get_params(self, deep=True):
    
        return {"lbd": self.lbd,"weight":self.weight,"gamma":self.gamma,"degree":self.degree,"c0":self.c0,"k":self.k,
                "biais":self.biais,"kernel":self.kernel}

    def set_params(self, **parameters):
        for parameter, value in parameters.items():
            setattr(self, parameter, value)
        return self

### 0.2 Kernel Logistic Regression

In [4]:
def sigmoid(v):
    return 1/(1+np.exp(-v))
def log_loss(v):
    return np.log(1+np.exp(-v))


In [5]:
def IRLS(X_train,y_train,lbd,ga,degree,c0,k,bs,ker,n_iter,eps=10**-6,method='slow'):
    n=y_train.shape[0]
  
    if ker=="rbf":
        K=rbf_kernel(X_train,ga)
    elif ker=="poly":
        K=poly_kernel(X_train,X_train,degree,c0)
    elif ker=="spectrum":
        K=spectrum_kernel(X_train,k)
    elif ker=="precomputed":
        K=X_train
    #alpha=Kernel_Ridge_Regression(K,y_train,lbd,False,1,bs,"precomputed")
    #alpha=np.zeros(n)
    #l=[]
  
    if bs :
        Kb=addbiais(K)
        K0=addzeros(K)
        alpha=np.zeros(n+1)
    else:
        alpha=np.zeros(n)
    for i in range(n_iter):
   
        alpha_old=alpha
       
        if bs:
            m=Kb.dot(alpha)
            #l.append(log_loss(y_train*m).mean()+lbd*alpha[:-1].dot(K.dot(alpha[:-1])))
        
        else:
            m=K.dot(alpha)
            #l.append(log_loss(y_train*m).mean()+lbd*alpha.dot(m))
        
        
        p=sigmoid(m)
       
        weight=p*(1-p)
       
        weight=np.where(weight<0.000001,0.000001,weight)
       
   
        u=np.where(sigmoid(y_train*m)<0.000001,0.000001,sigmoid(y_train*m))
        z = m + y_train/u
    
        if not(bs):
        
            S = np.diag(weight**-1)
            A=(K+2*lbd*n*S)
            alpha=solve(A,z,assume_a="sym")
            
            #print(np.linalg.norm(alpha_old-alpha))
            
            if np.linalg.norm(alpha_old-alpha)<eps:
                break
        else:
            S = np.diag(weight)
            A=(Kb.T.dot(S.dot(Kb))+2*lbd*n*K0)
            B=Kb.T.dot(S.dot(z))
            if method=="slow":
                alpha=lstsq(A,B)[0]
            else:
                alpha=solve(A,B,assume_a="sym")
            #print(np.linalg.norm(alpha_old-alpha))
            if np.linalg.norm(alpha_old-alpha)<eps:
                break
                
       
    return alpha #,l
        

In [6]:
class KernelLR(BaseEstimator,ClassifierMixin):
    def __init__(self,lbd=1,gamma='auto',degree=2,c0=1,k=3,biais=False,kernel="rbf",n_iter=15,method="slow"):
        self.lbd=lbd
        self.gamma=gamma
        self.degree=degree
        self.c0=c0
        self.k=k
        self.biais=biais
        self.kernel=kernel
        self.n_iter=n_iter
        self.method=method
    def fit(self,X,y):
        self.classes_ = np.unique(y)
        self.Xtr=X
        if isinstance(self.gamma,str) and self.kernel=="rbf":
            self.gamma=1/self.Xtr.shape[1]
        self.alpha=IRLS(X,y,self.lbd,self.gamma,self.degree,self.c0,self.k,self.biais,self.kernel,self.n_iter,method=self.method)
        return self
    def decision_function(self,X):
        if not(self.biais):
            if self.kernel=="precomputed":
                return X.dot(self.alpha)
            if self.kernel=="rbf":
                return K_rbf_kernel(X,self.Xtr,self.gamma).dot(self.alpha) 
            elif self.kernel=="poly":
                return poly_kernel(X,self.Xtr,self.degree,self.c0).dot(self.alpha) 
            elif self.kernel=="spectrum":
                return K_spectrum_kernel(X,self.Xtr,self.k).dot(self.alpha) 
        else:
            if self.kernel=="rbf":
                return addbiais(K_rbf_kernel(X,self.Xtr,self.gamma)).dot(self.alpha)
            elif self.kernel=="poly":
                return addbiais(poly_kernel(X,self.Xtr,self.degree,self.c0)).dot(self.alpha)
            elif self.kernel=="spectrum":
                return addbiais(K_spectrum_kernel(X,self.Xtr,self.k)).dot(self.alpha)
        

    def predict(self,X,y=None):
        scores=self.decision_function(X)
        if len(scores.shape) == 1:
            indices = (scores > 0).astype(np.int)
        else:
            indices = scores.argmax(axis=1)
        return self.classes_[indices]
    def predict_proba_(self,X,y=None):
        p=sigmoid(self.decision_function(X)).reshape(-1,1)
        return hstack((p,1-p))
    def get_params(self, deep=True):
    
        return {"lbd": self.lbd,"gamma":self.gamma,"degree":self.degree,"c0":self.c0,"k":self.k,"biais":self.biais,
                "kernel":self.kernel,"n_iter":self.n_iter,"method":self.method}

    def set_params(self, **parameters):
        for parameter, value in parameters.items():
            setattr(self, parameter, value)
        return self

### 0.3 Kernel SVM

In [7]:
def SVM(X_train,y_train,C,gamma,degree,c0,k,kernel):
    n=y_train.shape[0]
    if kernel=="rbf":
        K=rbf_kernel(X_train,gamma)
    elif kernel=="poly":
        K=poly_kernel(X_train,X_train,degree,c0)
    elif kernel=="spectrum":
        K=spectrum_kernel(X_train,k)
    elif kernel=="precomputed":
        K=X_train
    P=matrix(K,tc='d')
    q=matrix(-y_train,tc='d')
    g1=np.diag(y_train)
    G=matrix(np.vstack((g1,-g1)),tc='d')
    h=matrix(np.hstack((np.repeat(C,n),np.zeros(n))),tc='d')
    solvers.options['show_progress'] = False
    sol=solvers.qp(P,q,G,h)
    return np.array(sol['x']).reshape(-1,)

In [8]:
class KernelSVM(BaseEstimator,ClassifierMixin):
    def __init__(self,C=1,gamma='auto',degree="2",c0=1,k=3,kernel="rbf"):
        self.C=C
        self.gamma=gamma
        self.degree=degree
        self.c0=c0
        self.k=k
        self.kernel=kernel
    def fit(self,X,y):
        self.classes_ = np.unique(y)
        self.Xtr=X
        if isinstance(self.gamma,str) and self.kernel=="rbf":
            self.gamma=1/self.Xtr.shape[1]
        self.alpha=SVM(X,y,self.C,self.gamma,self.degree,self.c0,self.k,self.kernel)
        #idx=self.alpha>10**-5
        #self.Xtr=self.Xtr[idx]
        #self.alpha=self.alpha[idx]
        return self
    def decision_function(self,X):
        if self.kernel=="precomputed":
            return X.dot(self.alpha)
        elif self.kernel=="rbf":
            return K_rbf_kernel(X,self.Xtr,self.gamma).dot(self.alpha) 
        elif self.kernel=="poly":
            return poly_kernel(X,self.Xtr,self.degree,self.c0).dot(self.alpha) 
        elif self.kernel=="spectrum":
            return K_spectrum_kernel(X,self.Xtr,self.k).dot(self.alpha) 
            
           
    def predict(self,X,y=None):
        scores=self.decision_function(X)
        if len(scores.shape) == 1:
            indices = (scores > 0).astype(np.int)
        else:
            indices = scores.argmax(axis=1)
        return self.classes_[indices]
    def get_params(self, deep=True):
    
        return {"C": self.C,"gamma":self.gamma,"degree":self.degree,"c0":self.c0,"k":self.k,"kernel":self.kernel}

    def set_params(self, **parameters):
        for parameter, value in parameters.items():
            setattr(self, parameter, value)
        return self

### 0.4 Fonctions utiles 

In [None]:
def Cross_val_spectrum(dic_K,Y_train,model,hps,cv=5):
    CV=StratifiedKFold(cv)
    dic_K_prime={k:dic_K[k] for k in hps["k"]}
    list_hp=[]
    list_val_score=[]
    idx_k=list(hps.keys()).index("k")
    
    for i in tqdm(product(*hps.values())):
        
        dic_hp={keys:values for keys,values in zip(hps.keys(),i)}
        list_hp.append(dic_hp)
        
        model.set_params(**dic_hp)
        acc_mean=0
        for train_idx,val_idx in CV.split(X_train,Y_train):
            K=dic_K_prime[i[idx_k]]
            model.fit(K[train_idx][:,train_idx],Y_train[train_idx])
            Y_pred=model.predict(K[val_idx][:,train_idx])
            acc_mean+=accuracy_score(Y_train[val_idx],Y_pred)
        list_val_score.append(acc_mean/cv)
    return {"params":list_hp,"mean_test_score":np.array(list_val_score),"rank_test_score":rankdata(list_val_score,method='min')}
            

In [None]:
def Randomized_Cross_val_spectrum(dic_K,Y_train,model,hps,n_iter,cv=5):
    CV=StratifiedKFold(cv)
    dic_K_prime={k:dic_K[k] for k in hps["k"]}
    list_hp=[]
    list_val_score=[]
    idx_k=list(hps.keys()).index("k")
    for keys,values in hps.items():
        if isinstance(values,scipy.stats._distn_infrastructure.rv_frozen):
            hps[keys]=values.rvs(size=n_iter)
        else:
            hps[keys]=np.random.choice(values,n_iter)
    print(hps)
    for i in tqdm(zip(*hps.values())):
        
        dic_hp={keys:values for keys,values in zip(hps.keys(),i)}
        list_hp.append(dic_hp)
        
        model.set_params(**dic_hp)
        acc_mean=0
        for train_idx,val_idx in CV.split(X_train,Y_train):
            K=dic_K_prime[i[idx_k]]
            model.fit(K[train_idx][:,train_idx],Y_train[train_idx])
            Y_pred=model.predict(K[val_idx][:,train_idx])
            acc_mean+=accuracy_score(Y_train[val_idx],Y_pred)
        list_val_score.append(acc_mean/cv)
    return {"params":list_hp,"mean_test_score":np.array(list_val_score),"rank_test_score":rankdata(list_val_score,method='min')}
            

In [9]:
def présentation_résultat2(search,n):
    mask=search['rank_test_score']<=n
    params=list(compress(search['params'], list(mask)))
    mean_test_score=search['mean_test_score'][mask]
    a={}
    for i in range(mean_test_score.size):
        k=''
        for key, value in params[i].items():
            k+=" "+key+" "+str(value)
        a.update({k:mean_test_score[i]})
        sortedDict = sorted(a.items(), key=lambda x: x[1],reverse=True)
    l=[]
    for i in sortedDict:
        u=i[0].split(sep=' ')
        del(u[0])
        lp=[]
        for j in u[1::2]:
            lp.append(j)
        lp.append(i[1])
        l.append(lp)
    head=list(params[0].keys())+["mean_test_score"]

    return(pd.DataFrame(l,columns=head))
        

In [10]:
def addbiais(X):
    return np.hstack((X,np.ones((X.shape[0],1))))
def addzeros(X):
    n,_=X.shape
    A=np.zeros((n+1,n+1))
    A[:n,:n]=X
    return(A)

In [None]:
def csv_file_string_kernel(models,filename): #models is a list of 3 models
    Y_test=np.empty(0)
    for K_train, Y_train, K_test_train, model  in zip(list_K_train, [Y_0train,Y_1train,Y_2train], list_K_train_test, models):
        model.fit(K_train, Y_train)
        Y_pred=model.predict(K_test_train)
        Y_test=np.concatenate((Y_test,np.where(Y_pred==-1,0,Y_pred)), axis=0)
    
    Y_test=Y_test.reshape(len(Y_test),1)
    
    ids=np.arange(Y_test.shape[0])
    ids=ids.reshape(len(ids),1)
    
    df=pd.DataFrame(data=np.concatenate((ids,Y_test), axis=1), columns=['Id','Bound'],dtype=np.int)
    
    return df.to_csv('Predictions/'+filename, index = False, header=True)

### 0.5 Kernel spectrum 

In [None]:
def K_spectrum_kernel(X,Y,k,alphabet="ACGT"):
    voc=product(alphabet, repeat=k)
    voc=[''.join(elt) for elt in voc]
    phi_X=np.vstack(X.apply(lambda x: [x[i:i+k] for i in range(0, len(x)-k+1)]).apply(lambda x: np.array([x.count(v) for v in voc])).to_numpy())
    phi_Y=np.vstack(Y.apply(lambda x: [x[i:i+k] for i in range(0, len(x)-k+1)]).apply(lambda x: np.array([x.count(v) for v in voc])).to_numpy())
    return phi_X.dot(phi_Y.T)
def spectrum_kernel(X,k,alphabet="ACGT"):
    voc=product(alphabet, repeat=k)
    voc=[''.join(elt) for elt in voc]
    phi_X=np.vstack(X.apply(lambda x: [x[i:i+k] for i in range(0, len(x)-k+1)]).apply(lambda x: np.array([x.count(v) for v in voc])).to_numpy())
   
    return phi_X.dot(phi_X.T)

## 1.Test des différents classifieurs

### 1.0 Import des données et précomputation du kernel !!! (étape hyper importante) 

In [None]:
X_0train=pd.read_csv('Data/Xtr0.csv', sep=',')['seq']
X_1train=pd.read_csv('Data/Xtr1.csv', sep=',')['seq']
X_2train=pd.read_csv('Data/Xtr2.csv', sep=',')['seq']

X_0test=pd.read_csv('Data/Xte0.csv', sep=',')['seq']
X_1test=pd.read_csv('Data/Xte1.csv', sep=',')['seq']
X_2test=pd.read_csv('Data/Xte2.csv', sep=',')['seq']

Y_0=pd.read_csv("Data/Ytr0.csv",sep=',')
Y_1=pd.read_csv("Data/Ytr1.csv",sep=',')
Y_2=pd.read_csv("Data/Ytr2.csv",sep=',')

Y_0train=np.where(Y_0["Bound"]==0,-1,Y_0["Bound"])
Y_1train=np.where(Y_1["Bound"]==0,-1,Y_1["Bound"])
Y_2train=np.where(Y_2["Bound"]==0,-1,Y_2["Bound"])

In [None]:
list_k=[2,3,4,5,6,7,8]
dic_K0={k:spectrum_kernel(X_0train,k) for k in tqdm(list_k)}
dic_K1={k:spectrum_kernel(X_1train,k) for k in tqdm(list_k)}
dic_K2={k:spectrum_kernel(X_2train,k) for k in tqdm(list_k)}

### 1.1 Tests de Kernel Ridge Regression

### 1.2 Tests de Kernel Logistic Regression

### 1.3 Test de Kernel SVM

In [None]:
KSVM=KernelSVM(kernel="precomputed")
print(KSVM)

hps0={'C':[10**-2,10**-1,1,10,50,100,500,1000],'k':[2,3,4,5,6,7,8]}
searchKSVM0= Cross_val_spectrum(dic_K0,Y_0train,KSVM,hps0)


hps1={'C':[10**-2,10**-1,1,10,50,100,500,1000],'k':[2,3,4,5,6,7,8]}
searchKSVM1= Cross_val_spectrum(dic_K1,Y_1train,KSVM,hps1)



hps2={'C':[10**-2,10**-1,1,10,50,100,500,1000],'k':[2,3,4,5,6,7,8]}
searchKSVM2= Cross_val_spectrum(dic_K2,Y_2train,KSVM,hps2)

In [None]:
présentation_résultat2(searchKSVM0,10)

In [None]:
présentation_résultat2(searchKSVM1,10)

In [None]:
présentation_résultat2(searchKSVM2,10)

In [None]:
hps0={'C':uniform(loc=0.005,scale=0.5),'k':[8]}
searchKSVM0= Randomized_Cross_val_spectrum(dic_K0,Y_0train,KSVM,hps0,50)


hps1={'C':uniform(loc=0.005,scale=0.5),'k':[5]}
searchKSVM1= Randomized_Cross_val_spectrum(dic_K1,Y_1train,KSVM,hps1,50)



hps2={'C':uniform(loc=0.005,scale=0.5),'k':[7]}
searchKSVM2= Randomized_Cross_val_spectrum(dic_K2,Y_2train,KSVM,hps2,50)


In [None]:
présentation_résultat2(searchKSVM0,10)

In [None]:
présentation_résultat2(searchKSVM1,10)

In [None]:
présentation_résultat2(searchKSVM2,10)

# 2 Elaboration du model final et création du fichier csv

### 2.1 Precomputation du kernel test_train selon le modèle choisi

In [None]:
list_k_chosen=[8,5,7]
model0=KernelSVM(C=0.01,k=list_k_chosen[0],kernel="precomputed")
model1=KernelSVM(C=0.01,k=list_k_chosen[1],kernel="precomputed")
model2=KernelSVM(C=0.01,k=list_k_chosen[2],kernel="precomputed")
list_K_train=[dic_K0[list_k_chosen[0]],dic_K1[list_k_chosen[1]],dic_K2[list_k_chosen[2]]]
list_K_train_test=[K_spectrum_kernel(X_test,X_train,k) for X_test,X_train,k in tqdm(zip([X_0test,X_1test,X_2test],[X_0train,X_1train,X_2train],list_k_chosen))]




In [None]:
csv_file([model0,model1,model2],filename)