In [4]:
import n2d2
import torch
from torchvision import transforms
from tonic.datasets import NMNIST
import numpy as np
import pickle

## Preprocessing: 3 channels (positive events, negative events, average timestamp)

In [5]:
def preprocess_nmnist_frames_3_channels(data_path=None, nr_samp=1):
      import h5py
      import os
      import torchvision.transforms as transforms
      import numpy as np
      import random

      length = []
      samples = []
      labels = []

      def dvs_frames(data_path,item,filename):
          with h5py.File(f"{data_path}/{item}/{filename}", "r") as f:

                x = np.array(f["spikes"]["x"])
                y = np.array(f["spikes"]["y"])
                t = np.array(f["spikes"]["t"])

                fe = np.array(f["frame"], dtype="float32")
                time_image = np.zeros([28,28])
                total_events_matrix = fe[0] + fe[1]
                for i in range(0,300):
                  time_image[y[i]][x[i]] += t[i]/total_events_matrix[y[i]][x[i]]

                frame = []

                frame.append(fe[0]) # negative events
                frame.append(fe[1]) # pozitive events
                frame.append(np.array(time_image, dtype="float32")) # timestamp average events
                label = int(f["label"][()].decode("utf-8")) # retriving label for the frame
                frame = np.array(frame)
                # frame = np.transpose(np.array(frame), (1,2,0))


                transform_1 = transforms.Compose(
          [transforms.ToTensor(), transforms.Lambda(lambda x: x / fe[0].max() ), transforms.Normalize((0.5,), (0.5,))]
      )
                transform_2 = transforms.Compose(
          [transforms.ToTensor(), transforms.Lambda(lambda x: x / fe[1].max() ), transforms.Normalize((0.5,), (0.5,))]
      )
                transform_3 = transforms.Compose(
          [transforms.ToTensor(), transforms.Lambda(lambda x: x / frame[2].max() ), transforms.Normalize((0.5,), (0.5,))]
      )
                frame[0] = transform_1(frame[0])
                frame[1] = transform_2(frame[1])
                frame[2] = transform_3(frame[2])
                return [frame, label]
          return False
      
      print("Preprocessing of event data started ...")
      for item in os.listdir(data_path):
        for i in range(0,nr_samp): # we can choose the number of frames to take for each sample
          data = sorted(os.listdir(f"{data_path}/{item}/"))
          if len(data)-2<0:
            frame_to_take = random.randint(0,0)
          else:
            frame_to_take = random.randint(0,len(data)-2) # generating a random index (-2 because the last frame of time window was degenerated frame)
          filename = sorted(os.listdir(f"{data_path}/{item}/"))[frame_to_take]
          try:
            data=dvs_frames(data_path,item,filename)
            if data:
                samples.append(data[0])
                labels.append(data[1])
            else:
                print("Prolbem to open file: ", filename)
          except Exception as e:
            print(f"Attention: {e}")

          if len(samples)%1000 == 0:
                print(len(samples))

      print("Data finished")

      return samples, labels

In [6]:
train_data_spike_3_channels = preprocess_nmnist_frames_3_channels("dataset_train_spike/Train", nr_samp=1)
test_data_spike_3_channels = preprocess_nmnist_frames_3_channels("dataset_test_spike/Test", nr_samp=1)

data_3_channels = train_data_spike_3_channels[0] + test_data_spike_3_channels[0]
labels_3_channels = train_data_spike_3_channels[1] + test_data_spike_3_channels[1]

Preprocessing of event data started ...
1000
2000
3000
4000
5000
6000
7000
8000
9000
10000
11000
12000
13000
14000
15000
16000
17000
18000
19000
20000
21000
22000
23000
24000
25000
26000
27000
28000
29000
30000
31000
32000
33000
34000
35000
36000
37000
38000
39000
40000
41000
42000
43000
44000
45000
46000
47000
48000
49000
50000
51000
52000
53000
54000
55000
56000
57000
58000
59000
60000
Data finished
Preprocessing of event data started ...
1000
2000
3000
4000
5000
6000
7000
8000
9000
10000
Data finished


In [7]:
with open('data_3_channels.pkl', 'wb') as file:
    pickle.dump(data_3_channels, file)
with open('labels_3_channels.pkl', 'wb') as file:
    pickle.dump(labels_3_channels, file)

## Preprocessing: 2 channels (positive events, negative events)

