In [None]:
pip install portalocker

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting portalocker
  Downloading portalocker-2.7.0-py2.py3-none-any.whl (15 kB)
Installing collected packages: portalocker
Successfully installed portalocker-2.7.0


In [None]:
# Stacked LSTM implementation
# Currently, the number of LSTM layers is fixed at 3. Investigating how to make the layer count variable while allowing gradients to flow properly.
# At the moment, the issue is that the hidden states and cell states cannot be concatenated into a single tensor. Doing so would
# allow for variable layers, but it stops gradient flow because assigning into tensor slices is an in-place operation, so another method needs to
# be found to achieve this result.

import torch
from torchtext import datasets
from torch import nn
import random

# Architecture hyperparams
num_chars = 64        # vocabulary size: 52 letters + 12 special characters
LSTM_sz = 512         # size of the embedding and of every LSTM state vector
LSTM_layer_n = 3      # number of stacked LSTM cells
dropout_p = 0.5       # dropout probability applied to each cell's input
train_batch_sz = 50   # reviews accumulated per optimizer step
truncate_len = 100    # at most truncate_len - 1 prediction steps per review

# Sampling settings
sample_topk = 3       # sample the next character among the k best candidates
sample_freq = 25      # sample from the model whenever the cycle counter hits a multiple of this
special_char = list(" <>/\"':;.()!")

# learning rate
lr = 0.0003

# Epochs to train model for (each epoch loops through the corpus once)
epochs = 50

# Number of characters to sample from model each testing cycle
sample_length = 500

# Convert character to index
def char_index(x):
  """Map a character to its vocabulary index.

  'A'-'Z' map to 0-25 and 'a'-'z' to 26-51. Any other character maps to
  52 plus its position in ``special_char``; ``list.index`` raises
  ``ValueError`` if the character is not listed there.
  """
  code = ord(x)
  if 64 < code < 91:
    return code - 65
  if 96 < code < 123:
    return code - 71
  return 52 + special_char.index(x)

# Filter characters in input data for relevant characters
def keep_char(x):
  """Return True if ``x`` is an ASCII letter or listed in ``special_char``."""
  code = ord(x)
  if 64 < code < 91 or 96 < code < 123:
    return True
  return x in special_char

# Convert index to character
def index_char(ind):
  """Map a vocabulary index back to its character (inverse of char_index).

  Indices below 26 are uppercase letters, indices below 52 are lowercase
  letters, and the rest index into ``special_char``.
  """
  if ind >= 52:
    return special_char[ind - 52]
  offset = 65 if ind < 26 else 71
  return chr(ind + offset)

class LSTM(nn.Module):
  """Single LSTM cell processing one unbatched time step.

  Every gate is a linear layer over the concatenation of the input and the
  previous hidden state, so the input must have the same size ``sz`` as the
  hidden state.
  """

  def __init__(self, sz):
    super().__init__()

    def gate(activation):
      # Maps [input, hidden] (2 * sz values) to sz gate values
      return nn.Sequential(nn.Linear(2 * sz, sz), activation)

    self.f = gate(nn.Sigmoid())   # forget gate
    self.i1 = gate(nn.Sigmoid())  # input gate
    self.i2 = gate(nn.Tanh())     # candidate cell values
    self.o = gate(nn.Sigmoid())   # output gate

    # Dropout probability is read from the module-level ``dropout_p``
    self.dropout = nn.Dropout(dropout_p)

  def forward(self, inp, hidden, cell):
    """Advance the cell by one step.

    Dropout is applied to ``inp`` only. Returns ``(output, hidden, cell)``
    where the output is the new hidden state itself.
    """
    gate_input = torch.cat((self.dropout(inp), hidden))
    retained = cell * self.f(gate_input)
    next_cell = retained + self.i1(gate_input) * self.i2(gate_input)
    next_hidden = torch.tanh(next_cell) * self.o(gate_input)

    return next_hidden, next_hidden, next_cell

class StackedLSTM(nn.Module):
  """Character model: embedding -> stacked LSTM cells -> softmax over characters.

  The forward signature carries exactly three hidden states and three cell
  states, so at most three of the stacked cells are used per step.
  """

  def __init__(self, layer_num, inp_sz, LSTM_sz):
    """Build the embedding, ``layer_num`` LSTM cells and the output projection.

    ``inp_sz`` is the vocabulary size; ``LSTM_sz`` is the size of the
    embedding and of every hidden/cell state.
    """
    super().__init__()
    self.layers = layer_num

    self.embed = nn.Embedding(inp_sz, LSTM_sz)
    # No per-cell .to(device): nn.ModuleList registers the cells as
    # submodules, so the caller's model.to(device) moves them as well, and
    # the class no longer requires a module-level ``device`` to exist.
    self.LSTM_layers = nn.ModuleList([LSTM(LSTM_sz) for _ in range(layer_num)])
    self.output = nn.Linear(LSTM_sz, inp_sz)

  def forward(self, inp, hidden_state1, hidden_state2, hidden_state3, cell_state1, cell_state2, cell_state3):
    """Run one time step.

    Returns ``(probs, hidden1, hidden2, hidden3, cell1, cell2, cell3)``
    where ``probs`` is the softmax over the output projection. If the model
    was built with fewer than three cells, the unused states are returned
    unchanged.
    """
    out = self.embed(inp)
    hiddens = [hidden_state1, hidden_state2, hidden_state3]
    cells = [cell_state1, cell_state2, cell_state3]

    # Rebinding list entries (instead of writing into one stacked tensor)
    # keeps every state an ordinary autograd node, so gradients still flow.
    for i, layer in zip(range(len(hiddens)), self.LSTM_layers):
      out, hiddens[i], cells[i] = layer(out, hiddens[i], cells[i])

    out = torch.softmax(self.output(out), dim = 0)

    return (out, *hiddens, *cells)

# Pick the compute backend: CUDA first, then Apple MPS, otherwise CPU
if torch.cuda.is_available():
  device = "cuda"
elif torch.backends.mps.is_available():
  device = "mps"
else:
  device = "cpu"

