In [None]:
import pandas as pd
import numpy as np
import random
import time
from scipy import stats
import pyts
import sklearn
from pyts.classification import KNeighborsClassifier
from pyts.datasets import load_gunpoint
from sktime.distances import distance
from statsmodels.tsa.api import ExponentialSmoothing, SimpleExpSmoothing, Holt
from pyts.classification import TimeSeriesForest
from pyts.classification import BOSSVS
from pyts.classification import SAXVSM
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report
from sklearn import svm
from tslearn.svm import TimeSeriesSVC
import pickle
import matplotlib.pyplot as plt
from sklearn.model_selection import GridSearchCV
from rpy2.robjects.packages import importr
from rpy2.robjects import pandas2ri
import rpy2.robjects.packages as rpackages
import rpy2.robjects as robjects
from pyts import metrics
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import warnings

In [None]:
# Turn on rpy2's pandas conversion: Classification_Procedure.acf hands
# pandas Series straight to an R function.
pandas2ri.activate()

# R's `utils` package is only used here to pick a CRAN mirror (index 1).
# (The original line carried a stray trailing `s`, which was a SyntaxError.)
utils = rpackages.importr('utils')
utils.chooseCRANmirror(ind=1)

# R package whose ACFDistance is called by Classification_Procedure.acf.
Tdist = importr('TSdist')

# NOTE(review): this silences *every* warning for the whole kernel session,
# including numerical warnings (e.g. division by zero) raised by the distance
# functions below. Consider narrowing the filter once the notebook is stable.
warnings.filterwarnings("ignore")