In [8]:
def preprocess_nmnist_frames_2_channels(data_path=None, nr_samp=1):
      import h5py
      import os
      import torchvision.transforms as transforms
      import numpy as np
      import random

      length = []
      samples = []
      labels = []

      def dvs_frames(data_path,item,filename):
          with h5py.File(f"{data_path}/{item}/{filename}", "r") as f:

                x = np.array(f["spikes"]["x"])
                y = np.array(f["spikes"]["y"])
                t = np.array(f["spikes"]["t"])

                fe = np.array(f["frame"], dtype="float32")
                time_image = np.zeros([28,28])
                total_events_matrix = fe[0] + fe[1]
                for i in range(0,300):
                  time_image[y[i]][x[i]] += t[i]/total_events_matrix[y[i]][x[i]]

                frame = []

                frame.append(fe[0]) # negative events
                frame.append(fe[1]) # pozitive events
                #frame.append(np.array(time_image, dtype="float32")) # timestamp average events
                label = int(f["label"][()].decode("utf-8")) # retriving label for the frame
                frame = np.array(frame)
                # frame = np.transpose(np.array(frame), (1,2,0))


                transform_1 = transforms.Compose(
          [transforms.ToTensor(), transforms.Lambda(lambda x: x / fe[0].max() ), transforms.Normalize((0.5,), (0.5,))]
      )
                transform_2 = transforms.Compose(
          [transforms.ToTensor(), transforms.Lambda(lambda x: x / fe[1].max() ), transforms.Normalize((0.5,), (0.5,))]
      )
                transform_3 = transforms.Compose(
          [transforms.ToTensor(), transforms.Lambda(lambda x: x / frame[2].max() ), transforms.Normalize((0.5,), (0.5,))]
      )
                frame[0] = transform_1(frame[0])
                frame[1] = transform_2(frame[1])
                #frame[2] = transform_3(frame[2])
                return [frame, label]
          return False
      
      print("Preprocessing of event data started ...")
      for item in os.listdir(data_path):
        for i in range(0,nr_samp): # we can choose the number of frames to take for each sample
          data = sorted(os.listdir(f"{data_path}/{item}/"))
          if len(data)-2<0:
            frame_to_take = random.randint(0,0)
          else:
            frame_to_take = random.randint(0,len(data)-2) # generating a random index (-2 because the last frame of time window was degenerated frame)
          filename = sorted(os.listdir(f"{data_path}/{item}/"))[frame_to_take]
          try:
            data=dvs_frames(data_path,item,filename)
            if data:
                samples.append(data[0])
                labels.append(data[1])
            else:
                print("Prolbem to open file: ", filename)
          except Exception as e:
            print(f"Attention: {e}")
                  
        if len(samples)%1000 == 0:
            print(len(samples))


      print("Data finished")


      return samples, labels

In [9]:
train_data_spike_2_channels = preprocess_nmnist_frames_2_channels("dataset_train_spike/Train", nr_samp=1)
test_data_spike_2_channels = preprocess_nmnist_frames_2_channels("dataset_test_spike/Test", nr_samp=1)

data_2_channels = train_data_spike_2_channels[0] + test_data_spike_2_channels[0]
labels_2_channels = train_data_spike_2_channels[1] + test_data_spike_2_channels[1]

Preprocessing of event data started ...
1000
2000
3000
4000
5000
6000
7000
8000
9000
10000
11000
12000
13000
14000
15000
16000
17000
18000
19000
20000
21000
22000
23000
24000
25000
26000
27000
28000
29000
30000
31000
32000
33000
34000
35000
36000
37000
38000
39000
40000
41000
42000
43000
44000
45000
46000
47000
48000
49000
50000
51000
52000
53000
54000
55000
56000
57000
58000
59000
60000
Data finished
Preprocessing of event data started ...
1000
2000
3000
4000
5000
6000
7000
8000
9000
10000
Data finished


In [10]:
with open('data_2_channels.pkl', 'wb') as file:
    pickle.dump(data_2_channels, file)
with open('labels_2_channels.pkl', 'wb') as file:
    pickle.dump(labels_2_channels, file)

## Preprocessing: Positive Events + Negative Events