# Model on the chosen device, plus its optimizer and loss
model = StackedLSTM(LSTM_layer_n, num_chars, LSTM_sz)
model = model.to(device)
opt = torch.optim.Adam(model.parameters(), lr = lr)
criterion = nn.NLLLoss()

# Train model
def training_cycle():
  """Run one pass over the IMDB training split.

  Each cycle accumulates the next-character loss over ``train_batch_sz``
  reviews (each truncated to ``truncate_len - 1`` prediction steps), takes
  one clipped optimizer step, and samples from the model when the cycle
  counter is a multiple of ``sample_freq``. The pass ends when the dataset
  iterator raises ``StopIteration``.
  """
  # A fresh iterator per call, so every epoch starts from the first sample
  reviews = iter(datasets.IMDB(split="train", root = "data"))

  model.train()

  train_cycle_num = 1

  while True:
    try:
      print(f"Train Cycle: {train_cycle_num}")
      train_cycle_num += 1

      loss = 0
      sampled_chars = 0

      for _ in range(train_batch_sz):
        # Element 1 of each sample is used as the text; drop unsupported characters
        text = ''.join(ch for ch in next(reviews)[1] if keep_char(ch))

        # Every review starts from zeroed hidden and cell states
        hidden1, hidden2, hidden3, cell1, cell2, cell3 = (
            torch.zeros(LSTM_sz).to(device) for _ in range(6)
        )

        steps = min(len(text) - 1, truncate_len - 1)
        for pos in range(steps):
          current = torch.tensor(char_index(text[pos])).to(device)
          out, hidden1, hidden2, hidden3, cell1, cell2, cell3 = model(current, hidden1, hidden2, hidden3, cell1, cell2, cell3)

          # The model outputs probabilities; NLLLoss expects log-probabilities
          target = torch.tensor(char_index(text[pos + 1])).to(device)
          loss += criterion(torch.log(out), target)
          sampled_chars += 1

      loss = loss / sampled_chars

      model.zero_grad()
      loss.backward()

      torch.nn.utils.clip_grad_norm_(model.parameters(), 3)
      opt.step()

      print(f"Average Loss Per Character: {round(loss.item(), 3)}")

      if train_cycle_num % sample_freq == 0:
        test_cycle()

    except StopIteration:
      break

# Sample from model to test progress every once in a while
def test_cycle():
  """Print ``sample_length`` characters sampled from the model.

  Starts from a random character and zeroed states. At each step the next
  character is drawn among the ``sample_topk`` most likely outputs, weighted
  by a softmax over their scores.
  """
  # Remember the mode: this is called in the middle of training, and leaving
  # the model in eval mode would silently disable dropout afterwards.
  was_training = model.training
  model.eval()
  init_char = random.randint(0, num_chars - 1)

  hidden1 = torch.zeros(LSTM_sz).to(device)
  hidden2 = torch.zeros(LSTM_sz).to(device)
  hidden3 = torch.zeros(LSTM_sz).to(device)
  cell1 = torch.zeros(LSTM_sz).to(device)
  cell2 = torch.zeros(LSTM_sz).to(device)
  cell3 = torch.zeros(LSTM_sz).to(device)

  # Sampling needs no gradients; without no_grad the carried states would
  # keep an autograd graph spanning every sampled step.
  with torch.no_grad():
    for _ in range(sample_length):
      print(index_char(init_char), end = "")

      out, hidden1, hidden2, hidden3, cell1, cell2, cell3 = model(torch.tensor(init_char).to(device), hidden1, hidden2, hidden3, cell1, cell2, cell3)
      top_vals, top_idx = torch.topk(out, sample_topk)
      weights = nn.functional.softmax(top_vals, dim = 0)
      init_char = top_idx[torch.multinomial(weights, 1)].item()

  model.train(was_training)

# Training cycles: each epoch calls training_cycle() once
for epoch in range(epochs):
  print(f"Epoch {epoch}")
  training_cycle()

Epoch 0
Train Cycle: 1
Average Loss Per Character: 4.162
Train Cycle: 2
Average Loss Per Character: 4.153
Train Cycle: 3
Average Loss Per Character: 4.142
Train Cycle: 4
Average Loss Per Character: 4.127
Train Cycle: 5
Average Loss Per Character: 4.097
Train Cycle: 6
Average Loss Per Character: 4.048
Train Cycle: 7
Average Loss Per Character: 3.95
Train Cycle: 8
Average Loss Per Character: 3.751
Train Cycle: 9
Average Loss Per Character: 3.465
Train Cycle: 10
Average Loss Per Character: 3.422
Train Cycle: 11
Average Loss Per Character: 3.334
Train Cycle: 12
Average Loss Per Character: 3.247
Train Cycle: 13
Average Loss Per Character: 3.143
Train Cycle: 14
Average Loss Per Character: 3.167
Train Cycle: 15
Average Loss Per Character: 3.166
Train Cycle: 16
Average Loss Per Character: 3.219
Train Cycle: 17
Average Loss Per Character: 3.2
Train Cycle: 18
Average Loss Per Character: 3.137
Train Cycle: 19
Average Loss Per Character: 3.152
Train Cycle: 20
Average Loss Per Character: 3.17
Train

In [None]:
import torch
from torchtext import datasets
from torch import nn
import random

# Architecture hyperparams
num_chars = 64         # vocabulary size: 52 letters + 12 special characters
LSTM_inp_sz = 256      # embedding size fed into the LSTM cell
LSTM_hidden_sz = 512   # size of the hidden and cell state vectors

# Sampling settings
sample_topk = 3        # sample the next character among the k best candidates
sample_freq = 50       # sample from the model whenever the cycle counter hits a multiple of this
special_char = list(" <>/\"':;.()!")

# learning rate
lr = 0.001

# Epochs to train model for (each epoch loops through the corpus once)
epochs = 50

# Number of characters to sample from model each testing cycle
sample_length = 250

# Convert character to index
def char_index(x):
  """Map a character to its vocabulary index.

  'A'-'Z' map to 0-25 and 'a'-'z' to 26-51. Any other character maps to
  52 plus its position in ``special_char``; ``list.index`` raises
  ``ValueError`` if the character is not listed there.
  """
  code = ord(x)
  if 64 < code < 91:
    return code - 65
  if 96 < code < 123:
    return code - 71
  return 52 + special_char.index(x)

