In [None]:
# Configuration - Docker container paths
# Every other location used by the notebook is derived from BASE_DIR, so
# relocating the project only requires changing this single constant.
BASE_DIR = "/nnv/code/nnv/examples/Submission/FORMALISE2025"
DATA_DIR = "/".join((BASE_DIR, "data"))
MODELS_DIR = "/".join((BASE_DIR, "models"))

In [None]:
! pip install tonic
! pip install snntorch
! pip install torchdata

In [None]:
# tonic imports
import tonic
import tonic.transforms as transforms  # Not to be mistaken with torchdata.transforms
from tonic import DiskCachedDataset

# torch imports
import torch
from torch.utils.data import random_split
from torch.utils.data import DataLoader
import torchvision
import torch.nn as nn

# snntorch imports
import snntorch as snn
from snntorch import surrogate
import snntorch.spikeplot as splt
from snntorch import functional as SF
from snntorch import utils

# other imports
import matplotlib.pyplot as plt
from IPython.display import HTML
from IPython.display import display
import numpy as np
import torchdata
import os
from ipywidgets import IntProgress
import time
import statistics
import random

# Create dataset

In [None]:
# Seed every random number generator imported by this notebook, not only the
# standard-library one, so that a fresh "Restart & Run All" is reproducible.
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)
random.seed(SEED)  # kept last: returns None, so the cell shows no output

In [None]:
# `root` is the directory handed to the tonic dataset below.
root = DATA_DIR
stmnist_dir = os.path.join(root, 'STMNIST')
# Listing the folder both shows its contents and raises if it is missing.
os.listdir(stmnist_dir)

In [None]:
# Build the ST-MNIST dataset object from the files under `root`.
# Shuffling is switched off explicitly, as is keeping the data compressed.
dataset = tonic.prototype.datasets.STMNIST(
    root=root,
    keep_compressed=False,
    shuffle=False,
)

In [None]:
# The dataset class exposes its sensor geometry as a mapping; ToFrame wants
# the values as a plain tuple.
sensor_size = tuple(tonic.prototype.datasets.STMNIST.sensor_size.values())

# Define a transform that bins each event stream into frames.
# NOTE(review): time_window=20000 is presumably in microseconds - confirm
# against the tonic ToFrame documentation.
frame_transform = transforms.Compose(
    [
        transforms.ToFrame(sensor_size=sensor_size, time_window=20000),
    ]
)

In [None]:
def transform_STMNIST(data, transform, show_progress=True):
    """Apply `transform` to every sample of `data` and split the result 80/20.

    The first 80% of the samples (in iteration order) become the training
    set and the remaining 20% become the test set.

    Parameters
    ----------
    data : sized iterable of ``(events, target)`` pairs
        Must support ``len()`` and iteration.
    transform : callable
        Called as ``transform(events)`` on each sample.
    show_progress : bool, optional
        When True (default) an ipywidgets progress bar is displayed per
        split. Pass False to run without any widget (e.g. headless).

    Returns
    -------
    tuple of (list, list)
        ``(trainset, testset)``, each a list of ``(transformed_events, target)``.

    Notes
    -----
    The split is sequential, not random.
    NOTE(review): if the source yields samples grouped by class, the two
    splits will have different class distributions - confirm the ordering.
    """
    data_size = len(data)
    test_size = int(data_size * 0.2)
    train_size = data_size - test_size

    # A single iterator is shared by both splits. Calling next(iter(data)) on
    # every pass would build a fresh iterator each time and therefore return
    # the first sample of a re-iterable source over and over again.
    samples = iter(data)

    def _port(count, split_name):
        """Pull `count` samples from the shared iterator and transform them."""
        print(f'Porting over and transforming the {split_name}.')
        progress = None
        if show_progress:
            progress = IntProgress(min=0, max=count)
            display(progress)

        ported = []
        for _ in range(count):
            events, target = next(samples)
            ported.append((transform(events), target))
            if progress is not None:
                progress.value += 1
        return ported

    trainset = _port(train_size, 'trainset')
    testset = _port(test_size, 'testset')

    return (trainset, testset)

# Time the full port-and-transform pass over the dataset.
start_time = time.time()
trainset, testset = transform_STMNIST(dataset, frame_transform)
elapsed_time = time.time() - start_time

# Convert elapsed time to minutes, seconds, and milliseconds.
# Round to whole milliseconds once, then split with integer arithmetic:
# rounding the fractional second on its own can yield "1000 milliseconds"
# (e.g. 59.9996 s) instead of carrying into the seconds/minutes fields.
total_milliseconds = round(elapsed_time * 1000)
minutes, remainder = divmod(total_milliseconds, 60_000)
seconds, milliseconds = divmod(remainder, 1000)

