# Analogous Recurrent ANN Trainer

This notebook builds and trains a recurrent ANN whose weights can then be copied over to an SNN with the same architecture.

## Imports

In [1]:
import copy
import os
import struct

import matplotlib.pyplot as plt
import numpy as np
import torch
from torch import nn
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader

## Load Data

In [2]:
# load all data and prepare for vector conversion

# Read the whole training file. The context manager closes the handle once the
# text is in memory, and the forward-slash path works on Windows as well as
# on Linux/macOS.
with open("Training Data/train_5500.txt") as f:
    data = f.read()

# One entry per line of the file; each line is split on single spaces and its
# final token is dropped.
sents = [line.split(' ')[:-1] for line in data.split('\n')]

In [3]:
# prepare word2vector vocabulary (ty chatGPT :) )

def read_word_vectors(filepath):
    """Load a binary word-vector file into a ``{word: vector}`` dictionary.

    The file is expected to start with a text header line holding two integers
    (``vocab_size vector_size``), followed by ``vocab_size`` records. Each
    record is the word's bytes, a single space, and ``vector_size`` float32
    values; newline bytes found while reading a word are skipped.

    Args:
        filepath: Path of the binary vector file.

    Returns:
        dict mapping each word (decoded as UTF-8) to a writable float32
        ``numpy.ndarray`` of length ``vector_size``.

    Raises:
        EOFError: If the file ends before all ``vocab_size`` records are read.
    """
    word_vectors = {}
    with open(filepath, 'rb') as f:
        header = f.readline()
        vocab_size, vector_size = map(int, header.split())
        binary_len = np.dtype('float32').itemsize * vector_size

        for _ in range(vocab_size):
            word_bytes = []
            while True:
                ch = f.read(1)
                if not ch:
                    # read() returns b'' forever at end of file; without this
                    # check a truncated file would spin in this loop.
                    raise EOFError(
                        f"{filepath}: file ended after {len(word_vectors)} of "
                        f"{vocab_size} words"
                    )
                if ch == b' ':
                    break
                if ch != b'\n':
                    word_bytes.append(ch)
            word = b''.join(word_bytes).decode('utf-8')

            raw = f.read(binary_len)
            if len(raw) != binary_len:
                raise EOFError(f"{filepath}: truncated vector for word {word!r}")
            # frombuffer yields a read-only view of the bytes object; copy so
            # callers get a writable array (torch.from_numpy warns otherwise).
            word_vectors[word] = np.frombuffer(raw, dtype='float32').copy()

    return word_vectors

def get_word_vector(word, word_vectors):
    """Return the vector stored for ``word``, or ``None`` if it is not in ``word_vectors``."""
    return word_vectors[word] if word in word_vectors else None

# Load the word vectors (forward slashes keep the path usable on Windows and POSIX alike)
word_vectors = read_word_vectors('Word2Vec from Paper/word2vec/trunk/vectors.bin')

In [4]:
# test vocabulary

# Define a list of words to convert to vectors
words = ['example', 'word', 'vector', 'king', 'queen']

# Convert words to vectors
for word in words:
    vector = get_word_vector(word, word_vectors)
    if vector is not None:
        # Only touch .shape once we know the lookup succeeded; a missing word
        # comes back as None and would raise AttributeError here.
        print(vector.shape)
        print(f"Word: {word}\nVector: {vector}\n")
    else:
        print(f"Word: {word} not found in vocabulary.\n")

(64,)
Word: example
Vector: [-2.4861143   2.514228   -2.51763     1.1023496  -1.9080325   0.2741574
  1.987924   -0.4649879  -1.3471494   3.144086   -1.9048123   1.5780666
 -0.08019609 -1.2507837  -2.59727    -0.28834745 -0.7053564  -2.2820096
 -1.7724434   1.340178    1.0592215   0.5715263  -0.39970756  0.19736235
  0.37494832 -0.23648897 -0.5271788  -0.87137115 -0.16628984  0.47225156
 -2.3885674   0.3888019   1.7539101  -0.90970224  0.7972985  -0.8713628
 -0.74113584  3.1902182   0.655787   -0.20875123 -0.16770692  2.0293825
 -0.6267522   0.5787317   1.579219    1.4347987  -0.7990051  -0.19155246
 -1.1973183   1.641335    2.0438645  -0.9134578  -1.5359813   0.15457954
 -1.0635711   2.7433052   0.22127318 -1.5445443  -0.5777184  -1.1103141
  0.9209189  -1.2365515   0.6689623   0.48781195]

