## Data Generation
Generate data by generating random integers, converting to binary representation, then dumping into array

In [1]:
import torch
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [2]:
import numpy as np
import re
np.random.seed(12345)
intrep = np.random.randint(0, 256)
print(intrep)
binrep = np.binary_repr(intrep, width=8)
print(binrep)
arrep = np.flip(np.array(re.split('',binrep)[1:-1], dtype=int))
print(arrep)

226
11100010
[0 1 0 0 0 1 1 1]


In [3]:
import numpy as np
import re
np.random.seed(54321)

ints = np.random.randint(0, 256, size=100_000)
A = ints.copy()[0:50_000].astype(np.int64)
B = ints.copy()[50_000:].astype(np.int64)
C = A*B.astype(np.int64)

Astr, Bstr, Cstr = np.empty(len(A), dtype=object), np.empty(len(A), dtype=object), np.empty(len(A), dtype=object)

for i in range(len(A)):
    Astr[i] = np.binary_repr(A[i], width=8)
    Bstr[i] = np.binary_repr(B[i], width=8)
    Cstr[i] = np.binary_repr(C[i], width=16)
print(Astr)

Aarr, Barr, Carr = np.empty((len(A),8), dtype=object), np.empty((len(A),8), dtype=object), np.empty((len(A),16), dtype=object)

for i in range(len(A)):
    Aarr[i] = np.flip(np.array(re.split('', Astr[i])[1:-1], dtype=int))
    Barr[i] = np.flip(np.array(re.split('', Bstr[i])[1:-1], dtype=int))
    Carr[i] = np.flip(np.array(re.split('', Cstr[i])[1:-1], dtype=int))
print(Aarr)

['01010001' '10001010' '00011010' ... '11100100' '00010100' '11010101']
[[1 0 0 ... 0 1 0]
 [0 1 0 ... 0 0 1]
 [0 1 0 ... 0 0 0]
 ...
 [0 0 1 ... 1 1 1]
 [0 0 1 ... 0 0 0]
 [1 0 1 ... 0 1 1]]


Function form of the above code for main code base

In [4]:
'''Functions to generate the training/test data, and interlace binary numbers for the input'''

import numpy as np
import re

def interlace(in1:np.ndarray, in2:np.ndarray) -> np.ndarray:
    '''
    Interlaces in1 and in2
    
    Example: 
    in1 = [[[0 1], [1 0], ...], ...]
    in2 = [[[1 0], [1 0], ...], ...]
    returns: [[[0 1], [1 0], [1 0], [1 0]], ...]
    '''
    laced = np.empty((in1.shape[0], in1.shape[1]*2, 2), dtype=int)

    for i in range(in1.shape[0]):
        for j in range(in1.shape[1]):
            laced[i, 2*j] = in1[i,j]
            laced[i, (2*j)+1] = in2[i,j]

    return laced

def generate_train_test(train_size, test_size, seed) -> np.ndarray:
    '''
    Generates the training and testing data given a random seed.

    We use one-hot defintiions of:
    1 = [0 1], 0 = [1 0]

    Returns:
    A_train - 2D array where each row is separated digits of 8-bit binary number
    B_train - 2D array where each row is separated digits of 8-bit binary number
    C_train - 2D array where each row is separated digits of a 16-bit binary number of A*B
    A_test - ...
    B_test - ...
    C_test - ...
    '''

    total_size = train_size + test_size

    # generate the random integers 0-255
    np.random.seed(seed)
    ints = np.random.randint(0, 256, size=total_size*2)
    A = ints.copy()[0:total_size].astype(np.int64)
    B = ints.copy()[total_size:].astype(np.int64)
    C = A*B.astype(np.int64)

    # arrays to hold the binary-representation-strings of the converted integers
    Astr, Bstr, Cstr = np.empty(len(A), dtype=object), np.empty(len(A), dtype=object), np.empty(len(A), dtype=object)

    for i in range(len(A)):
        Astr[i] = np.binary_repr(A[i], width=8)
        Bstr[i] = np.binary_repr(B[i], width=8)
        Cstr[i] = np.binary_repr(C[i], width=16)

    # arrays to hold the 0's and 1's that come from splitting the strings of binary-representation
    Aarr, Barr, Carr = np.empty((len(A),8,2), dtype=object), np.empty((len(A),8,2), dtype=object), np.empty((len(A),16, 2), dtype=object)

    for i in range(len(A)):
        for j, binary in enumerate(np.flip(np.array(re.split('', Astr[i])[1:-1], dtype=int))):
            Aarr[i, j] = [1, 0] if binary == 0 else [0, 1]

        for j, binary in enumerate(np.flip(np.array(re.split('', Bstr[i])[1:-1], dtype=int))):
            Barr[i, j] = [1, 0] if binary == 0 else [0, 1]

        for j, binary in enumerate(np.flip(np.array(re.split('', Cstr[i])[1:-1], dtype=int))):
            Carr[i, j] = [1, 0] if binary == 0 else [0, 1]
    
    return Aarr[0:train_size], Barr[0:train_size], Carr[0:train_size], Aarr[train_size:], Barr[train_size:], Carr[train_size:]

