<a href="https://colab.research.google.com/github/kr7/DynamicROCKET/blob/main/DynamicROCKET.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**DynamicROCKET**

This notebook contains the implementation related to the following study:

K. Buza, M. Antal (2024): *ROCKET with dynamic convolution for time series classification*

Please note that this implementation is **not optimized for run time**, but to illustrate the training and test process of ROCKET and DynamicROCKET.

In [None]:
import numpy as np
from numpy import random as rnd
from scipy.stats import ttest_rel
from sklearn.model_selection import StratifiedKFold
from sklearn.linear_model import RidgeClassifier

In [None]:
!wget http://www.timeseriesclassification.com/aeon-toolkit/Archives/Univariate2018_arff.zip

--2024-01-30 05:08:35--  http://www.timeseriesclassification.com/aeon-toolkit/Archives/Univariate2018_arff.zip
Resolving www.timeseriesclassification.com (www.timeseriesclassification.com)... 109.123.71.232
Connecting to www.timeseriesclassification.com (www.timeseriesclassification.com)|109.123.71.232|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 548832105 (523M) [application/zip]
Saving to: ‘Univariate2018_arff.zip’


2024-01-30 05:09:13 (14.2 MB/s) - ‘Univariate2018_arff.zip’ saved [548832105/548832105]



In [None]:
!unzip Univariate2018_arff.zip

Archive:  Univariate2018_arff.zip
   creating: Univariate_arff/
   creating: Univariate_arff/ACSF1/
  inflating: Univariate_arff/ACSF1/ACSF1.txt  
  inflating: Univariate_arff/ACSF1/ACSF1_TEST.arff  
  inflating: Univariate_arff/ACSF1/ACSF1_TEST.txt  
  inflating: Univariate_arff/ACSF1/ACSF1_TRAIN.arff  
  inflating: Univariate_arff/ACSF1/ACSF1_TRAIN.txt  
  inflating: Univariate_arff/ACSF1/README.md  
   creating: Univariate_arff/Adiac/
  inflating: Univariate_arff/Adiac/Adiac.txt  
  inflating: Univariate_arff/Adiac/Adiac_TEST.arff  
  inflating: Univariate_arff/Adiac/Adiac_TEST.txt  
  inflating: Univariate_arff/Adiac/Adiac_TRAIN.arff  
  inflating: Univariate_arff/Adiac/Adiac_TRAIN.txt  
   creating: Univariate_arff/AllGestureWiimoteX/
  inflating: Univariate_arff/AllGestureWiimoteX/AllGestureWiimoteX.txt  
  inflating: Univariate_arff/AllGestureWiimoteX/AllGestureWiimoteX_TEST.arff  
  inflating: Univariate_arff/AllGestureWiimoteX/AllGestureWiimoteX_TEST.txt  
  inflating: Univari

In [None]:
# In order to perform 10-fold cross-validation, we
# will merge the provided train and test splits and
# we will split the data during the cross-validation

def load_data(file_name_prefix):
  train_data_with_class_labels = np.genfromtxt(file_name_prefix+'_TRAIN.txt')
  test_data_with_class_labels = np.genfromtxt(file_name_prefix+'_TEST.txt')

  data_with_class_labels = np.vstack( (train_data_with_class_labels,
                                      test_data_with_class_labels))
  data_without_class_labels = data_with_class_labels[:,1:]

  return data_without_class_labels, data_with_class_labels[:,0]

In [None]:
%load_ext cython

In [None]:
%%cython

cimport cython
from libc.stdlib cimport malloc, free


def dtw(ts1p, ts2p):
    cdef int LEN_TS1
    cdef int LEN_TS2
    cdef int i
    cdef int j
    cdef float * ts1
    cdef float * ts2
    cdef float * dtw_matrix
    cdef float d

    LEN_TS1 = len(ts1p)
    LEN_TS2 = len(ts2p)

    ts1 = <float *> malloc(LEN_TS1*cython.sizeof(float))
    ts2 = <float *> malloc(LEN_TS2*cython.sizeof(float))
    dtw_matrix = <float *> malloc(LEN_TS1*LEN_TS2*cython.sizeof(float))
      # this is a flattened DTW matrix dtw_matrix[i,j] -> dtw_matrix[i*LEN_TS2 + j]
    if ts1 is NULL or ts2 is NULL:
      raise MemoryError()
    for i in xrange(LEN_TS1):
      ts1[i] = ts1p[i]
    for i in xrange(LEN_TS2):
      ts2[i] = ts2p[i]

    dtw_matrix[0] = abs(ts1[0]-ts2[0])

    for i in range(1, LEN_TS1):
      dtw_matrix[i*LEN_TS2] = dtw_matrix[(i-1)*LEN_TS2]+abs(ts1[i]-ts2[0])

    for j in range(1, LEN_TS2):
      dtw_matrix[j] = dtw_matrix[j-1]+abs(ts1[0]-ts2[j])

    for i in range(1, LEN_TS1):
      for j in range(1, LEN_TS2):
        dtw_matrix[i*LEN_TS2+j] = min(dtw_matrix[(i-1)*LEN_TS2+j-1],
                                      dtw_matrix[(i-1)*LEN_TS2+j],
                                      dtw_matrix[i*LEN_TS2+j-1]) + abs(ts1[i]-ts2[j])

    d=dtw_matrix[ LEN_TS1*LEN_TS2-1 ]

    free(ts1)
    free(ts2)
    free(dtw_matrix)

    return d