# Filter characters in input data for relevant characters
def keep_char(x):
  """Return True if ``x`` is an ASCII letter or listed in ``special_char``."""
  code = ord(x)
  if 64 < code < 91 or 96 < code < 123:
    return True
  return x in special_char

# Convert index to character
def index_char(ind):
  """Map a vocabulary index back to its character (inverse of char_index).

  Indices below 26 are uppercase letters, indices below 52 are lowercase
  letters, and the rest index into ``special_char``.
  """
  if ind >= 52:
    return special_char[ind - 52]
  offset = 65 if ind < 26 else 71
  return chr(ind + offset)

class LSTM(nn.Module):
  """Single-layer character model: embedding, one LSTM cell, softmax output.

  Operates on one unbatched character index per call.
  """

  def __init__(self, inp_sz, hidden_sz):
    super().__init__()

    gate_in = inp_sz + hidden_sz

    def gate(activation):
      # Maps [embedded input, hidden] to hidden_sz gate values
      return nn.Sequential(nn.Linear(gate_in, hidden_sz), activation)

    # Vocabulary size is read from the module-level ``num_chars``
    self.embed = nn.Embedding(num_chars, inp_sz)

    self.f = gate(nn.Sigmoid())   # forget gate
    self.i1 = gate(nn.Sigmoid())  # input gate
    self.i2 = gate(nn.Tanh())     # candidate cell values
    self.o = gate(nn.Sigmoid())   # output gate

    # Hidden state -> probability distribution over characters
    self.out = nn.Sequential(nn.Linear(hidden_sz, num_chars), nn.Softmax(dim = 0))

    self.dropout = nn.Dropout(0.1)

  def forward(self, inp, hidden, cell):
    """Advance one step; returns ``(char_probs, new_hidden, new_cell)``.

    Dropout is applied to the embedded input only.
    """
    embedded = self.dropout(self.embed(inp))
    gate_input = torch.cat((embedded, hidden))
    retained = cell * self.f(gate_input)
    next_cell = retained + self.i1(gate_input) * self.i2(gate_input)
    next_hidden = torch.tanh(next_cell) * self.o(gate_input)

    return self.out(next_hidden), next_hidden, next_cell

class StackedLSTM(nn.Module):
  """Unfinished placeholder for a stacked model.

  The constructor accepts ``layer_num``, ``inp_sz`` and ``hidden_sz`` but
  only initialises ``nn.Module``; no layers and no ``forward`` are defined.
  """
  def __init__(self, layer_num, inp_sz, hidden_sz):
    super().__init__()


# Training data (IMDB database in torchtext), wrapped in a single iterator
imdb_train = datasets.IMDB(split="train", root = "data")
training_dataloader = iter(imdb_train)

# Pick the compute backend: CUDA first, then Apple MPS, otherwise CPU
if torch.cuda.is_available():
  device = "cuda"
elif torch.backends.mps.is_available():
  device = "mps"
else:
  device = "cpu"

# Model on the chosen device, plus its optimizer and loss
model = LSTM(LSTM_inp_sz, LSTM_hidden_sz)
model = model.to(device)
opt = torch.optim.Adam(model.parameters(), lr = lr)
criterion = nn.NLLLoss()

# Train model
def training_cycle():
  """Train for one epoch over the IMDB training split, one review per step.

  For each review the next-character loss is averaged over the whole text,
  followed by one clipped optimizer step. The model is sampled whenever the
  cycle counter is a multiple of ``sample_freq``.
  """
  # Build a fresh iterator on every call. A single module-level iterator is
  # exhausted after the first epoch, which turns all later epochs into no-ops.
  reviews = iter(datasets.IMDB(split="train", root = "data"))

  model.train()

  train_cycle_num = 1

  for sample in reviews:
    # Element 1 of each sample is used as the text; drop unsupported characters
    string = ''.join(filter(keep_char, sample[1]))

    # At least one (input, target) pair is needed; shorter texts would
    # divide by zero or leave ``loss`` a plain number without backward().
    if len(string) < 2:
      continue

    print(f"Train Cycle: {train_cycle_num}")
    train_cycle_num += 1

    hidden = torch.zeros(LSTM_hidden_sz).to(device)
    cell = torch.zeros(LSTM_hidden_sz).to(device)
    loss = 0

    for i in range(len(string) - 1):
      out, hidden, cell = model(torch.tensor(char_index(string[i])).to(device), hidden, cell)

      # The model outputs probabilities; NLLLoss expects log-probabilities
      loss += criterion(torch.log(out), torch.tensor(char_index(string[i+1])).to(device))

    loss = loss / (len(string) - 1)

    model.zero_grad()
    loss.backward()

    torch.nn.utils.clip_grad_norm_(model.parameters(), 5)
    opt.step()

    print(f"Loss Per Character: {loss}")

    if train_cycle_num % sample_freq == 0:
      test_cycle()

# Sample from model to test progress every once in a while
def test_cycle():
  """Print ``sample_length`` characters sampled from the model.

  Starts from a random character and zeroed states. At each step the next
  character is drawn among the ``sample_topk`` most likely outputs, weighted
  by a softmax over their scores.
  """
  # Remember the mode: this is called in the middle of training, and leaving
  # the model in eval mode would silently disable dropout afterwards.
  was_training = model.training
  model.eval()
  init_char = random.randint(0, num_chars - 1)

  hidden = torch.zeros(LSTM_hidden_sz).to(device)
  cell = torch.zeros(LSTM_hidden_sz).to(device)

  # Sampling needs no gradients; without no_grad the carried states would
  # keep an autograd graph spanning every sampled step.
  with torch.no_grad():
    for _ in range(sample_length):
      print(index_char(init_char), end = "")

      out, hidden, cell = model(torch.tensor(init_char).to(device), hidden, cell)
      top_vals, top_idx = torch.topk(out, sample_topk)
      weights = nn.functional.softmax(top_vals, dim = 0)
      init_char = top_idx[torch.multinomial(weights, 1)].item()

  model.train(was_training)

