# Long-term Cognitive Networks for pattern classification

Long-term Cognitive Networks (LTCNs) are trained with an inverse learning rule. In this model, the weights connecting the input neurons are the coefficients of multiple regression models, while the weights connecting the temporal states with the outputs are computed using a learning method (the Moore–Penrose inverse method when no regularization is needed, or the Ridge regression method when the model might overfit the data).

The syntax of the LTCN classifier is compatible with the scikit-learn library.

In [1]:
!pip install ltcn

import pandas as pd
pd.options.mode.chained_assignment = None  # default='warn'
import re
import numpy as np
import random
import os

from ltcn.LTCN import LTCN

from fitter import Fitter, get_common_distributions, get_distributions
from sklearn import preprocessing
from sklearn.preprocessing import scale
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import Normalizer
from sklearn.preprocessing import OneHotEncoder
from sklearn.decomposition import PCA
from sklearn.datasets import make_friedman3, make_s_curve
from sklearn.linear_model import BayesianRidge, ElasticNet
from sklearn.model_selection import KFold, train_test_split
from sklearn.model_selection import RepeatedKFold
from sklearn.model_selection import StratifiedKFold
from sklearn.naive_bayes import GaussianNB
from sklearn.tree import DecisionTreeRegressor
from sklearn.tree import DecisionTreeClassifier # Import Decision Tree Classifier
from sklearn.neural_network import MLPClassifier

!pip install imbalanced-learn
!pip install lightgbm
!pip install wittgenstein --user
!pip install explainerdashboard --user

from imblearn.over_sampling import ADASYN
from imblearn.combine import SMOTEENN
from imblearn.combine import SMOTETomek
from sklearn.model_selection import cross_val_predict
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import roc_auc_score
from sklearn.metrics import cohen_kappa_score
from sklearn.metrics import accuracy_score
from sklearn.metrics import plot_confusion_matrix
from sklearn.metrics import classification_report
from sklearn.metrics import make_scorer

# for Box-Cox Transformation
from scipy import stats

# plotting modules
import seaborn as sns
import matplotlib.pyplot as plt
from matplotlib.pyplot import figure
from plotly.subplots import make_subplots
import plotly.graph_objs as go
import plotly.express as px

from tqdm import tqdm
import math

from sklearn.datasets import make_blobs
from matplotlib import pyplot
from pandas import DataFrame

#Multilabel Stratified K Fold Creation
!pip install iterative-stratification
import sys
sys.path.append('../input/iterative-stratification/iterative-stratification-master')
from iterstrat.ml_stratifiers import MultilabelStratifiedKFold





In [2]:
# Load the encoded AI4I 2020 table: sensor readings collected from the manufacturing
# system plus the machine-failure flag and the per-mode flags (TWF, HDF, PWF, OSF, RNF).
# Presumably this is the class-balanced version of the dataset, going by the file name.
# NOTE(review): absolute path on a local G: drive, so this cell only runs on a machine
# where that drive is mounted.
# NOTE(review): the literal is a raw string, so each `\\` stays as TWO backslashes in the
# path that is handed to pandas — confirm that this is intended.
ai4i2020_encoded_balanced=pd.read_csv(r'G:\\.shortcut-targets-by-id\\1-wapAl6N5YrCs68c4NiFKyvybXTXmdgZ\\Ph_D_Tyrovolas\\Our Papers\\3rd_Paper-Proposal\\Testbed Codes\\AI4I_Case_Study\\raw_data\\ai4i2020_encoded_balanced.csv')
# Show the first rows as the cell output.
ai4i2020_encoded_balanced.head()

Unnamed: 0,Type,Air temperature [K],Process temperature [K],Rotational speed [rpm],Torque [Nm],Tool wear [min],Machine failure,TWF,HDF,PWF,OSF,RNF
0,1,298.1,308.6,1551,42.8,0,0,0,0,0,0,0
1,0,298.2,308.7,1408,46.3,3,0,0,0,0,0,0
2,0,298.1,308.5,1498,49.4,5,0,0,0,0,0,0
3,0,298.2,308.6,1433,39.5,7,0,0,0,0,0,0
4,0,298.2,308.7,1408,40.0,9,0,0,0,0,0,0


