various time series classification methods

In [2]:
from sktime.datasets import load_UCR_UEA_dataset
import tensorflow as tf
from tensorflow import keras
import pandas as pd
import numpy as np

In [8]:
def get_datasets(name):
    X_train, y_train = load_UCR_UEA_dataset(name, split = 'train', return_type = 'numpy2d')
    X_test, y_test = load_UCR_UEA_dataset(name, split = 'test', return_type = 'numpy2d')

    labels_idx = {label: idx for idx, label in enumerate(np.unique(np.append(y_train, y_test)))}

    y_train = np.array([labels_idx[y] for y in y_train])
    y_test = np.array([labels_idx[y] for y in y_test])

    return X_train, y_train, X_test, y_test

datasets = [get_datasets('ECG5000'), get_datasets('Car'), get_datasets('ECG5000')]


In [18]:

from sklearn.model_selection import train_test_split


REPEAT_SIZE = 2
BUFFER_SIZE = 10000
BATCH_SIZE = 32


class LoggerCallback(keras.callbacks.Callback):
        def on_epoch_end(self, epoch, logs=None):
            print('\rFinished epoch ' + str(epoch), end='')

def get_callbacks(validation=True):
    return [LoggerCallback(), keras.callbacks.EarlyStopping(monitor = 'val_loss' if validation else 'loss', patience = 10, restore_best_weights = True)]

In [12]:

def build_fully_connected(input_size, output_size):
    return keras.Sequential([
        keras.layers.InputLayer(input_shape = [input_size]),
        keras.layers.Dense(300, activation = 'relu'),
        keras.layers.Dropout(0.25),
        keras.layers.Dense(100, activation = 'relu'),
        keras.layers.Dropout(0.25),
        keras.layers.Dense(50, activation = 'relu'),
        keras.layers.Dropout(0.25),
        keras.layers.Dense(20, activation = 'relu'),
        keras.layers.Dropout(0.25),
        keras.layers.Dense(output_size, activation = 'softmax')
    ])

def build_rnn(input_size, output_size):
    return keras.Sequential([
        keras.layers.SimpleRNN(64, input_shape = [input_size, 1], return_sequences = True),
        keras.layers.SimpleRNN(64),
        keras.layers.Dense(32, activation = 'relu'),
        keras.layers.Dense(output_size, activation = 'softmax')
    ])


def build_cnn(input_size, output_size):
    return keras.Sequential([
        keras.layers.InputLayer(input_shape = [input_size, 1]),
        keras.layers.Conv1D(16, 3, activation = 'relu'),
        keras.layers.MaxPool1D(2),
        keras.layers.Conv1D(32, 3, activation = 'relu'),
        keras.layers.MaxPool1D(2),
        keras.layers.Conv1D(64, 3, activation = 'relu'),
        keras.layers.Flatten(),
        keras.layers.Dense(128, activation = 'relu'),
        keras.layers.Dense(output_size, activation = 'softmax')
    ])

def build_cnn_gru(input_size, output_size):
    return keras.Sequential([
        keras.layers.InputLayer(input_shape = [input_size, 1]),
        keras.layers.Conv1D(64, 3, activation = 'relu'),
        keras.layers.MaxPool1D(2),
        keras.layers.Conv1D(64, 3, activation = 'relu'),
        keras.layers.MaxPool1D(2),
        keras.layers.GRU(20),
        keras.layers.Dense(256, activation = 'relu'),
        keras.layers.Dropout(0.25),
        keras.layers.Dense(32, activation = 'relu'),
        keras.layers.Dense(output_size, activation = 'softmax')
    ])

def evaluate(X_train, y_train, X_test, y_test):
    input_size, output_size = X_train.shape[1], np.unique(y_train).shape[0]

    X_train, X_valid, y_train, y_valid = train_test_split(X_train, y_train, test_size = 0.2, stratify = y_train, shuffle = True)
    X_train = tf.data.Dataset.from_tensor_slices((X_train, y_train)).repeat(REPEAT_SIZE).shuffle(BUFFER_SIZE).batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)
    X_valid = tf.data.Dataset.from_tensor_slices((X_valid, y_valid)).repeat(REPEAT_SIZE).shuffle(BUFFER_SIZE).batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

    models = [
        ('Fully connected', build_fully_connected),
        ('RNN', build_rnn),
        ('CNN', build_cnn),
        ('CNN GRU', build_cnn_gru)
    ]

    scores = {}

    for name, build_model in models:
        model = build_model(input_size, output_size)
        model.compile(optimizer = 'sgd', loss = 'sparse_categorical_crossentropy', metrics = ['accuracy'])
        model.fit(X_train, validation_data = X_valid, epochs = 100, verbose = 0, callbacks = get_callbacks())
        score = model.evaluate(X_test, y_test)[1]
        
        scores[name] = score
    
    return scores