# Training cycles: each epoch calls training_cycle() once, then samples from the model
for epoch in range(epochs):
  print(f"Epoch {epoch}")
  training_cycle()

  test_cycle()

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Loss Per Character: 1.5031567811965942
Train Cycle: 830
Loss Per Character: 1.6725573539733887
Train Cycle: 831
Loss Per Character: 1.4136106967926025
Train Cycle: 832
Loss Per Character: 1.5947377681732178
Train Cycle: 833
Loss Per Character: 1.5626455545425415
Train Cycle: 834
Loss Per Character: 1.403511643409729
Train Cycle: 835
Loss Per Character: 1.4312916994094849
Train Cycle: 836
Loss Per Character: 1.583768367767334
Train Cycle: 837
Loss Per Character: 1.8262192010879517
Train Cycle: 838
Loss Per Character: 1.5295615196228027
Train Cycle: 839
Loss Per Character: 1.366904854774475
Train Cycle: 840
Loss Per Character: 1.5089192390441895
Train Cycle: 841
Loss Per Character: 1.4516977071762085
Train Cycle: 842
Loss Per Character: 1.4279417991638184
Train Cycle: 843
Loss Per Character: 1.792280912399292
Train Cycle: 844
Loss Per Character: 1.84310781955719
Train Cycle: 845
Loss Per Character: 1.5804636478424072
Train 

KeyboardInterrupt: ignored

In [None]:
# Stacked LSTM implementation
import torch
from torchtext import datasets
from torch import nn
import random

# Architecture hyperparams
num_chars = 64        # vocabulary size: 52 letters + 12 special characters
LSTM_sz = 512         # size of the embedding and of every LSTM state vector
LSTM_layer_n = 3      # number of stacked LSTM cells

# Sampling settings
sample_topk = 3       # sample the next character among the k best candidates
sample_freq = 50      # sample from the model whenever the cycle counter hits a multiple of this
special_char = list(" <>/\"':;.()!")

# learning rate
lr = 0.0003

# Epochs to train model for (each epoch loops through the corpus once)
epochs = 50

# Number of characters to sample from model each testing cycle
sample_length = 250

# Convert character to index
def char_index(x):
  """Map a character to its vocabulary index.

  'A'-'Z' map to 0-25 and 'a'-'z' to 26-51. Any other character maps to
  52 plus its position in ``special_char``; ``list.index`` raises
  ``ValueError`` if the character is not listed there.
  """
  code = ord(x)
  if 64 < code < 91:
    return code - 65
  if 96 < code < 123:
    return code - 71
  return 52 + special_char.index(x)

# Filter characters in input data for relevant characters
def keep_char(x):
  """Return True if ``x`` is an ASCII letter or listed in ``special_char``."""
  code = ord(x)
  if 64 < code < 91 or 96 < code < 123:
    return True
  return x in special_char

# Convert index to character
def index_char(ind):
  """Map a vocabulary index back to its character (inverse of char_index).

  Indices below 26 are uppercase letters, indices below 52 are lowercase
  letters, and the rest index into ``special_char``.
  """
  if ind >= 52:
    return special_char[ind - 52]
  offset = 65 if ind < 26 else 71
  return chr(ind + offset)

class LSTM(nn.Module):
  """Single LSTM cell processing one unbatched time step.

  Every gate is a linear layer over the concatenation of the input and the
  previous hidden state, so the input must have the same size ``sz`` as the
  hidden state.
  """

  def __init__(self, sz):
    super().__init__()

    # Forget gate
    self.f = nn.Sequential(
        nn.Linear(2 * sz, sz),
        nn.Sigmoid()
    )

    # Input gate
    self.i1 = nn.Sequential(
        nn.Linear(2 * sz, sz),
        nn.Sigmoid()
    )

    # Candidate cell values
    self.i2 = nn.Sequential(
        nn.Linear(2 * sz, sz),
        nn.Tanh()
    )

    # Output gate
    self.o = nn.Sequential(
        nn.Linear(2 * sz, sz),
        nn.Sigmoid()
    )

    self.dropout = nn.Dropout(0.4)

  def forward(self, inp, hidden, cell):
    """Advance the cell by one step.

    Dropout is applied to ``inp`` only. Returns ``(output, hidden, cell)``;
    the output passed on to the next layer is the new hidden state.
    """
    inp = self.dropout(inp)
    xh = torch.cat((inp, hidden))
    new_cell = cell * self.f(xh)
    new_cell = new_cell + (self.i1(xh) * self.i2(xh))
    new_hidden = torch.tanh(new_cell) * self.o(xh)

    # The original returned an undefined ``new_out`` (NameError); the cell's
    # output is its hidden state.
    return new_hidden, new_hidden, new_cell

class StackedLSTM(nn.Module):
  """Character model: embedding -> ``layer_num`` LSTM cells -> softmax.

  Hidden and cell states are passed as tensors indexed by layer along
  their first dimension.
  """

  def __init__(self, layer_num, inp_sz, LSTM_sz):
    """Build the embedding, the LSTM cells and the output projection.

    ``inp_sz`` is the vocabulary size; ``LSTM_sz`` is the size of the
    embedding and of every hidden/cell state.
    """
    super().__init__()
    self.layers = layer_num

    self.embed = nn.Embedding(inp_sz, LSTM_sz)
    # nn.ModuleList (not a plain list) registers the cells as submodules, so
    # their parameters reach the optimizer and follow model.to(device).
    self.LSTM_layers = nn.ModuleList([LSTM(LSTM_sz) for _ in range(layer_num)])
    self.output = nn.Linear(LSTM_sz, inp_sz)

  def forward(self, inp, hidden_states, cell_states):
    """Run one time step.

    Returns ``(probs, new_hidden, new_cell)``. ``probs`` is a softmax over
    characters, which is what the training loop's ``torch.log(out)`` +
    ``NLLLoss`` expects. The new states are stacked along dimension 0.
    """
    out = self.embed(inp)
    new_hidden = []
    new_cell = []

    # Collect the per-layer states in lists and stack them at the end.
    # Writing them into the incoming tensors would be an in-place update,
    # which breaks gradient flow.
    for i, layer in enumerate(self.LSTM_layers):
      out, layer_hidden, layer_cell = layer(out, hidden_states[i], cell_states[i])
      new_hidden.append(layer_hidden)
      new_cell.append(layer_cell)

    out = torch.softmax(self.output(out), dim = 0)

    return out, torch.stack(new_hidden), torch.stack(new_cell)