In [3]:
# Run in local
# Index of the pre-generated cross-validation fold to load.
fold = 1

# Root folder holding one sub-folder per fold. Defined once so that moving the
# data only requires editing this line.
# NOTE(review): absolute path on a local G: drive — not portable across machines.
kfold_root = 'G:\\.shortcut-targets-by-id\\1-wapAl6N5YrCs68c4NiFKyvybXTXmdgZ\\Ph_D_Tyrovolas\\Our Papers\\3rd_Paper-Proposal\\Testbed Codes\\AI4I_Case_Study\\k-fold cross validation datasets\\'
train_dir = kfold_root + str(fold) + "\\Training Dataset\\"
test_dir = kfold_root + str(fold) + "\\Test Dataset\\"
fold_suffix = "_iter_" + str(fold) + ".csv"

# Column names assigned to the scaled feature files.
# NOTE(review): because `names` is passed without `header`, pandas treats the first
# line of those files as data — confirm that the scaled CSVs have no header row.
scaled_feature_names = ['Type', 'Air temperature [K]', 'Process temperature [K]', 'Rotational speed [rpm]', 'Torque [Nm]', 'Tool wear [min]']

# Unscaled features, later used to look the test observations up in the full table.
original_X_train = pd.read_csv(train_dir + "X_train" + fold_suffix)
original_X_test = pd.read_csv(test_dir + "X_test" + fold_suffix)

# Scaled features and targets that the classifier is trained and evaluated on.
X_train = pd.read_csv(train_dir + "Scaled_X_train" + fold_suffix, names=scaled_feature_names)
y_train = pd.read_csv(train_dir + "y_train" + fold_suffix)
X_test = pd.read_csv(test_dir + "Scaled_X_test" + fold_suffix, names=scaled_feature_names)
y_test = pd.read_csv(test_dir + "y_test" + fold_suffix)

In [4]:
# Strip every character that is not a letter, digit or underscore from the column
# names of all loaded frames (e.g. 'Air temperature [K]' -> 'AirtemperatureK').
(
    ai4i2020_encoded_balanced,
    original_X_train,
    original_X_test,
    X_train,
    X_test,
    y_train,
    y_test,
) = (
    frame.rename(columns=lambda name: re.sub('[^A-Za-z0-9_]+', '', name))
    for frame in (
        ai4i2020_encoded_balanced,
        original_X_train,
        original_X_test,
        X_train,
        X_test,
        y_train,
        y_test,
    )
)

In [5]:
# Independent, empty accumulators for accuracy, AUC and Cohen's kappa.
acc_list, auc_list, kappa_list = [], [], []

In [6]:
def reset_random_seeds(seed=42):
    """Seed Python's and NumPy's global random generators.

    Parameters
    ----------
    seed : int, default 42
        Value passed to ``np.random.seed`` and ``random.seed`` and written,
        as a string, to the ``PYTHONHASHSEED`` environment variable.

    Notes
    -----
    ``PYTHONHASHSEED`` is read by the interpreter at start-up, so assigning it
    here cannot change hash randomisation of the already-running kernel; it is
    only inherited by processes launched afterwards.
    """
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    random.seed(seed)

def kappa_scorer(Y_pred, y_test_numpy):
    """Return Cohen's kappa between the reference labels and the rounded predictions.

    Parameters
    ----------
    Y_pred : object exposing ``round()``
        Predictions; they are rounded before being compared.
    y_test_numpy : array-like
        Reference labels, passed to ``cohen_kappa_score`` as its first argument.
    """
    rounded_predictions = Y_pred.round()
    return cohen_kappa_score(y_test_numpy, rounded_predictions)


In [7]:
# Target frame holding the class of every training observation.
labels = y_train
# Column count of the inputs plus the column count of the targets.
n_features = X_train.shape[1] + y_train.shape[1]
# Number of distinct values found in the training targets.
n_classes = np.unique(y_train).size

In [None]:
from sklearn.model_selection import ParameterGrid
from tqdm.notebook import trange, tqdm