# Print the elapsed time
print(f"Elapsed time: {minutes} minutes, {seconds} seconds, {milliseconds} milliseconds")

In [None]:
def _resample_frames(event, num_frames):
    """Return `event` with exactly `num_frames` entries along axis 0."""
    total_frames = event.shape[0]

    # Not enough frames to subsample: keep all of them and zero-pad the end
    # of axis 0 (every other axis is left untouched, whatever the rank).
    if num_frames > total_frames:
        pad_width = [(0, num_frames - total_frames)] + [(0, 0)] * (event.ndim - 1)
        return np.pad(event, pad_width, mode='constant')

    # Otherwise take every k-th frame, rounded to the nearest integer and
    # clamped to the last valid index.
    k = total_frames / num_frames
    indices = [min(int(round(i * k)), total_frames - 1) for i in range(num_frames)]
    return event[indices]


def _save_split(samples, num_frames, split, out_dir):
    """Resample one split and save its samples/labels under `out_dir/split`."""
    split_dir = os.path.join(out_dir, split)
    os.makedirs(split_dir, exist_ok=True)

    frames = np.array([_resample_frames(event, num_frames) for event, _ in samples])
    labels = np.array([label for _, label in samples])

    np.save(os.path.join(split_dir, f'{split}_samples_{num_frames}.npy'), frames)
    np.save(os.path.join(split_dir, f'{split}_labels_{num_frames}.npy'), labels)


def create_dataset(trainset, testset, num_frames, out_dir=None):
    """Resample every sample to `num_frames` frames and save the splits as .npy.

    Samples with fewer than `num_frames` frames are zero-padded at the end;
    longer ones are subsampled at evenly spaced indices.

    Files written (N = `num_frames`)::

        <out_dir>/train/train_samples_N.npy   <out_dir>/train/train_labels_N.npy
        <out_dir>/test/test_samples_N.npy     <out_dir>/test/test_labels_N.npy

    The `train/` and `test/` subfolders are created if missing. The samples
    are written there because that is where the normalisation cells further
    down this notebook load them from.
    NOTE(review): labels are stored next to the samples - confirm that any
    consumer of the label files expects them in the same subfolders.

    Parameters
    ----------
    trainset, testset : sequence of ``(event, label)`` pairs
        `event` is an array whose axis 0 is the frame axis.
    num_frames : int
        Target number of frames per sample.
    out_dir : str, optional
        Destination folder. Defaults to ``os.path.join(root, 'STMNIST')``
        using the notebook-level `root`.

    Returns
    -------
    None
    """
    if out_dir is None:
        out_dir = os.path.join(root, 'STMNIST')

    _save_split(trainset, num_frames, 'train', out_dir)
    _save_split(testset, num_frames, 'test', out_dir)

In [None]:
# Export one resampled copy of the dataset per clip length, longest first.
for clip_length in (64, 32, 16, 8, 4):
    create_dataset(trainset, testset, num_frames=clip_length)

# Fix ordering of dimensions

In [None]:
def normalize_video(video):
    """Min-max scale `video` to the range [0, 1].

    The minimum and maximum are taken over the whole array. When the array
    has no spread (max is not greater than min) the input object itself is
    returned unchanged, which avoids a division by zero.
    """
    lowest, highest = video.min(), video.max()
    if highest > lowest:
        return (video - lowest) / (highest - lowest)
    return video

In [None]:
# for training data
# Each file is normalised per video, has axes 1 and 2 swapped, and is then
# written back over itself.
# NOTE(review): because the file is overwritten in place, running this cell a
# second time swaps the two axes back again - run it exactly once.
for l in ('64', '32', '16', '8', '4'):
    train_path = os.path.join(root, 'STMNIST', 'train', f'train_samples_{l}.npy')
    td = np.load(train_path)
    normalized_td = np.array([normalize_video(video) for video in td]).transpose(0, 2, 1, 3, 4)
    np.save(train_path, normalized_td)

In [None]:
# for testing data
# Each file is normalised per video, has axes 1 and 2 swapped, and is then
# written back over itself.
# NOTE(review): because the file is overwritten in place, running this cell a
# second time swaps the two axes back again - run it exactly once.
for l in ('64', '32', '16', '8', '4'):
    test_path = os.path.join(root, 'STMNIST', 'test', f'test_samples_{l}.npy')
    td = np.load(test_path)
    normalized_td = np.array([normalize_video(video) for video in td]).transpose(0, 2, 1, 3, 4)
    np.save(test_path, normalized_td)