# Training data (IMDB database in torchtext)
training_dataloader = iter(datasets.IMDB(split="train", root = "data"))

# Check if GPU is available
device = "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu"

# This cell defines LSTM_sz; LSTM_hidden_sz is not defined here and would
# only resolve if a different cell had been run earlier.
model = StackedLSTM(LSTM_layer_n, num_chars, LSTM_sz).to(device)
opt = torch.optim.Adam(model.parameters(), lr = lr)
criterion = nn.NLLLoss()

# Train model
def training_cycle():
  """Train for one epoch over the IMDB training split, one review per step.

  For each review the next-character loss is averaged over the whole text,
  followed by one clipped optimizer step. The model is sampled whenever the
  cycle counter is a multiple of ``sample_freq``.
  """
  # Build a fresh iterator on every call. A single module-level iterator is
  # exhausted after the first epoch, which turns all later epochs into no-ops.
  reviews = iter(datasets.IMDB(split="train", root = "data"))

  model.train()

  train_cycle_num = 1

  for sample in reviews:
    # Element 1 of each sample is used as the text; drop unsupported characters
    string = ''.join(filter(keep_char, sample[1]))

    # At least one (input, target) pair is needed; shorter texts would
    # divide by zero or leave ``loss`` a plain number without backward().
    if len(string) < 2:
      continue

    print(f"Train Cycle: {train_cycle_num}")
    train_cycle_num += 1

    # One row of state per stacked layer
    hidden = torch.zeros((LSTM_layer_n, LSTM_sz)).to(device)
    cell = torch.zeros((LSTM_layer_n, LSTM_sz)).to(device)
    loss = 0

    for i in range(len(string) - 1):
      out, hidden, cell = model(torch.tensor(char_index(string[i])).to(device), hidden, cell)

      loss += criterion(torch.log(out), torch.tensor(char_index(string[i+1])).to(device))

    loss = loss / (len(string) - 1)

    model.zero_grad()
    loss.backward()

    torch.nn.utils.clip_grad_norm_(model.parameters(), 3)
    opt.step()

    print(f"Loss Per Character: {loss}")

    if train_cycle_num % sample_freq == 0:
      test_cycle()

# Sample from model to test progress every once in a while
def test_cycle():
  """Print ``sample_length`` characters sampled from the model.

  Starts from a random character and zeroed states. At each step the next
  character is drawn among the ``sample_topk`` highest-scoring outputs,
  weighted by a softmax over their scores.
  """
  # Remember the mode: this is called in the middle of training, and leaving
  # the model in eval mode would silently disable dropout afterwards.
  was_training = model.training
  model.eval()
  init_char = random.randint(0, num_chars - 1)

  hidden = torch.zeros((LSTM_layer_n, LSTM_sz)).to(device)
  cell = torch.zeros((LSTM_layer_n, LSTM_sz)).to(device)

  # Sampling needs no gradients; without no_grad the carried states would
  # keep an autograd graph spanning every sampled step.
  with torch.no_grad():
    for _ in range(sample_length):
      print(index_char(init_char), end = "")

      out, hidden, cell = model(torch.tensor(init_char).to(device), hidden, cell)
      top_vals, top_idx = torch.topk(out, sample_topk)
      weights = nn.functional.softmax(top_vals, dim = 0)
      init_char = top_idx[torch.multinomial(weights, 1)].item()

  model.train(was_training)

# Training cycles: each epoch calls training_cycle() once, then samples from the model
for epoch in range(epochs):
  print(f"Epoch {epoch}")
  training_cycle()

  test_cycle()

In [None]:
from torch import nn
# Scratch check of nn.Softmax. `torch` is not imported in this cell, so it
# relies on an earlier cell having imported it.
a = torch.tensor([1, 2, 3])

# NOTE: this constructs a Softmax module with `a` as its `dim` argument; it
# does not apply softmax to `a`, so the module's repr is what gets printed.
print(nn.Softmax(a))

Softmax(dim=tensor([1, 2, 3]))


In [None]:
# Experimental test

import torch
from torchtext import datasets
import math
import numpy as np
import random
from time import sleep
from torch import nn

# Main LSTM architecture definition
class LSTM(nn.Module):
  """Hand-written LSTM cell with a tanh output projection.

  Works on one unbatched step: ``x`` has ``inp_sz`` values, the hidden and
  cell states have ``hidden_sz`` values, and the output has ``out_sz``.
  """

  def __init__(self, inp_sz, hidden_sz, out_sz):
    super().__init__()
    self.inp_sz = inp_sz
    self.hidden_sz = hidden_sz
    self.cell_sz = hidden_sz
    self.output_sz = out_sz

    gate_in = inp_sz + hidden_sz

    def new_param(*shape):
      # Uninitialised storage; values are filled in by init_weights()
      return nn.Parameter(torch.Tensor(*shape))

    # Forget gate
    self.W_f = new_param(gate_in, self.cell_sz)
    self.b_f = new_param(self.cell_sz)

    # Input gate and candidate cell values
    self.W_ii = new_param(gate_in, self.cell_sz)
    self.b_ii = new_param(self.cell_sz)
    self.W_iC = new_param(gate_in, self.cell_sz)
    self.b_iC = new_param(self.cell_sz)

    # Output gate
    self.W_o = new_param(gate_in, self.cell_sz)
    self.b_o = new_param(self.cell_sz)

    # Projection from the hidden state to the layer output
    self.W_out = new_param(self.cell_sz, self.output_sz)
    self.b_out = new_param(self.output_sz)

    self.init_weights()

  def init_weights(self):
    """Fill every parameter (weights and biases) uniformly in +/- 1/sqrt(inp_sz / 6)."""
    bound = 1.0 / math.sqrt(self.inp_sz / 6.0)
    for param in self.parameters():
      param.data.uniform_(-bound, bound)

  def forward(self, x, h, c):
    """Advance one step; returns ``(output, new_hidden, new_cell)``."""
    xh = torch.cat((x, h))

    forget = torch.sigmoid(xh @ self.W_f + self.b_f)
    write = torch.sigmoid(xh @ self.W_ii + self.b_ii)
    candidate = torch.tanh(xh @ self.W_iC + self.b_iC)
    next_cell = c * forget + (write * candidate)

    next_hidden = torch.sigmoid(xh @ self.W_o + self.b_o) * torch.tanh(next_cell)

    output = torch.tanh(next_hidden @ self.W_out + self.b_out)

    return output, next_hidden, next_cell

