In [1]:
import os
import h5py

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.gridspec import GridSpec
import seaborn as sns

import torch
import torch.nn as nn
import torchvision
from torch.utils import data
import heidel_utils
from importlib import reload # reload 
reload(heidel_utils)


<module 'heidel_utils' from '/Users/svenkerstjens/msc-thesis/MSc-thesis/heidel_utils.py'>

tensor([[97, 14, 12,  ..., 22, 78,  3],
        [36, 82, 60,  ..., 35, 50, 80],
        [57, 99, 21,  ..., 26, 68, 51],
        ...,
        [89, 24,  5,  ..., 59, 90, 96],
        [98, 64,  8,  ..., 78, 65, 88],
        [14, 18, 23,  ..., 12, 72, 59]], device='cuda:0')

In [50]:
from collections import namedtuple
NeuronState = namedtuple('NeuronState', ['U', 'I', 'S'])

class LIFDensePopulation(nn.Module):
    # NeuronState = namedtuple('NeuronState', ['U', 'I', 'S'])
    def __init__(self, in_channels, out_channels, bias=True, alpha = .9, beta=.85, batch_size=10,W=None):
        super(LIFDensePopulation, self).__init__()
        self.fc_layer = nn.Linear(in_channels, out_channels)
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.batch_size = batch_size
        self.alpha = alpha
        self.beta = beta
        self.state = NeuronState(U=torch.zeros(batch_size, out_channels).to(device),
                                 I=torch.zeros(batch_size, out_channels).to(device),
                                 S=torch.zeros(batch_size, out_channels).to(device))
        self.NeuronState = self.state
        self.fc_layer.weight.data.uniform_(-.3, .3)
        self.fc_layer.bias.data.uniform_(-.01, .01)


    def forward(self, Sin_t):
        state = self.state
        U = self.alpha*state.U + state.I - state.S #mem
        I = self.beta*state.I + self.fc_layer(Sin_t) #syn
        # update the neuronal state
        S = smooth_step(U)
        self.state = NeuronState(U=U, I=I, S=S)
        self.NeuronState = self.state
        #state = NeuronState(U=U, I=I, S=S)
        return self.state

    def init_state(self):

        out_channels = self.out_channels
        self.state = NeuronState(U=torch.zeros(self.batch_size, out_channels,device=device),
                                 I=torch.zeros(self.batch_size, out_channels,device=device),
                                 S=torch.zeros(self.batch_size, out_channels,device=device))
        self.NeuronState = self.state

    def init_mod_weights(self,W):
        self.fc_layer.weight = torch.nn.Parameter(self.fc_layer.weight.data * torch.Tensor(W))

class LifRecPopulation(nn.Module):
    # NeuronState = namedtuple('NeuronState', ['U', 'I', 'S'])
    def __init__(self, in_channels, out_channels, bias=True, alpha = .9, beta=.85, batch_size=10,W=None):
        super(LifRecPopulation, self).__init__()
        self.fc_layer = nn.Linear(in_channels, out_channels)
        self.rec_layer = nn.Linear(out_channels, out_channels)
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.batch_size = batch_size
        self.alpha = alpha
        self.beta = beta
        self.state = NeuronState(U=torch.zeros(batch_size, out_channels).to(device),
                                 I=torch.zeros(batch_size, out_channels).to(device),
                                 S=torch.zeros(batch_size, out_channels).to(device))
        self.NeuronState = self.state
        self.fc_layer.weight.data.uniform_(-.3, .3)
        self.fc_layer.bias.data.uniform_(-.01, .01)
        self.rec_layer.weight.data.uniform_(-.3, .3)
        self.rec_layer.bias.data.uniform_(-.01, .01)


    def forward(self, Sin_t):
        state = self.state
        U = self.alpha*state.U + state.I - state.S #mem
        I = self.beta*state.I + self.fc_layer(Sin_t) + self.rec_layer(state.S) #syn
        # update the neuronal state
        S = smooth_step(U)
        self.state = NeuronState(U=U, I=I, S=S)
        self.NeuronState = self.state
        #state = NeuronState(U=U, I=I, S=S)
        return self.state

    def init_state(self):

        out_channels = self.out_channels
        self.state = NeuronState(U=torch.zeros(self.batch_size, out_channels,device=device),
                                 I=torch.zeros(self.batch_size, out_channels,device=device),
                                 S=torch.zeros(self.batch_size, out_channels,device=device))
        self.NeuronState = self.state

    def init_mod_weights(self,W):
        self.fc_layer.weight = torch.nn.Parameter(self.fc_layer.weight.data * torch.Tensor(W))