# Plain NumPy versions of the current fold; the LTCN model is fitted and queried on arrays.
X_train_numpy = X_train.to_numpy()
y_train_numpy = y_train.to_numpy()
X_test_numpy = X_test.to_numpy()
y_test_numpy = y_test.to_numpy()

# Per-combination results. Every list receives exactly one entry per grid
# combination so that they can be assembled into one table after the loop.
acc_list = []
auc_list = []
kappa_list = []
twf_list = []
hdf_list = []
pwf_list = []
osf_list = []
av_succ = []
comb_list = []

# Feature columns (names as they are after the non-alphanumeric characters were
# stripped) and the key used to look true positives up in the full table.
FEATURE_COLUMNS = ['Type', 'AirtemperatureK', 'ProcesstemperatureK', 'Rotationalspeedrpm', 'TorqueNm', 'Toolwearmin']
JOIN_KEYS = FEATURE_COLUMNS + ['Machinefailure']

# failure mode -> (features expected to dominate the explanation,
#                  how many of the top distinct activation values count as a hit)
EXPECTED_DRIVERS = {
    'TWF': (('Toolwearmin',), 1),
    'HDF': (('AirtemperatureK', 'ProcesstemperatureK'), 2),
    'PWF': (('Rotationalspeedrpm', 'TorqueNm'), 2),
    'OSF': (('Toolwearmin', 'TorqueNm'), 2),
}


def _ltcn_final_activations(model, X):
    """Replay the LTCN recurrence for ``model.T`` steps and return the last activation matrix."""
    initial_state = X
    state = X
    for _ in range(model.T):
        state = model.phi * model.transform(np.matmul(state, model.W1)) + (1 - model.phi) * initial_state
    return state


def _top_two_values(activations):
    """Return the largest value and the largest value different from it.

    When every value equals the maximum, the maximum is returned twice.
    """
    values = list(activations)
    largest = max(values)
    below_everything = min(values) - 1
    runner_up = max(values, key=lambda v: below_everything if v == largest else v)
    return largest, runner_up


def _explanation_success(failure_modes, importance, true_positive_indexes):
    """Score how often the dominant activations match the recorded failure mode.

    Parameters
    ----------
    failure_modes : DataFrame
        True-positive observations with their TWF/HDF/PWF/OSF flags, indexed 0..n-1.
    importance : DataFrame
        Final LTCN activations, one row per test observation.
    true_positive_indexes : list of int
        Test-set row labels of the true positives; entry ``k`` is paired with
        row ``k`` of ``failure_modes``.

    Returns
    -------
    (dict, float)
        Success rate per failure mode (0 when the mode does not occur) and the
        mean rate over the modes that do occur (NaN when none occurs).
    """
    per_mode = {}
    total = 0
    scored_modes = 0
    for mode, (drivers, depth) in EXPECTED_DRIVERS.items():
        n_failures = int((failure_modes[mode] == 1).sum())
        n_correct = 0
        for k, row_label in enumerate(true_positive_indexes):
            if failure_modes.loc[k, mode] != 1:
                continue
            activations = importance.loc[row_label, :]
            accepted = _top_two_values(activations)[:depth]
            if any(activations[name] == value for name in drivers for value in accepted):
                n_correct += 1
        if n_failures != 0:
            per_mode[mode] = n_correct / n_failures
            total = total + per_mode[mode]
            scored_modes += 1
        else:
            per_mode[mode] = n_correct
    # No TWF/HDF/PWF/OSF among the true positives: the average is undefined, not an error.
    average = total / scored_modes if scored_modes else np.nan
    return per_mode, average


# hyper-parameter tuning using grid search happens here!!
param_grid = {'function': ['sigmoid', 'tanh'], 'phi': np.arange(0.5, 1.0, 0.1), 'T': [5, 10, 15], 'alpha': [0, 1.0E-2, 1.0E+2]}

grid = ParameterGrid(param_grid)