In [11]:
def preprocess_nmnist_frames_sum(data_path=None, nr_samp=1):
      import h5py
      import os
      import torchvision.transforms as transforms
      import numpy as np
      import random

      length = []
      samples = []
      labels = []
    
      def frame_label(data_path, item, filename):
            with h5py.File(f"{data_path}/{item}/{filename}", "r") as f:
                frame = np.array(f["frame"][0]+f["frame"][1],dtype="float32")
                transform = transforms.Compose([transforms.ToTensor(), transforms.Lambda(lambda x: x / frame.max() ), transforms.Normalize((0.5,), (0.5,))])
                frame = transform(frame) # transforming data of frames
                label = int(f["label"][()].decode("utf-8")) # retriving label for the frame
                return [frame, label]
            return False

      
      print("Preprocessing of event data started ...")
      for item in os.listdir(data_path):
        for i in range(0,nr_samp): # we can choose the number of frames to take for each sample
          data = sorted(os.listdir(f"{data_path}/{item}/"))
          if len(data)-2<0:
            frame_to_take = random.randint(0,0)
          else:
            frame_to_take = random.randint(0,len(data)-2) # generating a random index (-2 because the last frame of time window was degenerated frame)
            filename = sorted(os.listdir(f"{data_path}/{item}/"))[frame_to_take]
          try:
            data=frame_label(data_path,item,filename)
            if data:
                samples.append(data[0].numpy())
                labels.append(data[1])
            else:
                print("Prolbem to open file: ", filename)
          except Exception as e:
                print(f"Attention: {e}")  
              

          if len(samples)%1000 == 0:
                print(len(samples))

      print("Data finished")


      return samples, labels

In [None]:
train_data_spike_sum = preprocess_nmnist_frames_sum("dataset_train_spike/Train", nr_samp=1)
test_data_spike_sum = preprocess_nmnist_frames_sum("dataset_test_spike/Test", nr_samp=1)

data_sum = train_data_spike_sum[0] + test_data_spike_sum[0]
labels_sum = train_data_spike_sum[1] + test_data_spike_sum[1]

Preprocessing of event data started ...
1000
2000
3000
4000
5000
6000
7000
8000
9000
10000
11000
12000
13000
14000
15000
16000
17000
Attention: [Errno 2] Unable to synchronously open file (unable to open file: name = 'dataset_train_spike/Train/17665_53b41/4.h5', errno = 2, error message = 'No such file or directory', flags = 0, o_flags = 0)
18000
19000
20000
21000
22000
23000
24000
25000


In [None]:
with open('data_sum.pkl', 'wb') as file:
    pickle.dump(data_sum, file)
with open('labels_sum.pkl', 'wb') as file:
    pickle.dump(labels_sum, file)

## Preprocessing: Positive Events - Negative Events

In [None]:
def preprocess_nmnist_frames_sub(data_path=None, nr_samp=1):
      import h5py
      import os
      import torchvision.transforms as transforms
      import numpy as np
      import random

      length = []
      samples = []
      labels = []
    
      def frame_label(data_path, item, filename):
            with h5py.File(f"{data_path}/{item}/{filename}", "r") as f:
                frame = np.array(f["frame"][1]-f["frame"][0],dtype="float32")
                transform = transforms.Compose([transforms.ToTensor(), transforms.Lambda(lambda x: x / frame.max() ), transforms.Normalize((0.5,), (0.5,))])
                frame = transform(frame) # transforming data of frames
                label = int(f["label"][()].decode("utf-8")) # retriving label for the frame
                return [frame, label]
            return False

      
      print("Preprocessing of event data started ...")
      for item in os.listdir(data_path):
        for i in range(0,nr_samp): # we can choose the number of frames to take for each sample
          data = sorted(os.listdir(f"{data_path}/{item}/"))
          if len(data)-2<0:
            frame_to_take = random.randint(0,0)
          else:
            frame_to_take = random.randint(0,len(data)-2) # generating a random index (-2 because the last frame of time window was degenerated frame)
            filename = sorted(os.listdir(f"{data_path}/{item}/"))[frame_to_take]
          try:
            data=frame_label(data_path,item,filename)
            if data:
                samples.append(data[0].numpy())
                labels.append(data[1])
            else:
                print("Prolbem to open file: ", filename)
          except Exception as e:
                print(f"Attention: {e}")  
              

          if len(samples)%1000 == 0:
                print(len(samples))

      print("Data finished")


      return samples, labels

In [None]:
train_data_spike_sub = preprocess_nmnist_frames_sub("dataset_train_spike/Train", nr_samp=1)
test_data_spike_sub = preprocess_nmnist_frames_sub("dataset_test_spike/Test", nr_samp=1)

data_sub = train_data_spike_sub[0] + test_data_spike_sub[0]
labels_sub = train_data_spike_sub[1] + test_data_spike_sub[1]

In [None]:
with open('data_sub.pkl', 'wb') as file:
    pickle.dump(data_sub, file)
with open('labels_sub.pkl', 'wb') as file:
    pickle.dump(labels_sub, file)

## Preprocessing: Only Positive Events