class SmoothStep(torch.autograd.Function):
    '''
    Modified from: https://pytorch.org/tutorials/beginner/examples_autograd/two_layer_net_custom_function.html
    '''

    scale = 100.0

    @staticmethod
    def forward(aux, x):
        """
        In the forward pass we compute a step function of the input Tensor
        and return it. ctx is a context object that we use to stash information which
        we need to later backpropagate our error signals. To achieve this we use the
        ctx.save_for_backward method.
        """
        aux.save_for_backward(x)
        out = torch.zeros_like(x)

        out[x > 0] = 1.0
        return out

    def backward(aux, grad_output):
        
        #grad_input = grad_output.clone()
        input, = aux.saved_tensors
        grad_input = grad_output.clone()
        grad = grad_input/(SmoothStep.scale*torch.abs(input)+1.0)**2
        return grad


smooth_step = SmoothStep().apply

class OneHiddenModel(nn.Module):

    def __init__(self,in_channels,hidden_channels,out_channels,batch_size,alpha=.9,beta=.85,device='cpu',W=None):
        super(OneHiddenModel, self).__init__()

        self.in_channels = in_channels
        self.hidden_channels = hidden_channels
        self.out_channels = out_channels
        self.alpha = alpha
        self.beta = beta
        self.batch_size = batch_size
        self.device = device
        self.W = W
        self.layer1 = LIFDensePopulation(in_channels=self.in_channels,out_channels=self.hidden_channels,
                                         alpha=self.alpha,beta=self.beta,batch_size=self.batch_size,W=W).to(device)
        self.layer2 = LIFDensePopulation(in_channels=self.hidden_channels,out_channels=self.out_channels,
                                         alpha=self.alpha,beta=self.beta,batch_size=self.batch_size).to(device)

    def forward(self,Sin):
        hidden = self.layer1(Sin)
        out = self.layer2(hidden.S)
        return out

    def init_states(self):
        self.layer1.init_state()
        self.layer2.init_state()

    def init_mod_weights(self,W):
        self.layer1.fc_layer.weight = torch.nn.Parameter(self.layer1.fc_layer.weight.data * torch.Tensor(W))

class OneRecHiddenModel(nn.Module):

    def __init__(self,in_channels,hidden_channels,out_channels,batch_size,alpha=.9,beta=.85,device='cpu',W=None):
        super(OneRecHiddenModel, self).__init__()

        self.in_channels = in_channels
        self.hidden_channels = hidden_channels
        self.out_channels = out_channels
        self.alpha = alpha
        self.beta = beta
        self.batch_size = batch_size
        self.device = device
        self.W = W
        self.layer1 = LifRecPopulation(in_channels=self.in_channels,out_channels=self.hidden_channels,
                                         alpha=self.alpha,beta=self.beta,batch_size=self.batch_size,W=W).to(device)
        self.layer2 = LIFDensePopulation(in_channels=self.hidden_channels,out_channels=self.out_channels,
                                         alpha=self.alpha,beta=self.beta,batch_size=self.batch_size).to(device)

    def forward(self,Sin):
        hidden = self.layer1(Sin)
        out = self.layer2(hidden.S)
        return out

    def init_states(self):
        self.layer1.init_state()
        self.layer2.init_state()

    def init_mod_weights(self,W):
        self.layer1.fc_layer.weight = torch.nn.Parameter(self.layer1.fc_layer.weight.data * torch.Tensor(W))