scores = []

for dataset in datasets:
    scores.append(evaluate(dataset[0], dataset[1], dataset[2], dataset[3]))

for model in scores[0].keys():
    model_scores = [s[model] for s in scores]
    print(model + ' average score ' + str(np.average(model_scores)))

Fully connected average score 0.8573333223660787
RNN average score 0.6925185223420461
CNN average score 0.8364444375038147
CNN GRU average score 0.6971851785977682


In [34]:
from sktime.classification.deep_learning.cnn import CNNClassifier

def build_cnn_classifier(input_size, output_size):
    return CNNClassifier(callbacks = get_callbacks(validation = False), n_epochs = 100)

def build_mlp(input_size, output_size):
    return keras.models.Sequential([
        keras.layers.InputLayer(input_shape = input_size),
        keras.layers.Dropout(0.1),
        keras.layers.Dense(500, activation = 'relu'),
        keras.layers.Dropout(0.2),
        keras.layers.Dense(500, activation = 'relu'),
        keras.layers.Dropout(0.2),
        keras.layers.Dense(500, activation = 'relu'),
        keras.layers.Dropout(0.3),
        keras.layers.Dense(output_size, activation = 'softmax')
    ])

def build_fcn(input_size, output_size):
    return keras.models.Sequential([
        keras.layers.InputLayer(input_shape = [input_size, 1]),
        keras.layers.Conv1D(128, 8),
        keras.layers.BatchNormalization(),
        keras.layers.ReLU(),
        keras.layers.Conv1D(256, 5),
        keras.layers.BatchNormalization(),
        keras.layers.ReLU(),
        keras.layers.Conv1D(128, 3),
        keras.layers.BatchNormalization(),
        keras.layers.ReLU(),
        keras.layers.GlobalAveragePooling1D(),
        keras.layers.Dense(output_size, activation = 'softmax')
    ])

def evaluate2(X_train, y_train, X_test, y_test):
    input_size, output_size = X_train.shape[1], np.unique(y_train).shape[0]

    X_train, X_valid, y_train, y_valid = train_test_split(X_train, y_train, test_size = 0.2, stratify = y_train, shuffle = True)

    models = [
        ('CNN Classifier', build_cnn_classifier),
        ('MLP', build_mlp),
        ('FCN', build_fcn)
    ]

    scores = {}

    converted_to_tf = False

    for name, build_model in models:

        model = build_model(input_size, output_size)

        if not converted_to_tf and (name == 'MLP' or name == 'FCN'):
            X_train = tf.data.Dataset.from_tensor_slices((X_train, y_train)).repeat(REPEAT_SIZE).shuffle(BUFFER_SIZE).batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)
            X_valid = tf.data.Dataset.from_tensor_slices((X_valid, y_valid)).repeat(REPEAT_SIZE).shuffle(BUFFER_SIZE).batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)
            converted_to_tf = True
        
        if name == 'CNN Classifier':
            model.fit(X_train, y_train)
            
            score = model.score(X_test, y_test)
        elif name == 'MLP':
            model.compile(optimizer = keras.optimizers.Adadelta(learning_rate = 0.1, epsilon = 1e-8), loss = 'sparse_categorical_crossentropy', metrics = ['accuracy'])
            model.fit(X_train, validation_data = X_valid, epochs = 100, verbose = 0, callbacks = get_callbacks())
            score = model.evaluate(X_test, y_test)[1]
        elif name == 'FCN':
            model.compile(optimizer = keras.optimizers.Adam(epsilon = 1e-8), loss = 'sparse_categorical_crossentropy', metrics = ['accuracy'])
            model.fit(X_train, validation_data = X_valid, epochs = 100, verbose = 0, callbacks = get_callbacks())
            score = model.evaluate(X_test, y_test)[1]
        
        scores[name] = score

    return scores