In [None]:
class Classification_Procedure():
    """Run time-series classifiers over one or more train/test splits.

    Typical call order (each step reads the attributes left by the previous one):

    1. ``data_transform(...)``  - optional, rewrites ``X_train``/``X_test``/``X``.
    2. ``split_data(test_size)`` - builds ``self.datasets``.
    3. ``classification(method_name, params)`` - runs one of the classifier
       methods below on every split and pickles the outcome.

    Every classifier method takes ``data = (X_train, X_test, y_train, y_test)``
    plus a ``parameters`` tuple whose last element is a ``tuning`` flag, and
    returns ``[y_test, predictions]``.
    """

    def __init__(self, path, X_train, X_test, y_train, y_test, splits_number=0):
        """Store the original split and build the pooled data set.

        Parameters
        ----------
        path : str
            Prefix that ``save_results`` concatenates with ``<method>.pickle``.
        X_train, X_test, y_train, y_test : array-like
            The original train/test split; it is concatenated along axis 0
            into ``self.X`` / ``self.y``.
        splits_number : int, default 0
            Number of splits evaluated by ``classification``.
        """
        self.path = path
        self.splits_number = splits_number

        self.X_train, self.X_test, self.y_train, self.y_test = X_train, X_test, y_train, y_test

        self.X = np.concatenate((X_train, X_test), axis=0)
        self.y = np.concatenate((y_train, y_test), axis=0)

    # ------------------------------------------------------------------
    # Private helpers shared by the classifier methods
    # ------------------------------------------------------------------

    def _fit_predict(self, clf, X_train, X_test, y_train, y_test):
        """Fit ``clf`` on the training part and return ``[y_test, predictions]``."""
        clf.fit(X_train, y_train)
        return [y_test, clf.predict(X_test)]

    def _knn(self, data, parameters, fixed, knn_cls=None):
        """Shared body of all k-nearest-neighbour based methods.

        ``parameters`` is ``(n_neighbors, tuning)``. ``fixed`` holds the keyword
        arguments that define the variant (``p=2`` or ``metric=...``); they are
        passed to the classifier directly, or as single-value grid entries when
        tuning. ``knn_cls`` defaults to ``pyts.classification.KNeighborsClassifier``.
        """
        n_neighbors = parameters[0]
        tuning = parameters[1]
        X_train, X_test, y_train, y_test = data

        if knn_cls is None:
            knn_cls = pyts.classification.KNeighborsClassifier

        if tuning:
            param_grid = {'n_neighbors': [1, 2, 3, 4, 5]}
            param_grid.update({name: [value] for name, value in fixed.items()})
            return self.parameter_tuning(X_train, X_test, y_train, y_test, knn_cls(), param_grid)

        return self._fit_predict(knn_cls(n_neighbors, **fixed), X_train, X_test, y_train, y_test)

    # ------------------------------------------------------------------
    # Classifier methods (looked up by name in ``classification``)
    # ------------------------------------------------------------------

    def euclidean(self, data, parameters=(1, False)):
        """pyts k-NN with ``p=2``. ``parameters = (n_neighbors, tuning)``."""
        return self._knn(data, parameters, {'p': 2})

    def tsf(self, data, parameters=(500, 1, 1, False)):
        """pyts TimeSeriesForest with ``random_state=1``.

        ``parameters = (n_estimators, n_windows, min_window_size, tuning)``.
        """
        n_estimators = parameters[0]
        n_windows = parameters[1]
        min_window_size = parameters[2]
        tuning = parameters[3]

        X_train, X_test, y_train, y_test = data

        if tuning:
            param_grid = {'n_estimators': [300, 400, 500, 600, 1000], 'n_windows': [1],
                          'min_window_size': [1, 3, 5, 10], 'random_state': [1]}
            return self.parameter_tuning(X_train, X_test, y_train, y_test, TimeSeriesForest(), param_grid)

        clf = TimeSeriesForest(n_estimators=n_estimators, n_windows=n_windows,
                               min_window_size=min_window_size, random_state=1)
        return self._fit_predict(clf, X_train, X_test, y_train, y_test)

    def dtw(self, data, parameters=(1, False)):
        """pyts k-NN with ``metric='dtw'``. ``parameters = (n_neighbors, tuning)``."""
        return self._knn(data, parameters, {'metric': 'dtw'})

    def ciddtw(self, data, parameters=(1, False)):
        """pyts k-NN using DTW scaled by a complexity correction factor.

        The distance is ``dtw(Q, C) * max(CE_Q, CE_C) / min(CE_Q, CE_C)`` where
        ``CE`` is the root of the summed squared first differences of a series.
        ``parameters = (n_neighbors, tuning)``.
        """
        def mydist(Q, C):
            CE_Q = np.sqrt(np.sum(np.diff(Q)**2))
            CE_C = np.sqrt(np.sum(np.diff(C)**2))
            larger = np.maximum(CE_Q, CE_C)
            smaller = np.minimum(CE_Q, CE_C)
            if larger == 0:
                # Both series are constant: the ratio would be 0/0 (NaN).
                # Equal complexities mean no correction, i.e. a factor of 1.
                return pyts.metrics.dtw(Q, C)
            return pyts.metrics.dtw(Q, C)*(larger/smaller)

        return self._knn(data, parameters, {'metric': mydist})

    def msm(self, data, parameters=(1, False)):
        """pyts k-NN using sktime's ``distance(..., metric='msm')``.

        ``parameters = (n_neighbors, tuning)``.
        """
        def mydist(x, y):
            return distance(x, y, metric='msm')

        return self._knn(data, parameters, {'metric': mydist})

    def svm_ndtw(self, data, parameters=(1, False)):
        """sklearn SVC whose kernel matrix is the negated pairwise DTW distance.

        ``parameters = (C, tuning)``.
        NOTE(review): ``tuning`` is accepted for signature parity with the other
        methods but no grid search is implemented here - it is ignored.
        """
        X_train, X_test, y_train, y_test = data
        C = parameters[0]

        def NDTW_kernel(X1, X2):
            return np.array([[-pyts.metrics.dtw(_x1, _x2) for _x2 in X2] for _x1 in X1])

        clf = svm.SVC(kernel=NDTW_kernel, C=C)
        return self._fit_predict(clf, X_train, X_test, y_train, y_test)

    def svm_gdtw(self, data, parameters=(1, 2, False)):
        """sklearn SVC with kernel ``exp(-dtw(x1, x2)**2 / sigma**2)``.

        ``parameters = (sigma, C, tuning)``.
        NOTE(review): ``tuning`` is accepted for signature parity with the other
        methods but no grid search is implemented here - it is ignored.
        """
        X_train, X_test, y_train, y_test = data
        sigma = parameters[0]
        C = parameters[1]

        def build_kernel_GDTW(sigma):
            # Closure so the kernel callable carries its own sigma.
            def GDTW_kernel(X1, X2):
                return np.array([[np.exp(-pyts.metrics.dtw(_x1, _x2)**2/sigma**2) for _x2 in X2] for _x1 in X1])
            return GDTW_kernel

        clf = svm.SVC(kernel=build_kernel_GDTW(sigma), C=C)
        return self._fit_predict(clf, X_train, X_test, y_train, y_test)

    def svm_gak(self, data, parameters=(2, 1, False)):
        """tslearn TimeSeriesSVC with ``kernel='gak'``.

        ``parameters = (gamma, C, tuning)`` - note gamma comes first.
        """
        X_train, X_test, y_train, y_test = data
        gamma = parameters[0]
        C = parameters[1]
        tuning = parameters[2]

        if tuning:
            # No 'kernel' entry: the search runs on TimeSeriesSVC()'s own default kernel.
            param_grid = {'C': [0.1, 1, 10, 100], 'gamma': [2, 1, 0.1, 0.01]}
            return self.parameter_tuning(X_train, X_test, y_train, y_test, TimeSeriesSVC(), param_grid)

        clf = TimeSeriesSVC(C=C, kernel='gak', gamma=gamma)
        return self._fit_predict(clf, X_train, X_test, y_train, y_test)

    def svm_rbf(self, data, parameters=(1, 2, False)):
        """tslearn TimeSeriesSVC with ``kernel='rbf'``.

        ``parameters = (gamma, C, tuning)`` - note gamma comes first.
        """
        X_train, X_test, y_train, y_test = data
        gamma = parameters[0]
        C = parameters[1]
        tuning = parameters[2]

        if tuning:
            param_grid = {'C': [0.1, 1, 10, 100, 1000], 'gamma': [2, 1, 0.1, 0.01, 0.001], 'kernel': ['rbf']}
            return self.parameter_tuning(X_train, X_test, y_train, y_test, TimeSeriesSVC(), param_grid)

        clf = TimeSeriesSVC(C=C, kernel='rbf', gamma=gamma)
        return self._fit_predict(clf, X_train, X_test, y_train, y_test)

    def bossvs(self, data, parameters=(4, 4, 10, False)):
        """pyts BOSSVS. ``parameters = (word_size, n_bins, window_size, tuning)``."""
        word_size = parameters[0]
        n_bins = parameters[1]
        window_size = parameters[2]
        tuning = parameters[3]

        X_train, X_test, y_train, y_test = data

        if tuning:
            param_grid = {'word_size': [3, 4, 5, 6], 'n_bins': [4], 'window_size': [10, 15, 20]}
            return self.parameter_tuning(X_train, X_test, y_train, y_test, BOSSVS(), param_grid)

        # Arguments stay positional, exactly as the notebook originally passed them.
        clf = BOSSVS(word_size, n_bins, window_size)
        return self._fit_predict(clf, X_train, X_test, y_train, y_test)

    def saxvsm(self, data, parameters=(0.5, 0.5, 4, False)):
        """pyts SAXVSM with ``strategy='normal'``.

        ``parameters = (window_size, word_size, n_bins, tuning)``.
        """
        window_size = parameters[0]
        word_size = parameters[1]
        n_bins = parameters[2]
        tuning = parameters[3]

        X_train, X_test, y_train, y_test = data

        if tuning:
            param_grid = {'window_size': [0.3, 0.4, 0.5, 0.6], 'word_size': [0.2, 0.3, 0.4, 0.5],
                          'n_bins': [4], 'strategy': ['normal']}
            return self.parameter_tuning(X_train, X_test, y_train, y_test, SAXVSM(), param_grid)

        # Arguments stay positional, exactly as the notebook originally passed them.
        clf = SAXVSM(window_size, word_size, n_bins, strategy='normal')
        return self._fit_predict(clf, X_train, X_test, y_train, y_test)

    def characteristic_based(self, data, parameters=(1, False)):
        """sklearn (not pyts) k-NN with ``p=2``. ``parameters = (n_neighbors, tuning)``.

        Presumably ``data`` holds pre-computed feature vectors rather than raw
        series - verify against the caller.
        """
        return self._knn(data, parameters, {'p': 2}, knn_cls=sklearn.neighbors.KNeighborsClassifier)

    def acf(self, data, parameters=(1, False)):
        """pyts k-NN using R ``TSdist::ACFDistance`` through the global ``Tdist``.

        ``parameters = (n_neighbors, tuning)``.
        """
        def mydist(x, y):
            # Converted to numeric pandas Series before being handed to R.
            dfx = pd.to_numeric(pd.Series(x))
            dfy = pd.to_numeric(pd.Series(y))
            return Tdist.ACFDistance(dfx, dfy)[0]

        return self._knn(data, parameters, {'metric': mydist})

    # ------------------------------------------------------------------
    # Experiment driver
    # ------------------------------------------------------------------

    def classification(self, method, params=None):
        """Run the classifier method named ``method`` on every split and save the results.

        Parameters
        ----------
        method : str
            Name of one of this class's classifier methods (resolved with ``getattr``).
        params : tuple or None
            Forwarded as the method's ``parameters``; ``None`` uses its default.

        Returns
        -------
        list
            ``self.results``: one ``[y_test, predictions]`` entry per split.

        Side effects: sets ``self.results`` and ``self.time_results`` and writes
        a pickle through ``save_results``.

        Raises
        ------
        AttributeError
            If ``method`` does not exist, or splits are requested before
            ``split_data`` has built ``self.datasets``.
        """
        run = getattr(self, method)

        if self.splits_number > 0 and not hasattr(self, 'datasets'):
            raise AttributeError("self.datasets is not set - call split_data() before classification()")

        self.results = []
        cpu_start = time.process_time()
        wall_start = time.time()

        for i in range(0, self.splits_number):
            data = self.datasets[i]
            if params is None:
                self.results.append(run(data))
            else:
                self.results.append(run(data, params))

        # Measured once after the loop so the values exist even when
        # splits_number is 0 (previously they were only bound inside the loop).
        cpu_elapsed = time.process_time() - cpu_start
        wall_elapsed = time.time() - wall_start

        self.time_results = ['Execution time', wall_elapsed, 'CPU Execution time', cpu_elapsed]

        self.save_results(method)

        return self.results

    def split_data(self, test_size):
        """Build ``self.datasets``: key 0 is the original split, keys
        ``1 .. splits_number-1`` are stratified re-splits of the pooled data
        seeded with ``random_state=key``.

        Reads ``self.X``/``self.y``/``self.X_train``/... as they are at call
        time, so any ``data_transform`` must be applied beforehand.
        """
        self.datasets = {key: train_test_split(self.X, self.y, test_size=test_size, random_state=key,
                                                stratify=self.y) for key in range(1, self.splits_number)}

        self.datasets[0] = (self.X_train, self.X_test, self.y_train, self.y_test)
        return self.datasets

    def parameter_tuning(self, X_train, X_test, y_train, y_test, method, parameters_grid):
        """Grid-search ``method`` over ``parameters_grid`` on the training data
        (``GridSearchCV`` with ``refit=True``) and return
        ``[y_test, predictions_of_refit_estimator]``.
        """
        grid = GridSearchCV(method, parameters_grid, refit=True)
        grid.fit(X_train, y_train)
        return [y_test, grid.predict(X_test)]

    def data_transform(self, transformation, window=3, verbose=False):
        """Transform ``self.X_train``, ``self.X_test`` and ``self.X`` in place.

        Each row is treated as one series and transformed independently.

        Parameters
        ----------
        transformation : str
            ``'standardization'`` - per-series scaling with ``StandardScaler``.
            ``'smoothing'`` - centred rolling mean of width ``window``; the
            edge positions that have no full window are cut off, so series
            get shorter.
            ``'box_cox'`` - per-series ``scipy.stats.boxcox``. If the pooled
            data contains values <= 0, every series is first shifted by
            ``1 - global_min``.
            Any other value leaves the data untouched.
        window : int, default 3
            Rolling window width, used by ``'smoothing'`` only.
        verbose : bool, default False
            For ``'box_cox'``, print every training series before and after
            the transform.

        Returns
        -------
        numpy.ndarray
            For ``'box_cox'``: array shaped like ``X_train`` whose row ``i`` is
            filled with the lambda fitted for training series ``i``.
            Otherwise: a copy of ``X_train`` as it was on entry.
        """
        train = self.X_train
        test = self.X_test
        full = self.X
        box_lambda = train.copy()

        if transformation == 'standardization':
            # Local import: a bare `import sklearn` does not guarantee that the
            # `preprocessing` submodule is loaded.
            from sklearn.preprocessing import StandardScaler

            def standardize(series):
                # Transposed so that each series becomes a column (= one feature).
                return StandardScaler().fit_transform(series.T).T

            self.X_train = standardize(train)
            self.X_test = standardize(test)
            self.X = standardize(full)

        elif transformation == 'smoothing':
            # The slice bounds are derived from the training-series length and
            # reused for all three arrays.
            length = np.shape(train)[1]
            start = int((window-1)/2)
            stop = int(length-((window-1)/2))

            def smooth(series):
                means = pd.DataFrame(series.T).rolling(window, center=True).mean()
                return means.iloc[start:stop].T.to_numpy()

            self.X_train = smooth(train)
            self.X_test = smooth(test)
            self.X = smooth(full)

        elif transformation == 'box_cox':
            # Float copies so integer inputs do not truncate the transformed values.
            train_bc = np.array(train, dtype=float)
            test_bc = np.array(test, dtype=float)
            full_bc = np.array(full, dtype=float)

            # Box-Cox needs strictly positive input, so shift when min <= 0
            # (the previous `< 0` test let an exact zero through).
            global_min = full_bc.min()
            shift = 1 - global_min if global_min <= 0 else 0.0

            box_lambda = np.empty_like(train_bc)
            for i in range(train_bc.shape[0]):
                if verbose:
                    print('real szereg:', train_bc[i])
                # One call gives both outputs, so lambda comes from the
                # *original* row, not from the already-transformed one.
                transformed, lam = stats.boxcox(train_bc[i] + shift)
                train_bc[i] = transformed
                box_lambda[i] = lam
                if verbose:
                    print('box szereg:', train_bc[i])

            for i in range(test_bc.shape[0]):
                test_bc[i] = stats.boxcox(test_bc[i] + shift)[0]

            for i in range(full_bc.shape[0]):
                full_bc[i] = stats.boxcox(full_bc[i] + shift)[0]

            self.X_train = train_bc
            self.X_test = test_bc
            self.X = full_bc

        return box_lambda

    def save_results(self, method):
        """Pickle ``self.results`` and then ``self.time_results`` (two
        consecutive dumps) into ``self.path + method + '.pickle'``.

        ``self.path`` is used as a plain string prefix; no path separator is added.
        """
        results_data = self.results
        time_data = self.time_results

        with open(self.path+method+'.pickle', 'wb') as handle:
            pickle.dump(results_data, handle, protocol=pickle.HIGHEST_PROTOCOL)
            pickle.dump(time_data, handle, protocol=pickle.HIGHEST_PROTOCOL)