class ThreeHiddenModel(nn.Module):

    def __init__(self,in_channels,hidden_channels,out_channels,batch_size,alpha=.9,beta=.85,device='cpu',W=None):
        super(ThreeHiddenModel, self).__init__()

        self.in_channels = in_channels
        self.hidden_channels = hidden_channels
        self.out_channels = out_channels
        self.alpha = alpha
        self.beta = beta
        self.batch_size = batch_size
        self.device = device
        self.W = W
        self.layer1 = LIFDensePopulation(in_channels=self.in_channels,out_channels=self.hidden_channels,
                                         alpha=self.alpha,beta=self.beta,batch_size=self.batch_size,W=W).to(device)
        self.layer2 = LIFDensePopulation(in_channels=self.hidden_channels,out_channels=self.hidden_channels,
                                         alpha=self.alpha,beta=self.beta,batch_size=self.batch_size).to(device)
        self.layer3 = LIFDensePopulation(in_channels=self.hidden_channels,out_channels=self.hidden_channels,
                                         alpha=self.alpha,beta=self.beta,batch_size=self.batch_size).to(device)
        self.layer4 = LIFDensePopulation(in_channels=self.hidden_channels,out_channels=self.hidden_channels,
                                         alpha=self.alpha,beta=self.beta,batch_size=self.batch_size).to(device)
        self.layer5 = LIFDensePopulation(in_channels=self.hidden_channels,out_channels=self.out_channels,
                                         alpha=self.alpha,beta=self.beta,batch_size=self.batch_size).to(device)

    def forward(self,Sin):
        hidden1 = self.layer1(Sin)
        hidden2 = self.layer2(hidden1.S)
        hidden3 = self.layer3(hidden2.S)
        hidden4 = self.layer4(hidden3.S)
        out = self.layer5(hidden4.S)
        return out

    def init_states(self):
        self.layer1.init_state()
        self.layer2.init_state()
        self.layer3.init_state()
        self.layer4.init_state()
        self.layer5.init_state()

    def init_mod_weights(self,W):
        self.layer1.fc_layer.weight = torch.nn.Parameter(self.layer1.fc_layer.weight.data * torch.Tensor(W))
        self.layer2.fc_layer.weight = torch.nn.Parameter(self.layer2.fc_layer.weight.data * torch.Tensor(W))
        self.layer3.fc_layer.weight = torch.nn.Parameter(self.layer3.fc_layer.weight.data * torch.Tensor(W))
        self.layer4.fc_layer.weight = torch.nn.Parameter(self.layer4.fc_layer.weight.data * torch.Tensor(W))