In [None]:
def log2(x):
  return np.log(x)/np.log(2)


def get_random_convolution_params(seed, l_input):
  rnd.seed(seed)
  length = rnd.randint(0,2)*2+7
  weights = rnd.normal(0,1,length)
  bias = float(rnd.uniform(-1, 1))
  max_exp = np.floor(log2((l_input-1)/(length-1)))-1
  dilation = int( 2**(rnd.uniform(0,max_exp)) )
  padding = rnd.randint(0,1)
  return (length, weights, bias, dilation, padding)


def apply_convolution( time_series, convolution ):
  length, weights, bias, dilation, padding = convolution
  window_size = length*dilation
  time_series = list(time_series)

  if padding == 1:
    zeros = [0]*(dilation*(length-1)/2)
    time_series = zeros + time_series + zeros

  segments = np.array(
      [ time_series[i:i+window_size:dilation]
        for i in range(0, int(len(time_series)-window_size)) ] )

  weights_mat = weights.reshape(length,1)

  conv = ((segments@weights_mat) + bias)
  conv = conv.reshape(len(segments))

  dtw_conv = np.array( [dtw(s, weights) for s in segments] )

  return conv, dtw_conv


def ppv(series):
  return float(np.sum(series > 0) / np.size(series))


def get_rocket_features(time_series_dataset, convolutional_filters):
  dataset_features_max = [] # features with conventional convolution and max pooling
  dataset_features_ppv = [] # features with conventional convolution and PPV pooling
  dataset_features_dtw = [] # features with dynamic convolution and max pooling
  for ts in time_series_dataset:
    ts_features_max = []
    ts_features_ppv = []
    ts_features_dtw = []
    for c in convolutional_filters:
      convolved_ts, dtw_convolved_ts = apply_convolution(ts, c)
      ts_features_max.append( float(max(convolved_ts)) )
      ts_features_ppv.append( ppv(convolved_ts) )
      ts_features_dtw.append( float(max(dtw_convolved_ts)) )
    dataset_features_max.append( ts_features_max )
    dataset_features_ppv.append( ts_features_ppv )
    dataset_features_dtw.append( ts_features_dtw )
  return np.array(dataset_features_max), np.array(dataset_features_ppv), np.array(dataset_features_dtw)

In [None]:
def experiment(dataset):
  X,y = load_data( f"Univariate_arff/{dataset}/{dataset}")
  kf = StratifiedKFold(n_splits=10, random_state=42, shuffle=True)

  all_acc_rocket = []
  all_acc_dynamicRocket = []

  fold = 0
  print(f"FOLD  ROCKET  DynamicROCKET")
  for train_index, test_index in kf.split(X, y):
    fold = fold + 1

    # Convolutional filters are the same for ROCKET and DynamicROCKET
    convolutional_filters = []
    for i in range(10000):
      convolutional_filters.append(get_random_convolution_params(i, len(X[0])))

    # training data features
    features_train_max, features_train_ppv, features_train_dtw = get_rocket_features(X[train_index], convolutional_filters)

    # training labels
    y_train = y[train_index]

    # train ROCKET
    rocket = RidgeClassifier()
    rocket.fit( np.hstack((features_train_max, features_train_ppv)), y_train)

    # train DynamicROCKET
    dynamic_rocket = RidgeClassifier()
    dynamic_rocket.fit( np.hstack((features_train_dtw, features_train_max, features_train_ppv)), y_train)


    # test data features
    features_test_max, features_test_ppv, features_test_dtw = get_rocket_features(X[test_index], convolutional_filters)

    # test labels
    y_test  = y[test_index]

    # test ROCKET
    pred = rocket.predict( np.hstack((features_test_max, features_test_ppv)) )
    acc = np.mean(pred == y_test)
    all_acc_rocket.append(acc)

    # test DynamicROCKET
    pred = dynamic_rocket.predict( np.hstack((features_test_dtw, features_test_max, features_test_ppv)) )
    acc_d = np.mean(pred == y_test)
    all_acc_dynamicRocket.append(acc_d)

    print(f"{fold:4}  {acc:6.4f}  {acc_d:6.4f} ")

  significant = "No"
  if ttest_rel(all_acc_rocket, all_acc_dynamicRocket)[1] < 0.05:
    significant = "Yes"
  print(f"ROCKET mean accuracy:         {np.mean(all_acc_rocket):6.4f} ")
  print(f"       standard deviation:    {np.std(all_acc_rocket):6.4f}")
  print(f"DynamicROCKET mean accuracy:  {np.mean(all_acc_dynamicRocket):6.4f}")
  print(f"       standard deviation:    {np.std(all_acc_dynamicRocket):6.4f}")
  print(f"Significant:                  {significant}")

In [None]:
experiment("ECG200") # Change the name of the dataset here for experiments on other datasets

FOLD  ROCKET  DynamicROCKET
   1  0.8500  0.8500 
   2  0.9000  0.8500 
   3  0.8500  0.8000 
   4  0.9500  0.9000 
   5  0.7000  0.8000 
   6  0.9500  0.9000 
   7  1.0000  0.9500 
   8  0.8500  0.9000 
   9  0.9500  0.9500 
  10  0.8000  0.9000 
ROCKET mean accuracy:         0.8800 
       standard deviation:    0.0843
DynamicROCKET mean accuracy:  0.8800
       standard deviation:    0.0510
Significant:                  No