scores = []

for dataset in datasets:
    scores.append(evaluate2(dataset[0], dataset[1], dataset[2], dataset[3]))

for model in scores[0].keys():
    model_scores = [s[model] for s in scores]
    print(model + ' scores ' + str(model_scores))

















CNN Classifier scores [0.9266666666666666, 0.6833333333333333, 0.9362222222222222]
MLP scores [0.9340000152587891, 0.699999988079071, 0.9404444694519043]
FCN scores [0.3535555601119995, 0.21666666865348816, 0.35422220826148987]



- Bi-Direction LSTM and CNN networks separately


In [35]:
from sktime.transformations.series.detrend import Detrender
from sktime.transformations.series.exponent import ExponentTransformer

def build_lstm_cnn(input_size, output_size):
    inputs = keras.layers.Input(shape = [input_size, 1])
    lstm = keras.layers.Bidirectional(keras.layers.LSTM(30, return_sequences = True))(inputs)
    cnn = keras.layers.Conv1D(60, 5, activation = 'relu', padding = 'same')(inputs)
    concat = keras.layers.Concatenate()([lstm, cnn])
    global_avg = keras.layers.GlobalAveragePooling1D()(concat)
    dense = keras.layers.Dense(20, activation='relu')(global_avg)
    softmax = keras.layers.Dense(output_size, activation = 'softmax')(dense)
    model = keras.models.Model(inputs = inputs, outputs = softmax)


    return model

def evaluate3(X_train, y_train, X_test, y_test):
    input_size, output_size = X_train.shape[1], np.unique(y_train).shape[0]

    X_train, X_valid, y_train, y_valid = train_test_split(X_train, y_train, test_size = 0.2, stratify = y_train, shuffle = True)

    models = [
        ('FCN', build_fcn),
        ('LSTM CNN', build_lstm_cnn)
    ]

    scores = {}

    for name, build_model in models:
        model = build_model(input_size, output_size)
        model.compile(optimizer = 'sgd', loss = 'sparse_categorical_crossentropy', metrics = ['accuracy'])

        if name == 'FCN':
            trans_1 = ExponentTransformer(power=2)
            trans_2 = Detrender()
            pipe = trans_1*trans_2
            X_train = pipe.fit_transform(X_train)
            X_valid = pipe.transform(X_valid) 
            X_test = pipe.transform(X_test)

        X_train_tf = tf.data.Dataset.from_tensor_slices((X_train, y_train)).repeat(REPEAT_SIZE).shuffle(BUFFER_SIZE).batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)
        X_valid_tf = tf.data.Dataset.from_tensor_slices((X_valid, y_valid)).repeat(REPEAT_SIZE).shuffle(BUFFER_SIZE).batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

        model.fit(X_train_tf, validation_data = X_valid_tf, epochs = 100, verbose = 0, callbacks = get_callbacks())
        score = model.evaluate(X_test, y_test)[1]
        scores[name] = score

    return scores


scores = []

for dataset in datasets:
    scores.append(evaluate3(dataset[0], dataset[1], dataset[2], dataset[3]))

for model in scores[0].keys():
    model_scores = [s[model] for s in scores]
    print(model + ' scores ' + str(model_scores))

FCN scores [0.35333332419395447, 0.20000000298023224, 0.35333332419395447]
LSTM CNN scores [0.5837777853012085, 0.25, 0.5837777853012085]


Time series classification using sktime

In [37]:
from sktime.transformations.panel.catch22 import Catch22
from sklearn.model_selection import RandomizedSearchCV
from sklearn.tree import DecisionTreeClassifier
from sktime.classification.compose import SklearnClassifierPipeline


X_train, y_train, X_test, y_test = datasets[0]#ECG5000

pipe = SklearnClassifierPipeline(DecisionTreeClassifier(), [('exponent', ExponentTransformer()), ('catch22', Catch22())])
param_grid = {'classifier__min_samples_leaf': [1, 3, 5, 10], 'expoonent__power': [1, 2, 3], 'catch22__outlier_norm': [True, False]}