train_size = 10_000
test_size = 2_000

atr,btr,ctr,ate,bte,cte = generate_train_test(train_size, test_size, 12345)
# print(atr[0])
print(btr[0])
abtr, batr = interlace(atr, btr), interlace(btr, atr)
abte, bate = interlace(ate, bte), interlace(bte, ate)


[[0 1]
 [1 0]
 [0 1]
 [0 1]
 [1 0]
 [0 1]
 [1 0]
 [1 0]]


## Model

In [5]:
import torch
import torch.nn as nn
import torch.nn.functional as func

class Net(nn.Module):
    def __init__(self):
        super().__init__()

        self.lstm = nn.RNN(2, 32, num_layers=4, batch_first=True) # input size is either 2 or 1?
        self.linlayer1 = nn.Linear(32, 512) # go from hidden layer size of lstm to reduced size
        self.linLayer2 = nn.Linear(512, 128)
        self.linLayer3 = nn.Linear(128, 2) # output of 32 which gets reshaped
        self.softmax = nn.Softmax(dim=2)

    def forward(self, x, input_size):
        outputs, (hidden_state, cell_state) = self.lstm(x)
        # print('outputs shape:', outputs.shape)
        # print('hidden_state shape:', hidden_state.shape)
        # print('cell state shape:', cell_state.shape)

        # outputs, hidden_state = self.lstm(x)
        # output = self.linlayer1(hidden_state[-1])
        output = self.linlayer1(outputs)
        output = torch.sigmoid(output)

        output = self.linLayer2(output)
        output = torch.sigmoid(output)

        output = self.linLayer3(output)
        output = torch.sigmoid(output)
        # print('Layer three output size: ', output.shape)
        # output = torch.reshape(output, (input_size, 16, 2))
        return self.softmax(output)

    def reset(self):
        self.lstm.reset_parameters()
        self.linlayer1.reset_parameters()
        self.linLayer2.reset_parameters()
        self.linLayer3.reset_parameters()

class Net2(nn.Module):
    def __init__(self):
        super().__init__()

        self.lstm = nn.LSTM(2, 128, num_layers=2, batch_first=True) # input size is either 2 or 1?
        # self.rnn_to_fcl = nn.Linear(512, 64) # go from hidden layer size of lstm to reduced size
        self.fcl_to_output = nn.Linear(128, 32)
        self.softmax = nn.Softmax(dim=2)

    def forward(self, x, input_size):
        outputs, (h_n,c_n)= self.lstm(x)
        # print('outputs shape:', outputs.shape)
        # print('hidden_state shape:', hidden_state.shape)
        # print('cell state shape:', cell_state.shape)
        # output = self.rnn_to_fcl(h_n[-1])
        output = self.fcl_to_output(h_n[-1])
        output = torch.reshape(output, (input_size, 16, 2))
        return output

    def reset(self):
        self.lstm.reset_parameters()
        # self.rnn_to_fcl.reset_parameters()
        self.fcl_to_output.reset_parameters()

model = Net2().to(device)

In [6]:
import torch.optim as optim
learning_rate = 0.005
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

abtr = torch.from_numpy(abtr).type(torch.float).to(device)
batr = torch.from_numpy(batr).type(torch.float).to(device)
abte = torch.from_numpy(abte).type(torch.float).to(device)
bate = torch.from_numpy(bate).type(torch.float).to(device)
ctr = torch.from_numpy(ctr.astype(int)).type(torch.float).to(device)
cte = torch.from_numpy(cte.astype(int)).type(torch.float).to(device)

print(model.forward(abtr, train_size).shape, ctr.shape)
print(model.forward(abtr, train_size)[0])
# print(ctr[0])

torch.Size([10000, 16, 2]) torch.Size([10000, 16, 2])
tensor([[-0.0666, -0.0884],
        [-0.0886,  0.0245],
        [-0.0830, -0.0928],
        [ 0.1164, -0.0422],
        [ 0.0236, -0.0024],
        [-0.0382, -0.0276],
        [-0.0266, -0.0011],
        [-0.0116, -0.0234],
        [-0.0409,  0.0094],
        [ 0.0369,  0.0058],
        [-0.0529, -0.0739],
        [ 0.0116, -0.0543],
        [ 0.0435,  0.0352],
        [ 0.0893, -0.0641],
        [ 0.0248,  0.0333],
        [ 0.0641,  0.0730]], device='cuda:0', grad_fn=<SelectBackward0>)