# Filter characters in input data for relevant characters (alphabetic and spaces)
def keep_char(x):
  """Return True for alphabetic or whitespace characters, False otherwise."""
  if x.isalpha():
    return True
  return x.isspace()

# Convert character to index in one-hot vector representation of characters
def char_index(x):
  """Map a character to its index in the 53-entry one-hot encoding.

  'A'-'Z' map to 0-25 and 'a'-'z' to 26-51. Everything else (spaces, other
  whitespace, non-ASCII letters) maps to 52, the space slot.

  The previous bounds (``< 90`` / ``< 122``) excluded 'Z' and 'z', and
  whitespace other than a space produced a negative index.
  """
  if "A" <= x <= "Z":
    return ord(x) - 65
  if "a" <= x <= "z":
    return ord(x) - 71
  return 52

# Convert character vector
def onehot_to_char(x):
  """Return the character for the largest entry of a score/one-hot vector.

  Index 52 is the space; indices below 26 are uppercase letters and the
  remaining indices are lowercase letters.
  """
  ind = np.argmax(x)

  if ind == 52:
    return chr(32)
  return chr(ind + (65 if ind < 26 else 71))

# Training data (IMDB database in torchtext), wrapped in a single iterator
imdb_train = datasets.IMDB(split="train", root = "data")
training_dataloader = iter(imdb_train)

# Epochs to train model for (each epoch loops through the corpus once)
epochs = 50

# Number of characters to sample from model each testing cycle
sample_length = 250

# Pick the compute backend: CUDA first, then Apple MPS, otherwise CPU
if torch.cuda.is_available():
  device = "cuda"
elif torch.backends.mps.is_available():
  device = "mps"
else:
  device = "cpu"

# Three chained layers (53 -> 256 -> 256 -> 53), one shared optimizer, and the loss
LSTM_model_layer1 = LSTM(53, 512, 256).to(device)
LSTM_model_layer2 = LSTM(256, 512, 256).to(device)
LSTM_model_layer3 = LSTM(256, 512, 53).to(device)
optimizer = torch.optim.Adam([
    param
    for layer in (LSTM_model_layer1, LSTM_model_layer2, LSTM_model_layer3)
    for param in layer.parameters()
])
loss = nn.CrossEntropyLoss()

# Train model
def training_cycle():
  """Train the three-layer stack for one pass over the IMDB training split.

  Each review is processed in chunks of 100 steps. Within a chunk the
  first steps are teacher-forced with the ground-truth input; from step 75
  on the model is fed its own previous output and the loss is accumulated.
  One optimizer step is taken at the end of each full chunk, and
  ``test_cycle`` runs after every review.
  """
  layers = (LSTM_model_layer1, LSTM_model_layer2, LSTM_model_layer3)

  def run_stack(inp, states):
    # Feed ``inp`` through all layers; return the last output and the new
    # per-layer (hidden, cell) states.
    new_states = []
    for layer, (hidden, cell) in zip(layers, states):
      inp, hidden, cell = layer(inp, hidden, cell)
      new_states.append((hidden, cell))
    return inp, new_states

  # Build a fresh iterator on every call. A single module-level iterator is
  # exhausted after the first epoch, which turns all later epochs into no-ops.
  reviews = iter(datasets.IMDB(split="train", root = "data"))

  for layer in layers:
    layer.train()

  train_cycle_num = 1

  for sample in reviews:
    print(f"Train Cycle: {train_cycle_num}")
    train_cycle_num += 1

    string = ''.join(filter(keep_char, sample[1]))

    string_onehot = []
    for ind, char in enumerate(string, start = 1):
      # Tensors are created on ``device`` so they match the models
      onehot_char = torch.zeros(53, device = device)
      # Experimental target: alternate between classes 1 and 0 instead of
      # the real encoding, onehot_char[char_index(char)] = 1.0
      onehot_char[ind % 2] = 1.0

      string_onehot.append(onehot_char)

    for i in range(1, len(string_onehot)):
      if i % 100 == 1:
        # Start of a chunk: reset the loss and zero every layer's state
        model_loss = 0
        loss_terms = 0
        states = [
            (torch.zeros(layer.hidden_sz, device = device), torch.zeros(layer.hidden_sz, device = device))
            for layer in layers
        ]

      if i % 100 < 75:
        # Teacher forcing: one-hot input rescaled from {0, 1} to {-1, 1}
        model_output, states = run_stack(string_onehot[i-1] * 2 - 1, states)
      else:
        # Free running: feed back the previous output and score the prediction
        model_output, states = run_stack(pred_char, states)

        print(onehot_to_char(model_output.detach().cpu()), end = "")
        model_loss += loss((model_output + 1) / 2, string_onehot[i])
        loss_terms += 1

      pred_char = model_output

      if i % 100 == 0:
        # Average over the terms actually accumulated in this chunk
        print(f"Average Loss Per Char: {model_loss.item() / loss_terms}")
        for layer in layers:
          layer.zero_grad()
        model_loss.backward()

        for layer in layers:
          torch.nn.utils.clip_grad_norm_(layer.parameters(), 1)
        optimizer.step()

    # Sample from model (every cycle)
    if train_cycle_num % 1 == 0:
      test_cycle()

