# Submission information
This is a solution for Home Assignment 05 - Implementation of the ROCKET classification algorithm as can be seen in the article ["ROCKET: Exceptionally fast and accurate time series classication using random convolutional kernels" by Angus Dempster, Francois Petitjean, and Georey I. Webb](https://arxiv.org/pdf/1910.13051.pdf).

Submitted by the following team members:
- Adi Yablonka
- Dana Averbuch
- Nati Ghatan


In [1]:
# Import all necessary libraries
import math
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import math

from sklearn.utils import shuffle
from sklearn.linear_model import RidgeClassifier
from tqdm import tqdm
from google.colab import drive

In [2]:
# Define ROCKET model
class RocketNet(nn.Module):
    def __init__(self, data, class_labels, n_kernels, kernel_sizes):
        super(RocketNet, self).__init__()
        self.raw_data = data
        self.data = torch.max(data, torch.zeros_like(data))  # See section 3.2 in paper
        self.class_labels = class_labels
        self.n_samples = self.data.shape[0]
        self.signal_length = self.data.shape[1]
        self.n_kernels = n_kernels
        self.kernel_sizes = kernel_sizes
        self.__generate_random_kernels(self.n_kernels)
        self.classifier = None

    def __generate_random_kernels(self, n_kernels: int, groups=1, stride=1):
        # Initialize variables
        self.kernels = []
        self.bias_terms = []

        for kernel_index in range(n_kernels):
            # Stride: int = 1
            # Size: int
            current_kernel_size = np.random.choice(self.kernel_sizes)

            # Dilation: int
            maximum_allowed_dilation = math.log2((self.signal_length - 1) / (current_kernel_size - 1))
            current_dilation = np.random.randint(low=0, high=maximum_allowed_dilation)
            dilation_factor = math.floor(math.pow(2, current_dilation))

            # Padding: int
            current_padding_size = 0 if np.random.rand() <= 0.5 else \
                int(((current_kernel_size - 1) * dilation_factor) / 2)

            # Bias: bool
            current_bias = np.random.uniform(low=-1., high=1.)

            # Create kernel with selected randomized parameters
            current_kernel = nn.Conv1d(in_channels=1,
                                       out_channels=1,
                                       kernel_size=current_kernel_size,
                                       stride=stride,
                                       groups=groups,
                                       dilation=dilation_factor,
                                       padding=current_padding_size,
                                       bias=False)

            # Initialize kernel weights
            torch.nn.init.normal_(current_kernel.weight, mean=0.0, std=1.0)  # Draw from a Normal distribution
            current_kernel.weight.data = current_kernel.weight.data - current_kernel.weight.data.mean()  # Mean center

            # Accumulate randomized kernel
            self.kernels.append(current_kernel)
            self.bias_terms.append(current_bias)

    def compute_features_for_data(self, data):
        sample_features = []
        n_samples, signal_length = data.shape
        for sample_index in tqdm(range(n_samples)):
            # Reshape signal to meet model requirements, and feed it through the network
            current_signal = data[sample_index, :].view(1, signal_length)
            signal_features = self.forward(signal=current_signal)

            max_value = [x.max().item() for x in signal_features]
            ppv = [x[x > 0].shape[-1] / x.shape[-1] for x in signal_features]
            sample_features.append(max_value + ppv)

        return sample_features

    def train(self, alpha=1.0, tolerance=1e-3):
        # Transform data through convolution kernels
        features = self.compute_features_for_data(self.data)

        # Perform learning (Ridge regression)
        self.classifier = RidgeClassifier(alpha=alpha, tol=tolerance, normalize=False)
        self.classifier.fit(X=features, y=self.class_labels)
        print('Training score', self.classifier.score(X=features, y=self.class_labels))

    def predict(self, data, labels=None):
        features = self.compute_features_for_data(data)
        print('Test score', self.classifier.score(X=features, y=labels))

    def forward(self, signal):
        conv_output = []
        n_samples, signal_length = signal.shape
        for kernel, bias in zip(self.kernels, self.bias_terms):
            reshaped_signal = signal.view(n_samples, 1, signal_length)
            conv_output.append(kernel(reshaped_signal) + bias)
        return conv_output


In [13]:
## Define running parameters
# Convolution
number_of_kernels = 500
permitted_kernel_sizes = [7, 9, 11]  # Taken from the ROCKET article

# Classification
alpha = 1.0  # Regularization strength
tolerance = 1e-3  # Precision of the solution

In [14]:
## Load datasets
drive.mount('/content/drive/')
path_to_train = "/content/drive/MyDrive/Colab Notebooks/ElectricDevices_TRAIN.tsv"
path_to_test = "/content/drive/MyDrive/Colab Notebooks/ElectricDevices_TEST.tsv"

def load_data_from_file(path_to_file, do_shuffle=False):
  data = pd.read_csv(path_to_file, header=None, sep='\t')
  if do_shuffle:
    data = shuffle(data)
  data = torch.tensor(data.values.astype(np.float32))
  
  # Separate between class labels (first column) and actual data (rest of the columns)
  class_labels = data[:, 0]
  data = data[:, 1:]

  return data, class_labels

# Training data
train_data, train_class_labels = load_data_from_file(path_to_file=path_to_train,
                                 do_shuffle=True)


# Acquire data signal length
signal_length = train_data.shape[1]

# Test data
test_data, test_class_labels = load_data_from_file(path_to_file=path_to_test)

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


In [15]:
# Define ROCKET network model
net = RocketNet(data=train_data,
                class_labels=train_class_labels,
                n_kernels=number_of_kernels,
                kernel_sizes=permitted_kernel_sizes)

In [16]:
## Verify that network is functioning properly
def __compute_expected_output_size(signal_length, kernel_size, padding, dilation, stride):
    # Source: https://arxiv.org/pdf/1603.07285.pdf , Page 28
    nominator = (signal_length + (2 * padding) - kernel_size - ((kernel_size - 1) * (dilation - 1)))
    return math.floor(nominator / stride) + 1

# Push training data through the network to get convolution results
results = net.forward(signal=train_data)

# Validate output sizes
for kernel_index in range(net.n_kernels):
    current_kernel = net.kernels[kernel_index]
    kernel_size = current_kernel.kernel_size[0]
    observed_output_size = results[kernel_index].shape[-1]
    padding_size = current_kernel.padding[0]
    stride_size = current_kernel.stride[0]
    dilation_size = current_kernel.dilation[0]
    expected_output_size = __compute_expected_output_size(signal_length=signal_length,
                                                          kernel_size=kernel_size,
                                                          padding=padding_size,
                                                          dilation=dilation_size,
                                                          stride=stride_size)
    assert expected_output_size == observed_output_size
print("Result sizes validated successfully!")

Result sizes validated successfully!


In [17]:
## Train the network
net.train(alpha=alpha, tolerance=tolerance)

100%|██████████| 8926/8926 [07:21<00:00, 20.23it/s]


Training score 0.8909926058704907


In [18]:
## Test the network
net.predict(data=test_data, labels=test_class_labels)

100%|██████████| 7711/7711 [06:22<00:00, 20.13it/s]


Test score 0.6350667877058748