pipe.fit(X_train, y_train)
print('Baseline score ' + str(pipe.score(X_test, y_test)))

grid = RandomizedSearchCV(pipe, param_distributions=param_grid, n_iter=3)
grid.fit(X_train, y_train)
print('Best model ' + str(grid.best_estimator_))
print('Best score ' + str(grid.score(X_test, y_test)))

Baseline score 0.9051111111111111




Best model SklearnClassifierPipeline(classifier=DecisionTreeClassifier(min_samples_leaf=10),
                          transformers=[('exponent', ExponentTransformer()),
                                        ('catch22',
                                         Catch22(outlier_norm=True))])
Best score 0.9046666666666666


Multivariate time series classification

In [46]:
from sktime.classification.kernel_based import RocketClassifier

def get_datasets2(name):
    X_train, y_train = load_UCR_UEA_dataset(name, split = 'train', return_type = 'numpy3d')
    X_test, y_test = load_UCR_UEA_dataset(name, split = 'test', return_type = 'numpy3d')

    labels_idx = {label: idx for idx, label in enumerate(np.unique(np.append(y_train, y_test)))}

    y_train = np.array([labels_idx[y] for y in y_train])
    y_test = np.array([labels_idx[y] for y in y_test])

    return X_train, y_train, X_test, y_test

datasets2 = [get_datasets2('Epilepsy'), get_datasets2('EthanolConcentration')]

def build_minirocket(input_shape, output_size):
    return RocketClassifier(rocket_transform = 'minirocket')

def build_rocket(input_shape, output_size):
    return RocketClassifier()

def build_multivariate_mlp(input_shape, output_size):
    return keras.models.Sequential([
        keras.layers.InputLayer(input_shape = input_shape),
        keras.layers.Dropout(0.1),
        keras.layers.Dense(500, activation = 'relu'),
        keras.layers.Dropout(0.2),
        keras.layers.Dense(500, activation = 'relu'),
        keras.layers.Dropout(0.2),
        keras.layers.Dense(500, activation = 'relu'),
        keras.layers.Dropout(0.3),
        keras.layers.Flatten(),
        keras.layers.Dense(output_size, activation = 'softmax')
    ])

def evaluate4(X_train, y_train, X_test, y_test):
    input_shape, output_size = (X_train.shape[2], X_train.shape[1]), np.unique(y_train).shape[0]
    X_train, X_valid, y_train, y_valid = train_test_split(X_train, y_train, test_size = 0.2, stratify = y_train, shuffle = True)

    models = [
        ('MLP', build_multivariate_mlp),
        ('Mini Rocket', build_minirocket),
        ('Rocket', build_rocket)
    ]

    scores = {}

    for name, build_model in models:
        model = build_model(input_shape, output_size)

        if name == 'MLP':
            X_train_tf = tf.data.Dataset.from_tensor_slices((X_train.transpose((0, 2, 1)), y_train)).repeat(REPEAT_SIZE).shuffle(BUFFER_SIZE).batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)
            X_valid_tf = tf.data.Dataset.from_tensor_slices((X_valid.transpose((0, 2, 1)), y_valid)).repeat(REPEAT_SIZE).shuffle(BUFFER_SIZE).batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

            model.compile(optimizer = keras.optimizers.Adadelta(learning_rate = 0.1, epsilon = 1e-8), loss = 'sparse_categorical_crossentropy', metrics = ['accuracy'])
            model.fit(X_train_tf, validation_data = X_valid_tf, epochs = 100, verbose = 0, callbacks = get_callbacks())
            score = model.evaluate(X_test.transpose((0, 2, 1)), y_test)[1]
        else:
            model.fit(X_train, y_train)
            score = model.score(X_test, y_test)

        scores[name] = score

    return scores


scores = []

for dataset in datasets2:
    scores.append(evaluate4(dataset[0], dataset[1], dataset[2], dataset[3]))

for model in scores[0].keys():
    model_scores = [s[model] for s in scores]
    print(model + ' scores ' + str(model_scores))




MLP scores [0.7971014380455017, 0.2737642526626587]
Mini Rocket scores [1.0, 0.4600760456273764]
Rocket scores [0.9782608695652174, 0.4220532319391635]