# Sample from model to test progress every once in a while
def test_cycle():
  """Print 249 characters generated by the three-layer stack.

  Each step samples an input character from the current distribution
  (uniform at first, then the sigmoid of the last output), feeds it through
  the stack and prints the arg-max character of the output.
  """
  layers = (LSTM_model_layer1, LSTM_model_layer2, LSTM_model_layer3)

  # Remember each layer's mode so sampling does not change it for the caller
  was_training = [layer.training for layer in layers]
  for layer in layers:
    layer.eval()

  print("Test Cycle")
  char_dist = torch.ones(53, device = device)

  # States are created once and carried across steps. Re-creating them
  # inside the loop made every character come from a blank state.
  states = [
      (torch.zeros(layer.hidden_sz, device = device), torch.zeros(layer.hidden_sz, device = device))
      for layer in layers
  ]

  # Sampling needs no gradients
  with torch.no_grad():
    for _ in range(1, 250):
      prev_char = torch.zeros(53, device = device)
      prev_char[torch.multinomial(char_dist, 1)] = 1.0

      # One-hot input rescaled from {0, 1} to {-1, 1}
      out = prev_char * 2 - 1
      new_states = []
      for layer, (hidden, cell) in zip(layers, states):
        out, hidden, cell = layer(out, hidden, cell)
        new_states.append((hidden, cell))
      states = new_states

      char_dist = torch.sigmoid(out)
      print(onehot_to_char(out.detach().cpu()), end = "")

  for layer, mode in zip(layers, was_training):
    layer.train(mode)

# Training cycles: each epoch calls training_cycle() once
for epoch in range(epochs):
  print(f"Epoch {epoch}")
  training_cycle()

Epoch 0
Train Cycle: 1
JJPuuuuuddUdddRRR      UUAverage Loss Per Char: 1.9559210205078126
B BBiBBBBXXXXXXXXXllkkAkuAverage Loss Per Char: 1.8833927917480469
dddddinnnRRRRRRRRrccJeeeeAverage Loss Per Char: 1.9804585266113282
iiiiiiiiiwwwTTTTTTTTTTHHiAverage Loss Per Char: 1.8996270751953126
BBBBCCVwVBBgggggiiss  MERAverage Loss Per Char: 1.908892364501953
peeeepppiiBBBBBBSSSSssOOOAverage Loss Per Char: 1.85684326171875
eeeeeeBJBBBBBBCCCBBBBBBBiAverage Loss Per Char: 1.84258056640625
lSSSSSSSddddSSSSSSASSSSQAAverage Loss Per Char: 1.8156915283203126
JJJBBBBBBBBBBBBAAAAAAAAAAAverage Loss Per Char: 1.777398681640625
eeeeeeeeeeeeeeBBBBAAAAAA Average Loss Per Char: 1.7998463439941406
yyyyyyyyyKGGKeeeAAAAAAAAAAverage Loss Per Char: 1.7867172241210938
yyBBBBBBBBBBwAAAAABBBBQQBAverage Loss Per Char: 1.7754226684570313
eeAAAAAAAAAABBBBBBBAAABBBAverage Loss Per Char: 1.7643565368652343
AAmmmmmBBBBAAAAAAAAAAAAAAAverage Loss Per Char: 1.7601129150390624
AAAAAAAAAAABBBBBBBBBAAAAAAverage Loss Per Cha

KeyboardInterrupt: ignored

In [None]:
# Experimental test

import torch
from torchtext import datasets
import math
import numpy as np
import random
from time import sleep
from torch import nn

# Main LSTM architecture definition
class LSTM(nn.Module):
  """Hand-written LSTM cell with a tanh output projection.

  Works on one unbatched step: ``x`` has ``inp_sz`` values, the hidden and
  cell states have ``hidden_sz`` values, and the output has ``out_sz``.
  """

  def __init__(self, inp_sz, hidden_sz, out_sz):
    super().__init__()
    self.inp_sz = inp_sz
    self.hidden_sz = hidden_sz
    self.cell_sz = hidden_sz
    self.output_sz = out_sz

    gate_in = inp_sz + hidden_sz

    def new_param(*shape):
      # Uninitialised storage; values are filled in by init_weights()
      return nn.Parameter(torch.Tensor(*shape))

    # Forget gate
    self.W_f = new_param(gate_in, self.cell_sz)
    self.b_f = new_param(self.cell_sz)

    # Input gate and candidate cell values
    self.W_ii = new_param(gate_in, self.cell_sz)
    self.b_ii = new_param(self.cell_sz)
    self.W_iC = new_param(gate_in, self.cell_sz)
    self.b_iC = new_param(self.cell_sz)

    # Output gate
    self.W_o = new_param(gate_in, self.cell_sz)
    self.b_o = new_param(self.cell_sz)

    # Projection from the hidden state to the layer output
    self.W_out = new_param(self.cell_sz, self.output_sz)
    self.b_out = new_param(self.output_sz)

    self.init_weights()

  def init_weights(self):
    """Fill every parameter (weights and biases) uniformly in +/- 1/sqrt(inp_sz / 6)."""
    bound = 1.0 / math.sqrt(self.inp_sz / 6.0)
    for param in self.parameters():
      param.data.uniform_(-bound, bound)

  def forward(self, x, h, c):
    """Advance one step; returns ``(output, new_hidden, new_cell)``."""
    xh = torch.cat((x, h))

    forget = torch.sigmoid(xh @ self.W_f + self.b_f)
    write = torch.sigmoid(xh @ self.W_ii + self.b_ii)
    candidate = torch.tanh(xh @ self.W_iC + self.b_iC)
    next_cell = c * forget + (write * candidate)

    next_hidden = torch.sigmoid(xh @ self.W_o + self.b_o) * torch.tanh(next_cell)

    output = torch.tanh(next_hidden @ self.W_out + self.b_out)

    return output, next_hidden, next_cell

# Filter characters in input data for relevant characters (alphabetic and spaces)
def keep_char(x):
  """Return True for alphabetic or whitespace characters, False otherwise."""
  if x.isalpha():
    return True
  return x.isspace()