(64,)
Word: word
Vector: [-3.9669743   4.8188896  -2.954842    2.0429592   0.17152616  3.8808334
  1.2621093   0.4342894   0.2122565  -1.6349137  -1.7049528  -1.1492262
  4.1847644  -0.6373846   1.

In [6]:
# perform word to vector conversion

EMBED_DIM = 64    # length of each word vector and of the zero padding vector
TRAIN_SIZE = 5000 # number of sentences kept for training

# Convert every sentence into [label, vec, vec, ..., zero-vector].
# The first token of each sentence is its category label and stays a string.
# The final entry of `sents` is skipped (see the note at the end of this cell).
vec_data = []
for sent in sents[:-1]:
    vecs = [sent[0]]
    for word in sent[1:]:
        vec = get_word_vector(word, word_vectors)
        if vec is None:
            # out-of-vocabulary word: skip it explicitly rather than hiding
            # every possible error behind a bare `except`
            continue
        # np.array copies, so the tensor is writable and does not share memory
        # with the vocabulary (avoids the from_numpy non-writable warning)
        vecs.append(torch.from_numpy(np.array(vec)))
    # all-zero vector marks the end of the sentence
    vecs.append(torch.zeros(EMBED_DIM))
    vec_data.append(vecs)

# pad all sentences (in place) to the length of the longest sentence
max_len = max(len(sent) for sent in vec_data)
for sent in vec_data:
    sent.extend(torch.zeros(EMBED_DIM) for _ in range(max_len - len(sent)))

# split into training and test data
train_data = vec_data[:TRAIN_SIZE]
# NOTE(review): the empty trailing sentence is already dropped by sents[:-1]
# above, so this -1 appears to discard one more (real) sentence -- confirm
# whether that is intended before changing the split.
test_data = vec_data[TRAIN_SIZE:-1]

# NOTE: first word of each sentence is correct categ. -- last sentence is empty (excluded)

  vecs.append(torch.from_numpy(vec))


In [7]:
# create DataSet which can be used with PyTorch DataLoader

# Coarse question category -> integer class index (index = position in the tuple).
ans_key = {
    categ: index
    for index, categ in enumerate(('DESC', 'ENTY', 'ABBR', 'HUM', 'LOC', 'NUM'))
}

class QuestionDataset(Dataset):
    """Map-style dataset of (sentence, label) pairs for question classification."""

    def __init__(self, data):
        """
        Build the parallel ``sents`` / ``labels`` lists.

        data = list of sentences; element 0 of each sentence is the label
        string, and the text before its first ":" is looked up in ``ans_key``.
        The remaining elements are stored unchanged as the sentence.
        """
        self.labels, self.sents = [], []
        for entry in data:
            categ = entry[0].split(":")[0]
            self.labels.append(torch.tensor(ans_key[categ]))
            self.sents.append(entry[1:])

    def __len__(self):
        """Number of sentences held by the dataset."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(sentence, label)`` at ``idx``; a tensor index is converted first."""
        position = idx.tolist() if torch.is_tensor(idx) else idx
        return self.sents[position], self.labels[position]

# wrap each split (training first, then test) in a QuestionDataset
train_DSet, test_DSet = (QuestionDataset(split) for split in (train_data, test_data))

In [8]:
# Create data loaders (both shuffled, one sentence per batch).

batch_size = 1
train_dataloader, test_dataloader = (
    DataLoader(dset, batch_size=batch_size, shuffle=True)
    for dset in (train_DSet, test_DSet)
)

# Peek at a single test batch to sanity-check what the loader yields.
first_batch = next(iter(test_dataloader), None)
if first_batch is not None:
    X, y = first_batch
    print(X)
    print(y)

[tensor([[-0.7112,  3.9187, -2.3859,  1.1708, -2.0570,  1.6660,  2.9410,  0.9965,
          1.8875,  2.1013, -1.9081,  1.4046,  2.7684, -1.1823, -1.0634, -0.7583,
          0.4377, -3.2159, -1.2020,  1.5030,  0.8661,  0.9120, -0.7257,  0.8150,
         -1.4561,  1.0184, -3.1227, -0.7783, -0.3952, -0.3743, -1.6230, -0.2691,
          1.9328, -0.7855,  1.0034, -0.2061,  0.9701,  2.3458, -0.4255,  1.8818,
         -0.5267,  2.2756,  0.1094, -0.7850,  0.7655, -3.7835, -1.4916, -3.5457,
         -1.0294, -0.0488,  0.8952,  0.5742,  0.1470, -0.7216, -0.5818,  2.2442,
         -1.0479, -1.2184, -1.7076,  0.3324,  0.3778, -1.3552,  1.4898,  2.8329]]), tensor([[-0.1255, -0.4126, -1.5036,  0.5658, -1.5322, -0.3062,  2.7857,  0.9233,
          0.3195, -0.4306, -2.1697,  0.2383, -0.0840, -0.7352,  1.3612, -0.5537,
         -0.4857, -0.1546, -1.3385, -0.5487, -0.7021,  1.0826, -0.8593,  1.2547,
         -1.8827,  0.5152, -2.4523, -0.1934, -1.8966, -1.3499,  1.0865, -0.1157,
         -1.6995, -0.402

## Create Model

In [19]:
# Pick the training device: CUDA if available, otherwise MPS, otherwise CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
print(f"Using {device} device")

# Define model
class NeuralNetwork(nn.Module):
    """Bias-free feed-forward layer -> ReLU RNN -> linear + log-softmax classifier.

    Submodule names (``ff``, ``rnn``, ``out``) and therefore the state-dict
    keys are fixed; only the layer sizes are configurable.

    Args:
        input_size: Length of each input word vector (default 64).
        ff_size: Width of the feed-forward layer feeding the RNN (default 48).
        hidden_size: Number of recurrent units (default 16).
        num_classes: Number of output classes (default 6).
    """

    def __init__(self, input_size=64, ff_size=48, hidden_size=16, num_classes=6):
        """ Builds recurrent neural network model """
        super().__init__()
        self.hidden_size = hidden_size
        self.ff = nn.Sequential(
            nn.Linear(input_size, ff_size, bias=False),
            nn.ReLU(),
        )
        self.rnn = nn.RNN(ff_size, hidden_size, nonlinearity='relu', bias=False)
        self.out = nn.Sequential(
            nn.Linear(hidden_size, num_classes, bias=False),
            nn.LogSoftmax(dim=1),
        )

    def forward(self, q):
        """ Implements feed-forward then recurrent layer

        Args:
            q: Sequence of per-word tensors (iterated along its first axis).
               The training code passes a stacked tensor with one sentence
               per batch, i.e. each word is a ``(1, input_size)`` tensor.
               NOTE(review): the hidden state is ``(1, hidden_size)`` and each
               step is handed to ``nn.RNN`` as a 2-D tensor, so batch sizes
               other than 1 look unsupported -- confirm before raising
               ``batch_size``.

        Returns:
            Log-probabilities from the output layer, computed from the RNN
            output at the last word before the first all-zero step. If the
            very first step is all-zero, the output layer is applied to a
            zero state.
        """
        ff_q = torch.stack([self.ff(word) for word in q])

        # Initial state lives on the same device/dtype as the activations, so
        # the module no longer depends on a notebook-global `device`.
        h_N = ff_q.new_zeros(1, self.hidden_size)
        rnn_out = h_N

        for word in ff_q:
            # The layers have no bias, so an all-zero padding vector stays
            # all-zero after `ff`; the first such step ends the sentence.
            if torch.all(word.eq(0)):
                break
            rnn_out, h_N = self.rnn(word, h_N)

        return self.out(rnn_out)

# build the network, move it to the selected device, then show its layers
model = NeuralNetwork()
model = model.to(device)
print(model)

Using cuda device
NeuralNetwork(
  (ff): Sequential(
    (0): Linear(in_features=64, out_features=48, bias=False)
    (1): ReLU()
  )
  (rnn): RNN(48, 16, bias=False)
  (out): Sequential(
    (0): Linear(in_features=16, out_features=6, bias=False)
    (1): LogSoftmax(dim=1)
  )
)


In [20]:
# check model: gather the parameter tensors and display how many there are

param_list = list(model.parameters())
len(param_list)

4

## Optimize Model

In [27]:
# Loss and optimizer: negative log-likelihood on the model's LogSoftmax output,
# optimised with Adam over all model parameters.
loss_fn = nn.NLLLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.001)

# define training routine
def train(dataloader, model, loss_fn, optimizer):
    """Train ``model`` for one pass over ``dataloader``.

    After every optimizer step the diagonal of the RNN's hidden-to-hidden
    weight matrix is reset to zero. The same reset is applied once before
    the first batch so the constraint also holds for the very first forward
    pass. Loss is printed every 1000 batches.

    Args:
        dataloader: Yields ``(X, y)`` where ``X`` is a list of per-word
            tensors (stacked here into one tensor) and ``y`` the labels.
        model: Module exposing an ``rnn`` submodule with ``weight_hh_l0``.
        loss_fn: Callable ``loss_fn(pred, y)`` returning a scalar tensor.
        optimizer: Optimizer over ``model``'s parameters.
    """
    size = len(dataloader.dataset)

    # Move batches to wherever the model lives instead of relying on a
    # notebook-global `device`.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    def zero_recurrent_diagonal():
        # no_grad keeps autograd from tracking the in-place edit of the parameter
        with torch.no_grad():
            model.rnn.weight_hh_l0.fill_diagonal_(0)

    model.train()
    zero_recurrent_diagonal()

    for batch, (X, y) in enumerate(dataloader):
        X, y = torch.stack(X).to(target_device), y.to(target_device)

        # Compute prediction error and optimize.
        # Sentences whose vectors are all zero are handled inside the model's
        # forward pass, so no batch is skipped here.
        optimizer.zero_grad()
        pred = model(X)
        loss = loss_fn(pred, y)
        loss.backward()
        optimizer.step()

        # clamp diagonal hidden-hidden weights of RNN layer to 0
        zero_recurrent_diagonal()

        # print loss at selected iterations
        if batch % 1000 == 0:
            print(f"loss: {loss.item():>7f}  [{batch + 1:>5d}/{size:>5d}]")

# define test routine
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    i = 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = torch.stack(X).to(device), y.to(device) #torch.FloatTensor(y).to(device)
            pred = model(X)
            target = y#.argmax(1)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == target).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [22]:
# running total of epochs trained; the training cell adds to it after each run
# (re-running this cell resets the count to zero)
total_epochs = 0

In [23]:
# Train for a fixed number of epochs, evaluating on the test set after each one,
# then add this run to the running epoch total.

epochs = 40
for t in range(epochs):
    print("Epoch {}\n-------------------------------".format(t + 1))
    train(train_dataloader, model, loss_fn, optimizer)
    test(test_dataloader, model, loss_fn)
print("Done!")
total_epochs = total_epochs + epochs
print(f"\nTotal epochs: {total_epochs}")

Epoch 1
-------------------------------
loss: 1.889511  [    1/ 5000]
FLAG!
tensor([[[0., 0., 0.,  ..., 0., 0., 0.]],

        [[0., 0., 0.,  ..., 0., 0., 0.]],

        [[0., 0., 0.,  ..., 0., 0., 0.]],

        ...,

        [[0., 0., 0.,  ..., 0., 0., 0.]],

        [[0., 0., 0.,  ..., 0., 0., 0.]],

        [[0., 0., 0.,  ..., 0., 0., 0.]]], device='cuda:0')
loss: 1.670135  [ 1001/ 5000]
loss: 0.656711  [ 2001/ 5000]
FLAG!
tensor([[[0., 0., 0.,  ..., 0., 0., 0.]],

        [[0., 0., 0.,  ..., 0., 0., 0.]],

        [[0., 0., 0.,  ..., 0., 0., 0.]],

        ...,

        [[0., 0., 0.,  ..., 0., 0., 0.]],

        [[0., 0., 0.,  ..., 0., 0., 0.]],

        [[0., 0., 0.,  ..., 0., 0., 0.]]], device='cuda:0')
loss: 1.153944  [ 3001/ 5000]
FLAG!
tensor([[[0., 0., 0.,  ..., 0., 0., 0.]],

        [[0., 0., 0.,  ..., 0., 0., 0.]],

        [[0., 0., 0.,  ..., 0., 0., 0.]],

        ...,

        [[0., 0., 0.,  ..., 0., 0., 0.]],

        [[0., 0., 0.,  ..., 0., 0., 0.]],

        [[0., 0

loss: 2.712148  [ 2001/ 5000]
loss: 0.143268  [ 3001/ 5000]
loss: 0.052290  [ 4001/ 5000]
FLAG!
tensor([[[0., 0., 0.,  ..., 0., 0., 0.]],

        [[0., 0., 0.,  ..., 0., 0., 0.]],

        [[0., 0., 0.,  ..., 0., 0., 0.]],

        ...,

        [[0., 0., 0.,  ..., 0., 0., 0.]],

        [[0., 0., 0.,  ..., 0., 0., 0.]],

        [[0., 0., 0.,  ..., 0., 0., 0.]]], device='cuda:0')
FLAG!
tensor([[[0., 0., 0.,  ..., 0., 0., 0.]],

        [[0., 0., 0.,  ..., 0., 0., 0.]],

        [[0., 0., 0.,  ..., 0., 0., 0.]],

        ...,

        [[0., 0., 0.,  ..., 0., 0., 0.]],

        [[0., 0., 0.,  ..., 0., 0., 0.]],

        [[0., 0., 0.,  ..., 0., 0., 0.]]], device='cuda:0')
Test Error: 
 Accuracy: 69.4%, Avg loss: 0.872306 

Epoch 9
-------------------------------
loss: 0.071422  [    1/ 5000]
loss: 0.095234  [ 1001/ 5000]
FLAG!
tensor([[[0., 0., 0.,  ..., 0., 0., 0.]],

        [[0., 0., 0.,  ..., 0., 0., 0.]],

        [[0., 0., 0.,  ..., 0., 0., 0.]],

        ...,

        [[0., 0., 0

In [24]:
# in case training takes a wrong turn
# Deep-copy the model: a plain assignment would only create a second name for
# the same object, so further training would change the "backup" as well.
temp_model_backup = copy.deepcopy(model)

In [25]:
# check all weights: print every parameter tensor with a 1-based layer number

param_list = list(model.parameters())
for i, param in enumerate(param_list, start=1):
    print(f"Layer {i}")
    print(param.shape)
    print(param)

Layer 1
torch.Size([48, 64])
Parameter containing:
tensor([[ 0.0558, -0.0961,  0.1141,  ...,  0.3348, -0.0348,  0.1416],
        [-0.2190,  0.0531,  0.0683,  ..., -0.1284,  0.3164, -0.0308],
        [ 0.2098,  0.0447, -0.2314,  ...,  0.2322,  0.0516, -0.1242],
        ...,
        [-0.1488,  0.0384,  0.1104,  ..., -0.1240, -0.1693, -0.1523],
        [-0.0349, -0.2311,  0.0342,  ...,  0.3226, -0.1347, -0.0107],
        [-0.0522, -0.1232, -0.5658,  ..., -0.1804,  0.0988,  0.0124]],
       device='cuda:0', requires_grad=True)
Layer 2
torch.Size([16, 48])
Parameter containing:
tensor([[ 2.1701e-01,  3.5191e-01, -2.4518e-01,  7.1747e-02,  3.5741e-01,
         -2.2205e-03,  2.5283e-01,  6.6492e-02,  3.3923e-01,  2.8964e-01,
         -4.9649e-02,  2.0624e-01, -9.9230e-04,  5.3794e-01, -1.5293e-01,
          6.6349e-02, -3.5385e-02,  2.5008e-01,  1.1570e-01, -7.7591e-02,
          1.9462e-01, -2.4641e-01,  1.2866e-01, -1.9357e-01,  4.3126e-01,
          4.3302e-01, -8.3412e-02,  6.9357e-02,  3

## Save Model

In [26]:
model_dir = "Recurrent ANN Models/"
model_name = "RANN_13.pth"
model_path = os.path.join(model_dir, model_name)

# torch.save does not create missing folders, so make sure the target exists
os.makedirs(model_dir, exist_ok=True)

# only the weights (state dict) are stored, not the module object itself
torch.save(model.state_dict(), model_path)
print("Saved PyTorch Model State to " + model_path)

Saved PyTorch Model State to Recurrent ANN Models/RANN_13.pth
