#### Template
*Machine Learning and Neural Networks on “Deep Learning of a public dataset”*

Type annotations are used for return types, e.g. `def sample() -> int`.

Here the `sample` function returns an int.

It is common to run unit tests as a single Python file. However, for ease of submission and marking, a standalone notebook was created to test the SVM_Tools class.

The SVM_Tools class contains seven methods, but only four are included in this notebook, for preprocessing and cache testing:

 1. get_psd(self, frequency=256, segment_length=256) -> numpy array of power spectral density (PSD) features. The CHB-MIT dataset cannot be processed in a single pass: the resulting numpy array is too large to fit into memory.
 2. get_psd_batch(self, batch_size=1000) -> numpy array of power spectral density features. Runs get_psd in batches.
 3. get_wavelet(self, scales=np.arange(1,30), wavelet='cmor',batch_size=50) -> returns np.ndarray of shape (n_samples, n_features)
 4. A cache decorator (cache_decorator). See the section on caching, three cells below.

Learning points during implementation are:
 - A decorator does not pass self into the nested function. Marking cache_decorator with @staticmethod was tried, but this prevents the use of self.cache because self is not passed.
 - Instead, the decorator is applied manually. cache_decorator is called with "cache" as its argument. The nested decorator it returns is then called with test_function as its argument, which returns the wrapper. The wrapper is assigned back to self.test_function.


In [1]:
# Array, file access, signal procesing libraries
import numpy as np
import pandas as pd
import os
import importlib.util as util
from matplotlib import pyplot as plt
from scipy.signal import  welch, butter, filtfilt  # estimate signal power across all frequencies
import pywt # wavelet transformation algorithm. Python package to install is called PyWavelets
import unittest  # in-built unit testing module



In [23]:

class SVM_Tools:
    """Feature-extraction helpers (PSD and wavelet power) with a small result cache.

    Attributes and methods not needed for the preprocessing and cache tests
    have been removed from this class.

    A decorator written with ``@`` inside the class body runs when the class is
    defined, when there is no ``self`` yet, so it cannot reach ``self.cache``
    (and a ``@staticmethod`` decorator never receives ``self`` at all). The
    cache decorator is therefore applied by hand in ``__init__``:
    ``cache_decorator("cache")`` returns the actual decorator, that decorator
    wraps the bound ``test_function``, and the wrapper is assigned back to
    ``self.test_function``.
    """

    def __init__(self, data, labels):
        """Store data and labels and install the caching wrapper.

        Args:
            data: Signal data. ``get_psd`` iterates over its first axis,
                ``get_psd_batch`` slices its second axis and ``get_wavelet``
                unpacks its shape as (n_samples, n_channels, n_datapoints).
            labels: Labels belonging to ``data``; stored only, no method in
                this class reads them.
        """
        self.full_data = data
        self.full_labels = labels
        self.cache = {}
        # Manual decoration: replaces the bound method on this instance only.
        self.test_function = self.cache_decorator("cache")(self.test_function)

    def cache_decorator(self, name):
        """Build a decorator that stores a function's result in ``self.cache``.

        Args:
            name: Key under which the result is stored in ``self.cache``.
                Every call of the wrapped function overwrites this entry.

        Returns:
            A decorator. The wrapper it produces calls the original function,
            converts the result to ``np.ndarray`` when it is not one already,
            stores it under ``name`` and returns it.
        """
        from functools import wraps  # local import keeps the class self-contained

        def inside_decorator(func):
            @wraps(func)  # keep the wrapped function's name and docstring
            def wrapper(*args, **kwargs):
                array_result = func(*args, **kwargs)
                try:
                    if not isinstance(array_result, np.ndarray):
                        array_result = np.array(array_result)
                except TypeError:
                    # Best effort: report the problem and cache the raw result.
                    print("Wrong data type")
                self.cache[name] = array_result
                return array_result
            return wrapper
        return inside_decorator

    def test_function(self, array):
        """Return ``array`` as a numpy array; exists to exercise the cache decorator."""
        return np.array(array)

    def get_psd(self, frequency=256, segment_length=256, data=None) -> np.ndarray:
        """Estimate the power spectral density (Welch's method) of each signal.

        Processing a very large dataset in one call can exhaust memory; use
        ``get_psd_batch`` in that case.

        Args:
            frequency: Sampling frequency handed to ``scipy.signal.welch`` as ``fs``.
            segment_length: Segment length handed to ``welch`` as ``nperseg``.
            data: Optional signals to process instead of ``self.full_data``.
                The method iterates over the first axis and runs ``welch`` on
                each item.

        Returns:
            Array with one PSD estimate per item of the first axis.
        """
        signals = self.full_data if data is None else data
        # welch returns (frequencies, psd); only the PSD values are kept.
        return np.array([
            welch(x=channel, fs=frequency, nperseg=segment_length)[1]
            for channel in signals
        ])

    def get_psd_batch(self, batch_size=1000) -> np.ndarray:
        """Run ``get_psd`` on consecutive slices of the data and join the results.

        ``self.full_data`` is cut along its second axis into slices of at most
        ``batch_size`` entries. Each slice is passed to ``get_psd`` and the
        per-slice results are concatenated along axis 1.

        Args:
            batch_size: Maximum number of entries of the second axis per slice.

        Returns:
            The concatenated PSD features of all slices.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be a positive integer")

        # asarray allows list input, which get_psd accepts as well.
        data = np.asarray(self.full_data)
        # Slicing past the end is clipped by numpy, so the last slice may be shorter.
        batches = [
            self.get_psd(data=data[:, start:start + batch_size])
            for start in range(0, data.shape[1], batch_size)
        ]
        return np.concatenate(batches, axis=1)

    def get_wavelet(self, scales=np.arange(1,30), wavelet='cmor',batch_size=50) -> np.ndarray:
        """Extract mean wavelet power per scale and channel after band-pass filtering.

        The data is band-pass filtered to 1-50 Hz, then a continuous wavelet
        transform is applied to every channel of every sample. The squared
        magnitude of the coefficients is averaged over time, and the
        per-channel results of a sample are concatenated into one feature row.

        Args:
            scales: Scales handed to ``pywt.cwt``. The default
                ``np.arange(1, 30)`` holds 29 scales (the upper bound is
                exclusive).
            wavelet: Wavelet name handed to ``pywt.cwt``.
                NOTE(review): the recorded run shows a PyWavelets warning for
                the default name - confirm whether a fully specified name is
                wanted.
            batch_size: Not used by this method; kept so existing calls keep
                working.

        Returns:
            Array of shape (n_samples, n_channels * len(scales)).
        """
        # Band-pass parameters for scipy.signal.butter.
        sampling_frequency = 256  # Sampling frequency from CHB-MIT dataset
        # Nyquist frequency: half the sampling frequency. butter expects the
        # cut-off frequencies as fractions of it.
        nyquist = 0.5 * sampling_frequency
        low_hz = 1
        high_hz = 50
        order = 4

        lower_limit = low_hz / nyquist
        higher_limit = high_hz / nyquist
        # b, a are the numerator and denominator coefficients of the filter.
        b, a = butter(order, [lower_limit, higher_limit], btype='band')

        # Filter along the last axis; the output has the same shape as the input.
        filtered_signals = filtfilt(b, a, self.full_data, axis=-1)
        n_samples, n_channels, n_datapoints = filtered_signals.shape

        samples_features = []  # one feature row per sample
        for sample in range(n_samples):
            channel_features = []  # one feature vector per channel
            for channel in range(n_channels):
                # Continuous wavelet transform of a single channel; the
                # coefficient array has one row per scale.
                coefficient, freq = pywt.cwt(filtered_signals[sample, channel], scales, wavelet)

                # Coefficients may be complex: abs gives the magnitude and its
                # square the power at each scale and time point.
                power = np.abs(coefficient)**2

                # Mean over the time axis leaves one value per scale.
                feature = np.mean(power, axis=1)
                channel_features.append(feature)

            # Join the channel vectors end to end. np.stack would create a new
            # axis instead.
            samples_features.append(np.concatenate(channel_features, axis=0))
        np_features = np.array(samples_features)
        print("shape",np_features.shape)
        return np_features


##### Test Caching

In [17]:

class TestSVMTools(unittest.TestCase):
    """Check that the cache decorator stores the result of test_function."""

    def setUp(self):
        # Fresh SVM_Tools instance for every test. test_function works on its
        # own argument, so the data and labels given here are placeholders.
        input_array = [1, 2, 3, 4, 5]
        input_labels = [1, 2, 3, 4, 5]
        self.svm_tools = SVM_Tools(input_array, input_labels)

    def test_cache_storage(self):
        """Calling test_function returns the array and stores it under "cache"."""
        input_array = [1, 2, 3, 4, 5]
        expected_array = np.array(input_array)

        result_array = self.svm_tools.test_function(input_array)

        # The key must exist (assertIn reports the dict contents on failure) ...
        self.assertIn("cache", self.svm_tools.cache)
        # ... and the cached value must be the array that was returned.
        np.testing.assert_array_equal(self.svm_tools.cache["cache"], expected_array)

        # The function itself returns the expected array.
        np.testing.assert_array_equal(result_array, expected_array)

# Execute the test cases defined above.
# exit=False: in an interactive environment unittest must not exit the notebook
# once the tests have run.
# argv=[""]: the interactive environment may pass its own arguments to main;
# an explicit argv makes sure no additional command is sent.
unittest.main(
    argv=[""],
    exit=False,
    verbosity=2,
)

test_cache_storage (__main__.TestSVMTools) ... ok

----------------------------------------------------------------------
Ran 1 test in 0.003s

OK


<unittest.main.TestProgram at 0x1dfe368eb80>

##### Test Power Spectral Density

In [20]:


class TestSVMTools(unittest.TestCase):
    """Check the output type, shape and value range of SVM_Tools.get_psd."""

    def setUp(self):
        # Seeded generator so that every run uses the same data.
        rng = np.random.RandomState(0)
        # Two channels of random noise with 1000 datapoints each.
        self.sample_data = [rng.randn(1000) for _ in range(2)]
        # Binary labels: the upper bound of randint is exclusive, so 2 is
        # needed to draw both 0 and 1.
        self.sample_labels = [rng.randint(0, 2, 1000) for _ in range(2)]
        self.svm_tools = SVM_Tools(self.sample_data, self.sample_labels)

    def test_get_psd(self):
        """get_psd returns one PSD row per channel with nperseg // 2 + 1 bins."""
        psd_result = self.svm_tools.get_psd()

        self.assertIsInstance(psd_result, np.ndarray)

        # One row per channel; the default segment length of 256 gives
        # 256 // 2 + 1 = 129 frequency bins.
        self.assertEqual(psd_result.shape[0], len(self.sample_data))
        self.assertEqual(psd_result.shape[1], 129)

        # A power spectral density is finite and never negative.
        self.assertTrue(np.all(np.isfinite(psd_result)))
        self.assertTrue(np.all(psd_result >= 0))

        # Additional checks can be added based on expected PSD values