# Convert character to index in one-hot vector representation of characters
def char_index(x):
  """Map a character to its index in the 53-entry one-hot encoding.

  'A'-'Z' map to 0-25 and 'a'-'z' to 26-51. Everything else (spaces, other
  whitespace, non-ASCII letters) maps to 52, the space slot.

  The previous bounds (``< 90`` / ``< 122``) excluded 'Z' and 'z', and
  whitespace other than a space produced a negative index.
  """
  if "A" <= x <= "Z":
    return ord(x) - 65
  if "a" <= x <= "z":
    return ord(x) - 71
  return 52

# Convert character vector
def onehot_to_char(x):
  """Return the character for the largest entry of a score/one-hot vector.

  Index 52 is the space; indices below 26 are uppercase letters and the
  remaining indices are lowercase letters.
  """
  ind = np.argmax(x)

  if ind == 52:
    return chr(32)
  return chr(ind + (65 if ind < 26 else 71))

# Training data (IMDB database in torchtext), wrapped in a single iterator
imdb_train = datasets.IMDB(split="train", root = "data")
training_dataloader = iter(imdb_train)

# Epochs to train model for (each epoch loops through the corpus once)
epochs = 50

# Number of characters to sample from model each testing cycle
sample_length = 250

# Pick the compute backend: CUDA first, then Apple MPS, otherwise CPU
if torch.cuda.is_available():
  device = "cuda"
elif torch.backends.mps.is_available():
  device = "mps"
else:
  device = "cpu"

# Single layer (53 -> 53), its optimizer, and the loss
LSTM_model_layer1 = LSTM(53, 512, 53)
LSTM_model_layer1 = LSTM_model_layer1.to(device)
optimizer = torch.optim.Adam(LSTM_model_layer1.parameters())
loss = nn.CrossEntropyLoss()

# Train model
def training_cycle():
  """Train the single-layer model for one pass over the IMDB training split.

  Each review is processed in chunks of 100 steps. Within a chunk the
  first steps are teacher-forced with the ground-truth input; from step 75
  on the model is fed its own previous output and the loss is accumulated.
  One optimizer step is taken at the end of each full chunk, and
  ``test_cycle`` runs after every review.

  Only ``LSTM_model_layer1`` is used. The earlier version still referenced
  second and third layers (and their state lists) that this cell never
  defines.
  """
  # Build a fresh iterator on every call. A single module-level iterator is
  # exhausted after the first epoch, which turns all later epochs into no-ops.
  reviews = iter(datasets.IMDB(split="train", root = "data"))

  LSTM_model_layer1.train()

  train_cycle_num = 1

  for sample in reviews:
    print(f"Train Cycle: {train_cycle_num}")
    train_cycle_num += 1

    string = ''.join(filter(keep_char, sample[1]))

    string_onehot = []
    for ind, char in enumerate(string, start = 1):
      # Tensors are created on ``device`` so they match the model
      onehot_char = torch.zeros(53, device = device)
      # Experimental target: alternate between classes 1 and 0 instead of
      # the real encoding, onehot_char[char_index(char)] = 1.0
      onehot_char[ind % 2] = 1.0

      string_onehot.append(onehot_char)

    for i in range(1, len(string_onehot)):
      if i % 100 == 1:
        # Start of a chunk: reset the loss and zero the state
        model_loss = 0
        loss_terms = 0
        hidden = torch.zeros(LSTM_model_layer1.hidden_sz, device = device)
        cell = torch.zeros(LSTM_model_layer1.hidden_sz, device = device)

      teacher_forced = i % 100 < 75
      if teacher_forced:
        # One-hot input rescaled from {0, 1} to {-1, 1}
        step_input = string_onehot[i-1] * 2 - 1
      else:
        # Free running: feed back the previous output
        step_input = pred_char

      model_output, hidden, cell = LSTM_model_layer1(step_input, hidden, cell)

      if not teacher_forced:
        print(onehot_to_char(model_output.detach().cpu()), end = "")
        model_loss += loss((model_output + 1) / 2, string_onehot[i])
        loss_terms += 1

      pred_char = model_output

      if i % 100 == 0:
        # Average over the terms actually accumulated in this chunk
        print(f"Average Loss Per Char: {model_loss.item() / loss_terms}")
        LSTM_model_layer1.zero_grad()
        model_loss.backward()

        torch.nn.utils.clip_grad_norm_(LSTM_model_layer1.parameters(), 1)
        optimizer.step()

    # Sample from model (every cycle)
    if train_cycle_num % 1 == 0:
      test_cycle()

# Sample from model to test progress every once in a while
def test_cycle():
  """Print 249 characters generated by the single-layer model.

  Each step samples an input character from the current distribution
  (uniform at first, then the sigmoid of the last output), feeds it through
  ``LSTM_model_layer1`` and prints the arg-max character of the output.

  Only ``LSTM_model_layer1`` is used. The earlier version still referenced
  second and third layers that this cell never defines.
  """
  # Remember the mode so sampling does not change it for the caller
  was_training = LSTM_model_layer1.training
  LSTM_model_layer1.eval()

  print("Test Cycle")
  char_dist = torch.ones(53, device = device)

  # State is created once and carried across steps. Re-creating it inside
  # the loop made every character come from a blank state.
  hidden = torch.zeros(LSTM_model_layer1.hidden_sz, device = device)
  cell = torch.zeros(LSTM_model_layer1.hidden_sz, device = device)

  # Sampling needs no gradients
  with torch.no_grad():
    for _ in range(1, 250):
      prev_char = torch.zeros(53, device = device)
      prev_char[torch.multinomial(char_dist, 1)] = 1.0

      # One-hot input rescaled from {0, 1} to {-1, 1}
      model_output, hidden, cell = LSTM_model_layer1(prev_char * 2 - 1, hidden, cell)

      char_dist = torch.sigmoid(model_output)
      print(onehot_to_char(model_output.detach().cpu()), end = "")

  LSTM_model_layer1.train(was_training)

# Training cycles: each epoch calls training_cycle() once
for epoch in range(epochs):
  print(f"Epoch {epoch}")
  training_cycle()