for comb in tqdm(range(len(grid)), desc='Combinations'):

    # Build the LTCN classifier for this combination.
    # NOTE(review): the previous code built LTCN(method='inverse') and then called
    # __init__(**params) on it, which re-initialises the estimator with only the grid
    # values. Assuming __init__ merely stores its arguments, a single constructor call
    # is equivalent. Pass method='inverse' here if the pseudo-inverse was intended.
    model = LTCN(**grid[comb])
    model.fit(X_train_numpy, y_train_numpy)

    # Predictions for the test set
    y_test_pred_ltcn = model.predict(X_test_numpy)
    y_test_pred_ltcn_final = y_test_pred_ltcn.round()

    # Evaluation metrics for the test dataset (all computed on the rounded predictions)
    accuracy_ltcn = accuracy_score(y_test_numpy, y_test_pred_ltcn_final)
    cohen_score_ltcn = cohen_kappa_score(y_test_numpy, y_test_pred_ltcn_final)
    auc_ltcn = roc_auc_score(y_test_numpy, y_test_pred_ltcn_final)

    acc_list.append(accuracy_ltcn)
    kappa_list.append(cohen_score_ltcn)
    auc_list.append(auc_ltcn)
    comb_list.append(comb)

    # LTCN reasoning process: the last activation of every neuron is used as the
    # local feature importance of each test observation.
    local_feature_importance_ltcn = pd.DataFrame(
        _ltcn_final_activations(model, X_test_numpy), columns=FEATURE_COLUMNS
    )

    # Test rows that are faulty and were predicted as faulty.
    true_positive_indexes = [
        k for k in range(len(y_test))
        if y_test.loc[k, 'Machinefailure'] == 1 and int(y_test_pred_ltcn_final[k]) == 1
    ]

    if not true_positive_indexes:
        # Nothing to explain for this combination; keep the result lists aligned.
        twf_list.append(np.nan)
        hdf_list.append(np.nan)
        pwf_list.append(np.nan)
        osf_list.append(np.nan)
        av_succ.append(np.nan)
        continue

    # Unscaled features plus target of the true positives, built in one step
    # (DataFrame.append no longer exists in pandas >= 2.0).
    true_positive_observations = pd.concat(
        [original_X_test.loc[true_positive_indexes, :], y_test.loc[true_positive_indexes, :]],
        axis=1,
    ).reset_index(drop=True)

    # Recover the failure-mode flags by matching on the feature values.
    # NOTE(review): if several rows of the full table share the same feature values the
    # join returns more rows than there are true positives, and row k no longer pairs
    # with true_positive_indexes[k] — verify that the key is unique.
    final_true_positive_observations_with_failure_modes = ai4i2020_encoded_balanced.join(
        true_positive_observations.set_index(JOIN_KEYS), JOIN_KEYS, how='right'
    ).reset_index(drop=True)

    success_by_mode, average_success = _explanation_success(
        final_true_positive_observations_with_failure_modes,
        local_feature_importance_ltcn,
        true_positive_indexes,
    )
    twf_list.append(success_by_mode['TWF'])
    hdf_list.append(success_by_mode['HDF'])
    pwf_list.append(success_by_mode['PWF'])
    osf_list.append(success_by_mode['OSF'])
    av_succ.append(average_success)


# One row per combination, best AUC first.
result_df = pd.DataFrame({'Accuracy': acc_list, 'AUC': auc_list, 'Kappa':kappa_list, 'TWF':twf_list, 'HDF':hdf_list, 'PWF':pwf_list, 'OSF':osf_list, 'Average Success':av_succ, 'Combinations':comb_list})
final_df = result_df.sort_values(by=['AUC'], ascending=False)
final_df = final_df.reset_index(drop=True)
best_estimator_params = grid[int(final_df.loc[0, "Combinations"])]
display(best_estimator_params)

Combinations:   0%|          | 0/90 [00:00<?, ?it/s]

  return linalg.solve(A, Xy, assume_a="pos", overwrite_a=True).T
  return linalg.solve(A, Xy, assume_a="pos", overwrite_a=True).T
  return linalg.solve(A, Xy, assume_a="pos", overwrite_a=True).T
  return linalg.solve(A, Xy, assume_a="pos", overwrite_a=True).T
  return linalg.solve(A, Xy, assume_a="pos", overwrite_a=True).T


In [None]:
# Show the grid-search results table, sorted by AUC in descending order.
final_df