# Execute the test cases defined above without leaving the interactive session.
unittest.main(
    argv=[""],
    exit=False,
    verbosity=2,
)


test_get_psd (__main__.TestSVMTools) ... ok

----------------------------------------------------------------------
Ran 1 test in 0.004s

OK


<unittest.main.TestProgram at 0x1dfe366cb50>

##### Test wavelet analysis

In [24]:

class TestSVMTools(unittest.TestCase):
    """Check the output type, shape and value range of SVM_Tools.get_wavelet."""

    def setUp(self):
        # Seeded generator so that every run uses the same data.
        rng = np.random.RandomState(0)
        # Random data of shape (10 samples, 2 channels, 256 datapoints).
        self.sample_data = rng.randn(10, 2, 256)
        # Binary labels: the upper bound of randint is exclusive, so 2 is
        # needed to draw both 0 and 1.
        self.sample_labels = rng.randint(0, 2, 10)
        self.svm_tools = SVM_Tools(self.sample_data, self.sample_labels)

    def test_get_wavelet(self):
        """get_wavelet returns one row per sample and one column per channel and scale."""
        result = self.svm_tools.get_wavelet()

        self.assertIsInstance(result, np.ndarray)

        # The default scales are np.arange(1, 30): 29 values, because the upper
        # bound is exclusive. 29 scales * 2 channels = 58 features per sample.
        self.assertEqual(result.shape, (10, 58))

        # The features are means of squared magnitudes, so none is negative.
        self.assertTrue(np.all(result >= 0))


# Execute the test cases defined above without leaving the interactive session.
unittest.main(
    argv=[""],
    exit=False,
    verbosity=2,
)


  wavelet = DiscreteContinuousWavelet(wavelet)
ok

----------------------------------------------------------------------
Ran 1 test in 0.059s

OK


shape (10, 58)


<unittest.main.TestProgram at 0x1dfe367f400>