In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn.utils.rnn import pad_packed_sequence, pack_padded_sequence, pad_sequence
import numpy as np

In [2]:
trainDatasetSize = 100000
maxBStringLen = 50
batch_size = 32
iterations = 10000
epochs = 2

In [3]:
X_train = np.random.randint(2, size=(trainDatasetSize, maxBStringLen, 1))
Y_train = (X_train.cumsum(axis=1) % 2 == 0).astype("float32")

X_train = torch.from_numpy(X_train).float()
Y_train = torch.from_numpy(Y_train)

In [4]:
print(torch.cuda.is_available())
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

True


In [5]:
class xor_variable(nn.Module):
    def __init__(self):
        super(xor_variable, self).__init__()
        self.hidden_size = 2
        self.input_size = 1
        self.lstm = nn.LSTM(self.input_size, self.hidden_size, batch_first=True)
        self.fc = nn.Linear(self.hidden_size, 1)
    def forward(self, strings, lengths):
        packed_inputs = pack_padded_sequence(strings, lengths, batch_first=True, enforce_sorted=False).to(device)
        lstm_out, _ = self.lstm(packed_inputs)

        unpacked, _ = pad_packed_sequence(lstm_out, batch_first=True)

        logits = self.fc(unpacked)
        predictions = nn.Sigmoid()(logits)

        return predictions

    # def forward(self, inputs, lengths):
    #     # pack the inputs
    #     packed_inputs = pack_padded_sequence(inputs, lengths, batch_first=True).to(device)

    #     lstm_out, _ = self.lstm(packed_inputs)

    #     unpacked, _ = pad_packed_sequence(lstm_out, batch_first=True)

    #     logits = self.fc(unpacked)
    #     predictions = nn.Sigmoid()(logits)

    #     return predictions

In [6]:
def adjust_lengths(inputs, targets=None):
    cur_batch_size = inputs.size()[0]
    lengths = np.random.randint(1, maxBStringLen, size=cur_batch_size, dtype=int)

    lengths[0] = maxBStringLen

    lengths = -np.sort(-lengths)

    for i, sample_length in enumerate(lengths):
        inputs[i, lengths[i]:, ] = 0
        if str(type(targets)) != "<class 'NoneType'>":
            targets[i, lengths[i]:, ] = 0

    return lengths

In [7]:
model = xor_variable().to(device)
idxs = torch.randint(trainDatasetSize, (batch_size,))
X_batch = X_train[idxs].to(device)
Y_batch = Y_train[idxs].to(device) 
lengths_batch = adjust_lengths(X_batch, Y_batch)
model(X_batch, lengths_batch).shape

torch.Size([32, 50, 1])

In [8]:
optimizer = optim.Adam(model.parameters())
bce = nn.BCELoss()

In [9]:
def train(model, X, Y, optimizer, criterion):
    model.train()
    total_loss = 0
    for i in range(1, iterations+1):
        idxs = torch.randint(trainDatasetSize, (batch_size,))
        X_batch = X[idxs].to(device)
        Y_batch = Y[idxs].to(device)
        lengths = adjust_lengths(X_batch, Y_batch)

        optimizer.zero_grad()

        output = model(X_batch, lengths)

        loss = criterion(output, Y_batch)

        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        if i % 1000 == 0:
            accuracy = ((output > 0.5) == (Y_batch > 0.5)).sum() / (batch_size*50) * 100
            print(f"Training Iteration: {i}, Accuracy: {accuracy.item()}, Loss: {loss.item()}")

In [None]:
for i in range(epochs):
    print(f"EPOCH #{i+1}")
    train(model, X_train, Y_train, optimizer, bce)
    print()

EPOCH #1
Training Iteration: 1000, Accuracy: 71.375, Loss: 0.5426816344261169
Training Iteration: 2000, Accuracy: 72.375, Loss: 0.4594939649105072
Training Iteration: 3000, Accuracy: 70.5625, Loss: 0.45058146119117737
Training Iteration: 4000, Accuracy: 76.62499237060547, Loss: 0.36482033133506775
Training Iteration: 5000, Accuracy: 100.0, Loss: 0.07718625664710999
Training Iteration: 6000, Accuracy: 100.0, Loss: 0.044984739273786545
Training Iteration: 7000, Accuracy: 100.0, Loss: 0.026605801656842232
Training Iteration: 8000, Accuracy: 100.0, Loss: 0.016972707584500313
Training Iteration: 9000, Accuracy: 100.0, Loss: 0.01178149413317442
Training Iteration: 10000, Accuracy: 100.0, Loss: 0.007261615712195635

EPOCH #2
Training Iteration: 1000, Accuracy: 100.0, Loss: 0.004921977408230305
Training Iteration: 2000, Accuracy: 100.0, Loss: 0.003196754027158022
Training Iteration: 3000, Accuracy: 100.0, Loss: 0.0021345613058656454
Training Iteration: 4000, Accuracy: 100.0, Loss: 0.0014699203

In [None]:
testDatasetSize = 10000
X_test = np.random.randint(2, size=(testDatasetSize, maxBStringLen, 1))
Y_test = (X_test.cumsum(axis=1) % 2 == 0).astype("float32")

X_test = torch.from_numpy(X_test).float().to(device)
Y_test = torch.from_numpy(Y_test).to(device)

lengths = adjust_lengths(X_test, Y_test)
output = model(X_test, lengths)
print(f"Test Accuracy: {((output > 0.5) == (Y_test > 0.5)).sum() / (testDatasetSize*50) * 100}")