In [None]:
def preprocess_nmnist_frames_positive(data_path=None, nr_samp=1):
      import h5py
      import os
      import torchvision.transforms as transforms
      import numpy as np
      import random

      length = []
      samples = []
      labels = []
    
      def frame_label(data_path, item, filename):
            with h5py.File(f"{data_path}/{item}/{filename}", "r") as f:
                frame = np.array(f["frame"][1],dtype="float32")
                transform = transforms.Compose([transforms.ToTensor(), transforms.Lambda(lambda x: x / frame.max() ), transforms.Normalize((0.5,), (0.5,))])
                frame = transform(frame) # transforming data of frames
                label = int(f["label"][()].decode("utf-8")) # retriving label for the frame
                return [frame, label]
            return False

      
      print("Preprocessing of event data started ...")
      for item in os.listdir(data_path):
        for i in range(0,nr_samp): # we can choose the number of frames to take for each sample
          data = sorted(os.listdir(f"{data_path}/{item}/"))
          if len(data)-2<0:
            frame_to_take = random.randint(0,0)
          else:
            frame_to_take = random.randint(0,len(data)-2) # generating a random index (-2 because the last frame of time window was degenerated frame)
            filename = sorted(os.listdir(f"{data_path}/{item}/"))[frame_to_take]
          try:
            data=frame_label(data_path,item,filename)
            if data:
                samples.append(data[0].numpy())
                labels.append(data[1])
            else:
                print("Prolbem to open file: ", filename)
          except Exception as e:
                print(f"Attention: {e}")  
              

          if len(samples)%1000 == 0:
                print(len(samples))

      print("Data finished")


      return samples, labels

In [None]:
train_data_spike_positive = preprocess_nmnist_frames_positive("dataset_train_spike/Train", nr_samp=1)
test_data_spike_positive = preprocess_nmnist_frames_positive("dataset_test_spike/Test", nr_samp=1)

data_positive = train_data_spike_positive[0] + test_data_spike_positive[0]
labels_positive = train_data_spike_positive[1] + test_data_spike_positive[1]

In [None]:
with open('data_positive.pkl', 'wb') as file:
    pickle.dump(data_positive, file)
with open('labels_positive.pkl', 'wb') as file:
    pickle.dump(labels_positive, file)

## Preprocessing: Only Negative Events

In [None]:
def preprocess_nmnist_frames_negative(data_path=None, nr_samp=1):
      import h5py
      import os
      import torchvision.transforms as transforms
      import numpy as np
      import random

      length = []
      samples = []
      labels = []
    
      def frame_label(data_path, item, filename):
            with h5py.File(f"{data_path}/{item}/{filename}", "r") as f:
                frame = np.array(f["frame"][0],dtype="float32")
                transform = transforms.Compose([transforms.ToTensor(), transforms.Lambda(lambda x: x / frame.max() ), transforms.Normalize((0.5,), (0.5,))])
                frame = transform(frame) # transforming data of frames
                label = int(f["label"][()].decode("utf-8")) # retriving label for the frame
                return [frame, label]
            return False

      
      print("Preprocessing of event data started ...")
      for item in os.listdir(data_path):
        for i in range(0,nr_samp): # we can choose the number of frames to take for each sample
          data = sorted(os.listdir(f"{data_path}/{item}/"))
          if len(data)-2<0:
            frame_to_take = random.randint(0,0)
          else:
            frame_to_take = random.randint(0,len(data)-2) # generating a random index (-2 because the last frame of time window was degenerated frame)
            filename = sorted(os.listdir(f"{data_path}/{item}/"))[frame_to_take]
          try:
            data=frame_label(data_path,item,filename)
            if data:
                samples.append(data[0].numpy())
                labels.append(data[1])
            else:
                print("Prolbem to open file: ", filename)
          except Exception as e:
                print(f"Attention: {e}")  
              

          if len(samples)%1000 == 0:
                print(len(samples))

      print("Data finished")


      return samples, labels

In [None]:
train_data_spike_negative = preprocess_nmnist_frames_negative("dataset_train_spike/Train", nr_samp=1)
test_data_spike_negative = preprocess_nmnist_frames_negative("dataset_test_spike/Test", nr_samp=1)

data_negative = train_data_spike_negative[0] + test_data_spike_negative[0]
labels_negative = train_data_spike_negative[1] + test_data_spike_negative[1]

In [None]:
with open('data_negative.pkl', 'wb') as file:
    pickle.dump(data_negative, file)
with open('labels_negative.pkl', 'wb') as file:
    pickle.dump(labels_negative, file)