In [7]:
batch_size = 200
test_set = torch.utils.data.TensorDataset(abte, cte)
train_set = torch.utils.data.TensorDataset(abtr, ctr)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=batch_size, shuffle=False)
train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True)

In [8]:
num_epochs = 2000
display_epochs = 100



obj_vals= []
cross_vals= []

model.reset() # reset your parameters
loss = nn.CrossEntropyLoss()

# def closure():
#     optimizer.zero_grad()
#     objective = loss(model.forward(abtr), ctr, abtr.shape, ctr.shape)
#     objective.backward()
#     return objective

for epoch in range(num_epochs):

    for (train, train_targets), (test, test_targets) in zip(train_loader, test_loader):

        obj_val = loss(model.forward(train, batch_size), train_targets)
        
        optimizer.zero_grad()
        obj_val.backward()
        optimizer.step()
        obj_vals.append(obj_val.item())
                
        # as it trains check how well it tests
        with torch.no_grad(): 
            # don't track calculations in the following scope for the purposes of gradients
            cross_val = loss(model.forward(test, batch_size), test_targets)
            cross_vals.append(cross_val)

    if (epoch+1) % display_epochs == 0:
        print ('Epoch [{}/{}]\t Training Loss: {:.6f}'.format(epoch+1, num_epochs, obj_val.item()))
    if (epoch+1) % display_epochs == 0:
        print ('Epoch [{}/{}]\t Test Loss: {:.6f}'.format(epoch+1, num_epochs, cross_val.item()))
        
print('Final training loss: {:.4f}'.format(obj_vals[-1]))
print('Final test loss: {:.4f}'.format(cross_vals[-1]))

Epoch [100/2000]	 Training Loss: 18.913481
Epoch [100/2000]	 Test Loss: 19.411474
Epoch [200/2000]	 Training Loss: 17.707470
Epoch [200/2000]	 Test Loss: 19.187168
Epoch [300/2000]	 Training Loss: 17.402878
Epoch [300/2000]	 Test Loss: 19.587725
Epoch [400/2000]	 Training Loss: 17.420177
Epoch [400/2000]	 Test Loss: 19.682253
Epoch [500/2000]	 Training Loss: 17.325930
Epoch [500/2000]	 Test Loss: 19.491360
Epoch [600/2000]	 Training Loss: 17.201101
Epoch [600/2000]	 Test Loss: 19.661449
Epoch [700/2000]	 Training Loss: 18.054184
Epoch [700/2000]	 Test Loss: 20.148315
Epoch [800/2000]	 Training Loss: 17.173740
Epoch [800/2000]	 Test Loss: 20.151848
Epoch [900/2000]	 Training Loss: 17.257605
Epoch [900/2000]	 Test Loss: 19.901794
Epoch [1000/2000]	 Training Loss: 17.367727
Epoch [1000/2000]	 Test Loss: 20.063969
Epoch [1100/2000]	 Training Loss: 17.261475
Epoch [1100/2000]	 Test Loss: 20.143269
Epoch [1200/2000]	 Training Loss: 17.196390
Epoch [1200/2000]	 Test Loss: 20.308262
Epoch [130

In [22]:
np.set_printoptions(formatter={'float': lambda x: "{0:0.1f}".format(x)})
pred = model.forward(abte, test_size)
softmax = nn.Softmax(dim=2)
pred = softmax(pred).cpu().detach().numpy()

preds = np.round(pred).astype(int)
preds[0]

array([[1, 0],
       [1, 0],
       [0, 1],
       [0, 1],
       [0, 1],
       [0, 1],
       [0, 1],
       [1, 0],
       [0, 1],
       [0, 1],
       [1, 0],
       [1, 0],
       [1, 0],
       [0, 1],
       [0, 1],
       [1, 0]])

In [23]:
labels = test_loader.dataset[:][1].cpu().detach().numpy().astype(int)
data = test_loader.dataset[:][0]
# preds = np.round(model.forward(data, data.shape[0]).cpu().detach().numpy()).astype(int)

pred_sum = 0
for label, pred in zip(labels, preds):
    if np.array_equal(label, pred): pred_sum += 1

print(pred_sum/labels.shape[0])
print(labels.shape[0])

0.2235
2000