class FiveHiddenModel(nn.Module):

    def __init__(self,in_channels,hidden_channels,out_channels,batch_size,alpha=.9,beta=.85,device='cpu',W=None):
        super(FiveHiddenModel, self).__init__()

        self.in_channels = in_channels
        self.hidden_channels = hidden_channels
        self.out_channels = out_channels
        self.alpha = alpha
        self.beta = beta
        self.batch_size = batch_size
        self.device = device
        self.W = W
        self.layer1 = LIFDensePopulation(in_channels=self.in_channels,out_channels=self.hidden_channels,
                                         alpha=self.alpha,beta=self.beta,batch_size=self.batch_size,W=W).to(device)
        self.layer2 = LIFDensePopulation(in_channels=self.hidden_channels,out_channels=self.hidden_channels,
                                         alpha=self.alpha,beta=self.beta,batch_size=self.batch_size).to(device)
        self.layer3 = LIFDensePopulation(in_channels=self.hidden_channels,out_channels=self.hidden_channels,
                                         alpha=self.alpha,beta=self.beta,batch_size=self.batch_size).to(device)
        self.layer4 = LIFDensePopulation(in_channels=self.hidden_channels,out_channels=self.hidden_channels,
                                         alpha=self.alpha,beta=self.beta,batch_size=self.batch_size).to(device)
        self.layer5 = LIFDensePopulation(in_channels=self.hidden_channels,out_channels=self.hidden_channels,
                                        alpha=self.alpha,beta=self.beta,batch_size=self.batch_size).to(device)
        self.layer6 = LIFDensePopulation(in_channels=self.hidden_channels,out_channels=self.hidden_channels,
                                         alpha=self.alpha,beta=self.beta,batch_size=self.batch_size).to(device)
        self.layer7 = LIFDensePopulation(in_channels=self.hidden_channels,out_channels=self.out_channels,
                                         alpha=self.alpha,beta=self.beta,batch_size=self.batch_size).to(device)
        #maybe try last layer non- spiking

    def forward(self,Sin):
        hidden1 = self.layer1(Sin)
        hidden2 = self.layer2(hidden1.S)
        hidden3 = self.layer3(hidden2.S)
        hidden4 = self.layer4(hidden3.S)
        hidden5 = self.layer5(hidden4.S)
        hidden6 = self.layer6(hidden5.S)
        out = self.layer7(hidden6.S)
        return out

    def init_states(self):
        self.layer1.init_state()
        self.layer2.init_state()
        self.layer3.init_state()
        self.layer4.init_state()
        self.layer5.init_state()
        self.layer6.init_state()
        self.layer7.init_state()

    def init_mod_weights(self,W):
        self.layer1.fc_layer.weight = torch.nn.Parameter(self.layer1.fc_layer.weight.data * torch.Tensor(W))
        self.layer2.fc_layer.weight = torch.nn.Parameter(self.layer2.fc_layer.weight.data * torch.Tensor(W))
        self.layer3.fc_layer.weight = torch.nn.Parameter(self.layer3.fc_layer.weight.data * torch.Tensor(W))
        self.layer4.fc_layer.weight = torch.nn.Parameter(self.layer4.fc_layer.weight.data * torch.Tensor(W))
        self.layer5.fc_layer.weight = torch.nn.Parameter(self.layer5.fc_layer.weight.data * torch.Tensor(W))
        self.layer6.fc_layer.weight = torch.nn.Parameter(self.layer6.fc_layer.weight.data * torch.Tensor(W))


In [51]:
nb_inputs  = 700
nb_hidden  = 200
nb_outputs = 20

time_step = 1e-3
nb_steps = 100
max_time = 1.4

batch_size = 256
if torch.cuda.is_available():
    device = torch.device("cuda")
    print(f'device: {device}')
else:
    device = torch.device("cpu")
    print(f'device: {device}')

device: cpu


In [45]:
# Here we load the Dataset
cache_dir = os.path.expanduser("~/data")
cache_subdir = "hdspikes"
heidel_utils.get_shd_dataset(cache_dir, cache_subdir)

train_file = h5py.File(os.path.join(cache_dir, cache_subdir, 'shd_train.h5'), 'r')
test_file = h5py.File(os.path.join(cache_dir, cache_subdir, 'shd_test.h5'), 'r')

x_train = train_file['spikes']
y_train = train_file['labels']
x_test = test_file['spikes']
y_test = test_file['labels']

Available at: /Users/svenkerstjens/data/hdspikes/shd_train.h5
Available at: /Users/svenkerstjens/data/hdspikes/shd_test.h5


RuntimeError: legacy constructor expects device type: cpu but device type: cuda was passed

In [14]:
device

device(type='cuda')

In [46]:
def sparse_data_generator_from_hdf5_spikes(X, y, batch_size, nb_steps, nb_units, max_time, shuffle=True):
    """ This generator takes a spike dataset and generates spiking network input as sparse tensors.

    Args:
        X: The data ( sample x event x 2 ) the last dim holds (time,neuron) tuples
        y: The labels
    """

    labels_ = np.array(y,dtype=np.int)
    number_of_batches = len(labels_)//batch_size
    sample_index = np.arange(len(labels_))

    # compute discrete firing times
    firing_times = X['times']
    units_fired = X['units']

    time_bins = np.linspace(0, max_time, num=nb_steps)

    if shuffle:
        np.random.shuffle(sample_index)

    total_batch_count = 0
    counter = 0
    while counter<number_of_batches:
        batch_index = sample_index[batch_size*counter:batch_size*(counter+1)]

        coo = [ [] for i in range(3) ]
        for bc,idx in enumerate(batch_index):
            times = np.digitize(firing_times[idx], time_bins)
            units = units_fired[idx]
            batch = [bc for _ in range(len(times))]

            coo[0].extend(batch)
            coo[1].extend(times)
            coo[2].extend(units)

        i = torch.LongTensor(coo).to(device)
        v = torch.FloatTensor(np.ones(len(coo[0]))).to(device)

        X_batch = torch.sparse.FloatTensor(i, v, torch.Size([batch_size,nb_steps,nb_units])).to(device)
        y_batch = torch.tensor(labels_[batch_index],device=device)

        yield X_batch.to(device=device), y_batch.to(device=device)

        counter += 1