In [None]:
def get_results(path):
    """Summarise a pickle written by ``Classification_Procedure.save_results``.

    The file is expected to hold two consecutive pickles: the list of
    ``[y_true, y_pred]`` pairs (one per split) followed by the timing list.

    Returns a 9-tuple:
    ``(mean_accuracy, std_accuracy, mean_precision, std_precision,
    mean_recall, std_recall, mean_f1, std_f1, time_data[1])``.

    NOTE(review): a split is scored with macro averaging whenever the largest
    true label exceeds 1, otherwise with sklearn's binary default. Binary
    problems labelled e.g. {1, 2} therefore take the macro path.
    NOTE: ``pickle.load`` can execute arbitrary code - only open trusted files.
    """
    with open(path, 'rb') as handle:
        result_data = pickle.load(handle)
        time_data = pickle.load(handle)

    accuracies, precisions, recalls, f1_values = [], [], [], []

    for split in result_data:
        y_true, y_pred = split[0], split[1]
        # 'binary' is what the scorers use when no `average` is given.
        averaging = 'macro' if max(y_true) > 1 else 'binary'

        accuracies.append(accuracy_score(y_true, y_pred))
        precisions.append(precision_score(y_true, y_pred, average=averaging))
        recalls.append(recall_score(y_true, y_pred, average=averaging))
        f1_values.append(f1_score(y_true, y_pred, average=averaging))

    summary = []
    for per_split in (accuracies, precisions, recalls, f1_values):
        summary.append(np.mean(per_split))
        summary.append(np.std(per_split))

    return (*summary, time_data[1])