In [52]:
import time

model = OneRecHiddenModel(in_channels=nb_inputs,hidden_channels=nb_hidden,out_channels=nb_outputs,batch_size=batch_size,W=None).to(device)

ce_loss = torch.nn.CrossEntropyLoss().to(device)

params = model.parameters()
opt = torch.optim.Adam(params, lr=2e-4, betas=[0., .95]) #lr is the learning rate
#decay = .9

epochs = 100

for epoch in range(epochs):
    tic = time.time()
    avg_loss = 0
    model.train()
    sum_acc=0
    count = 0

    #batches
    for x,y in sparse_data_generator_from_hdf5_spikes(x_train, y_train, batch_size, nb_steps, nb_units=nb_inputs, max_time=max_time, shuffle=True):
        
        model.init_states()

        #Sprobe = torch.zeros((batch_size,model.out_channels),device=device)
        out = torch.zeros((batch_size,model.out_channels))
        out_rec = [out]
        #timesteps
        for n in range(nb_steps):
            out_state = model(x.to_dense()[:,n])
            out_rec.append(out_state.U)
            #add decay for leakiness
            #collect spikes over time
            #Sprobe = decay * Sprobe + out_state.S
        out_rec = torch.stack(out_rec,dim=1)
        prediction = torch.max(out_rec,1).values

        #prediction = Sprobe
        #if i==0:
        #    print(prediction)

        #accuracy = val_accuracy(prediction,y_train[train_batch_ids[i]])

        #class labels only
        loss = ce_loss(prediction,y)

        print(f'batchloss:  {loss}')

        #tonic & torch neuromorphic

        loss.backward()
        opt.step()
        opt.zero_grad()
        #model.init_mod_weights(W2)
        #sum_acc = sum_acc + accuracy
        #sum_loss = sum_loss + loss
        avg_loss = ((avg_loss * count) + loss)/(count+1)
        count += 1
    #avg_train_acc = sum_acc/(len(train_batch_ids))
    #avg_loss = sum_loss/(len(train_batch_ids))
    #if i == 3:/'
    #loss_hist = loss_hist + [float(avg_loss)]
    #acc_hist = acc_hist + [float(avg_train_acc)]

    toc = time.time()
    print(f'time for epoch {epoch}: {toc-tic}')
    if epoch%1==0:
        print(f'epoch {epoch}: \n loss: {avg_loss}')
        #print(f'train_acc: {avg_train_acc}')
    #val_acc,_,_ = validation_acc(X_test,y_test,model,test_batch_ids)
    #val_acc_hist = val_acc_hist + [val_acc]
    #print(f'val_acc: {val_acc}')



Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  labels_ = np.array(y,dtype=np.int)


batchloss:  7.76291561126709
batchloss:  8.135481834411621
batchloss:  8.35159683227539


KeyboardInterrupt: 

In [37]:
def batch_accuracy(prediction,y):
    return (np.argmax(prediction.detach().numpy(),axis=1) == y.numpy()).sum()/256*100

In [53]:
model = OneRecHiddenModel(in_channels=nb_inputs,hidden_channels=nb_hidden,out_channels=nb_outputs,batch_size=batch_size,W=None).to(device)


In [55]:
model

OneRecHiddenModel(
  (layer1): LifRecPopulation(
    (fc_layer): Linear(in_features=700, out_features=200, bias=True)
    (rec_layer): Linear(in_features=200, out_features=200, bias=True)
  )
  (layer2): LifRecPopulation(
    (fc_layer): Linear(in_features=200, out_features=20, bias=True)
    (rec_layer): Linear(in_features=20, out_features=20, bias=True)
  )
)