<a href="https://colab.research.google.com/github/zelalemamera-stonybrook/projects-sandbox/blob/main/Recurrent_Neural_Networks.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [92]:
import torch
import torch.nn as nn
import re

In [93]:
with open('/content/charles_dickens_christmas_carol.txt', 'r') as f:
  scroodge_book = f.read()

In [94]:
def clean_sentences(sentence_list):
  '''
  Add SOS - start of sentence and EOS end of sentence markers.
  Add EOSq for end of sentence question
  replace ; : with EOS
  replace ! with ,
  remove - i.e. treat every compound word as one unit
  remove all numbers
  remove \
  remove "
  '''
  cleaned_sentence_list = []
  for sentence in sentence_list:
    filtered = re.sub(r"[;:,]", r"", sentence)
    filtered = re.sub(r"\.", r" eos", filtered)
    filtered = re.sub(r"\?", r" eosq", filtered)
    filtered = re.sub(r"!", r"", filtered)
    filtered = re.sub(r"-", r"", filtered)
    filtered = re.sub(r"\d", r"", filtered)
    filtered = re.sub(r"\\", r"", filtered)
    filtered = re.sub(r"\"", r"", filtered)
    filtered = re.sub(r"\s+", r" ", filtered)
    filtered = filtered.lower()
    cleaned_sentence_list.append(f"sos {filtered}")
  return cleaned_sentence_list



In [95]:
def clean_book_and_tokenize_sentences(book):
  '''
  '''
  list_of_undesirables = ["STAVE I:  MARLEY'S GHOST", "STAVE II:  THE FIRST OF THE THREE SPIRITS", "STAVE III:  THE SECOND OF THE THREE SPIRITS",
                   "A CHRISTMAS CAROL", "Stave IV:  The Last of the Spirits", "Stave V:  The End of It"]
  for string in list_of_undesirables:
    book = re.sub(string, r"", book)
  book = re.sub("\n", r" ", book)
  sentence_list = [". marley was dead."]
  appendage = re.findall(r"\..*?\.", book)
  for appendend in appendage:
    sentence_list.append(appendend)
  for i, sentence in enumerate(sentence_list):
    sentence_list[i] = sentence[1:].strip()
  cleaned_sentence_list = clean_sentences(sentence_list)
  return cleaned_sentence_list


In [96]:
sentence_list = clean_book_and_tokenize_sentences(scroodge_book)

In [97]:
a = [1,2,3, 4, 5, 6]
b = [2,3,4,5,6]
c = [3,4,5,6]
d = zip(a,b,c)
for i in d:
  print(i)

(1, 2, 3)
(2, 3, 4)
(3, 4, 5)
(4, 5, 6)


In [98]:
sentence_list[-20:]

['sos your uncle scrooge eos',
 "sos will you let me in fred eosq let him in it is a mercy he didn't shake his arm off eos",
 'sos nothing could be heartier eos',
 'sos so did topper when he came eos',
 'sos so did every one when they came eos',
 'sos oh he was early there eos',
 'sos and he did it yes he did the clock struck nine eos',
 'sos a quarter past eos',
 'sos he was full eighteen minutes and a half behind his time eos',
 'sos his hat was off before he opened the door his comforter too eos',
 'sos hallo growled scrooge in his accustomed voice as near as he could feign it eos',
 'sos i am behind my time eos',
 'sos yes eos',
 'sos step this way sir if you please eos',
 'sos it shall not be repeated eos',
 "sos  now i'll tell you what my friend said scrooge i am not going to stand this sort of thing any longer eos",
 'sos he had a momentary idea of knocking scrooge down with it holding him and calling to the people in the court for help and a straitwaistcoat eos',
 "sos a merrie

In [99]:
def generate_one_hot_mapping(sentence_list):
  '''
  '''
  words_to_int = {}
  int_to_words = {}
  size = 0
  for sentence in sentence_list:
    words = re.split(r"\s", sentence)
    for word in words:
      if word in words_to_int.keys():
        continue
      else:
        words_to_int[word] = size
        int_to_words[size] = word
        size += 1
  return (int_to_words, words_to_int)


In [100]:
def translate_to_num(sentence_list):
  '''
  '''
  num_dict, word_dict = generate_one_hot_mapping(sentence_list)
  translated_sequences = []
  for sentence in sentence_list:
    words = re.split(r"\s", sentence)
    sequence = []
    for word in words:
      try:
        sequence.append(word_dict[word])
      except Exception:
        continue
    translated_sequences.append(sequence)
  return translated_sequences, num_dict, word_dict

In [101]:
def generate_training_data(sentence_list):
  '''
  '''
  tensor_list = []
  string_sequences, num_dict, word_dict = translate_to_num(sentence_list)
  word_dimension = len(num_dict.keys())
  for sentence in string_sequences:
    zipped = zip(sentence, sentence[1:], sentence[2:])
    for tup in zipped:
      if tup in tensor_list:
        continue
      tensor_list.append(tup)
  matrix_list = []
  for tup in tensor_list:
    tensor = torch.zeros((3, word_dimension))
    for i in range(len(tup)):
      tensor[i,tup[i]] = 1
    matrix_list.append(tensor)
  return matrix_list, num_dict, word_dict

In [102]:
string_matrices, num_dict, word_dict = generate_training_data(sentence_list)

In [103]:
len(string_matrices)

13415

In [104]:
def to_strings(string_matrices, num_dict):
  '''
  '''
  string_list = []
  for matrix in string_matrices:
    word1 = num_dict[torch.argmax(matrix[0,:]).item()]
    word2 = num_dict[torch.argmax(matrix[1,:]).item()]
    word3 = num_dict[torch.argmax(matrix[2,:]).item()]
    string_list.append((word1,word2,word3))
  return string_list

In [105]:
string_list = to_strings(string_matrices, num_dict)

In [106]:
string_list[-30:]

[('a', 'man', 'as'),
 ('man', 'as', 'the'),
 ('as', 'the', 'good'),
 ('the', 'good', 'old'),
 ('good', 'old', 'city'),
 ('old', 'city', 'knew'),
 ('city', 'knew', 'or'),
 ('knew', 'or', 'any'),
 ('or', 'any', 'other'),
 ('any', 'other', 'good'),
 ('other', 'good', 'old'),
 ('old', 'city', 'town'),
 ('city', 'town', 'or'),
 ('town', 'or', 'borough'),
 ('or', 'borough', 'in'),
 ('borough', 'in', 'the'),
 ('in', 'the', 'good'),
 ('good', 'old', 'world'),
 ('old', 'world', 'eos'),
 ('sos', 'his', 'own'),
 ('his', 'own', 'heart'),
 ('own', 'heart', 'laughed'),
 ('heart', 'laughed', 'and'),
 ('laughed', 'and', 'that'),
 ('and', 'that', 'was'),
 ('that', 'was', 'quite'),
 ('was', 'quite', 'enough'),
 ('quite', 'enough', 'for'),
 ('enough', 'for', 'him'),
 ('for', 'him', 'eos')]

In [107]:
class RNN(nn.Module):

  def __init__(self, input_size, hidden_size, output_size):
    super().__init__()
    output_matrix = torch.rand((output_size, hidden_size), requires_grad=True)
    self.output_layer = nn.Parameter(output_matrix, requires_grad=True)
    self.output_layer_bias = nn.Parameter(torch.rand((output_size)), requires_grad=True)
    self.loss = nn.CrossEntropyLoss()
    self.output_size = output_size
    self.input_size = input_size
    self.hidden_size = hidden_size
    self.hidden_layer = nn.RNNCell(input_size, hidden_size)

  def forward(self, input_batch):
    hidden_list = []
    total_time = input_batch.shape[0]
    size_of_batch = input_batch.shape[1]
    h0 = torch.zeros((size_of_batch, self.hidden_size))
    for time_step in input_batch:
      h0 = self.hidden_layer(time_step, h0)
      hidden_list.append(h0)
    final_hidden = hidden_list[-1]
    output = []
    for tensor in final_hidden:
      current = (self.output_layer @ tensor) + self.output_layer_bias
      output.append(current)
    output = torch.vstack(tuple(output))
    return output



In [190]:
def train(model, string_matrices, epochs):
  '''
  '''
  model.train()
  optim = torch.optim.Adam(model.parameters())
  batched = batch_matrices(string_matrices)
  for i, batch in enumerate(batched):
    training = batch[:2]
    target = batch[-1]
    for j in range(epochs):
      print(f"batch: {i}", f"epoch: {j}")
      avg_loss = float('inf')
      maximum_iterations = 1
      last_improved = 0
      accuracy = 0
      while avg_loss > 1 and maximum_iterations < 2 and last_improved < 3:
        optim.zero_grad()
        output = model.forward(training)
        loss = model.loss(output, target)
        loss.backward()
        optim.step()
        avg_loss = loss.item()
        maximum_iterations+=1
        current_accuracy = model_accuracy(model, training, target)
        print(f"batch accuracy: {current_accuracy}")
        if current_accuracy <= accuracy:
          last_improved+=1
        accuracy = current_accuracy


In [189]:
def batch_matrices(string_matrices):
  '''
  '''
  N = 500
  length = len(string_matrices)
  batches = int(length / N)
  remainder = length % N
  batched_matrices = []
  for i in range(batches):
    batch = string_matrices[i * N: (i + 1) * N]
    input_tensor = []
    for t in range(3):
      current_time = []
      for matrix in batch:
        current_time.append(matrix[t,:])
      input_tensor.append(torch.vstack(tuple(current_time)))
    input_tensor = torch.vstack(tuple(input_tensor))
    input_tensor = torch.reshape(input_tensor, (3, N, -1))
    batched_matrices.append(input_tensor)
  remaining_batch = string_matrices[length - remainder: length]
  remainder_tensor = []
  for t in range(3):
    current_time = []
    for matrix in remaining_batch:
      current_time.append(matrix[t,:])
    current_time = torch.vstack(tuple(current_time))
    remainder_tensor.append(current_time)
  remainder_tensor = torch.vstack(tuple(remainder_tensor))
  remainder_tensor = torch.reshape(remainder_tensor, (3, remainder, -1))
  batched_matrices.append(remainder_tensor)
  return batched_matrices


In [140]:
def model_accuracy(model, input_data, target_data):
  '''
  '''
  model.eval()
  softmax = nn.Softmax(dim=1)
  predictions = torch.argmax(softmax(model.forward(input_data)), dim=1)
  target = torch.argmax(target_data, dim=1)
  score = 100 * (torch.sum(torch.eq(predictions, target))).item() / len(target_data)
  return score


In [111]:
vocab_size = len(num_dict.keys())
print(f"vocab size: {vocab_size}")
input_size = vocab_size
hidden_size = 1000
output_size = vocab_size
total_params = hidden_size * input_size + hidden_size * hidden_size + output_size * hidden_size + hidden_size + hidden_size + output_size
print(f"total parameters: {total_params}", f"\napproximate size: {float((total_params * 32) / float(10**9))} giga bytes")

vocab size: 2917
total parameters: 6838917 
approximate size: 0.218845344 giga bytes


In [188]:
rnn = RNN(input_size, hidden_size, output_size)

In [191]:
train(rnn, string_matrices, 20)

batch: 0 epoch: 0
batch accuracy: 7.2
batch: 0 epoch: 1
batch accuracy: 6.0
batch: 0 epoch: 2
batch accuracy: 8.0
batch: 0 epoch: 3
batch accuracy: 15.8
batch: 0 epoch: 4
batch accuracy: 17.0
batch: 0 epoch: 5
batch accuracy: 23.2
batch: 0 epoch: 6
batch accuracy: 30.2
batch: 0 epoch: 7
batch accuracy: 38.8
batch: 0 epoch: 8
batch accuracy: 45.4
batch: 0 epoch: 9
batch accuracy: 46.2
batch: 0 epoch: 10
batch accuracy: 52.6
batch: 0 epoch: 11
batch accuracy: 58.8
batch: 0 epoch: 12
batch accuracy: 68.6
batch: 0 epoch: 13
batch accuracy: 74.6
batch: 0 epoch: 14
batch accuracy: 77.4
batch: 0 epoch: 15
batch accuracy: 81.2
batch: 0 epoch: 16
batch accuracy: 83.8
batch: 0 epoch: 17
batch accuracy: 86.4
batch: 0 epoch: 18
batch accuracy: 89.2
batch: 0 epoch: 19
batch accuracy: 91.2
batch: 1 epoch: 0
batch accuracy: 5.2
batch: 1 epoch: 1
batch accuracy: 5.8
batch: 1 epoch: 2
batch accuracy: 7.6
batch: 1 epoch: 3
batch accuracy: 9.6
batch: 1 epoch: 4
batch accuracy: 11.4
batch: 1 epoch: 5
batc

In [148]:
def predict_next_word(model, sentence, num_dict, word_dict):
  '''
  '''
  filtered = re.sub(r"[;:,]", r"", sentence)
  filtered = re.sub(r"\.", r" eos", filtered)
  filtered = re.sub(r"\?", r" eosq", filtered)
  filtered = re.sub(r"!", r"", filtered)
  filtered = re.sub(r"-", r"", filtered)
  filtered = re.sub(r"\d", r"", filtered)
  filtered = re.sub(r"\\", r"", filtered)
  filtered = re.sub(r"\"", r"", filtered)
  filtered = re.sub(r"\s+", r" ", filtered)
  filtered = filtered.lower()
  token_list = re.split(r"\s", filtered)
  token_list = token_list[-2:]
  tensor = []
  for token in token_list:
    d = word_dict[token]
    t = torch.zeros((len(num_dict.keys()),))
    t[d] = 1
    tensor.append(t)
  tensor = torch.vstack(tuple(tensor))
  tensor = torch.reshape(tensor, (2, 1, len(num_dict.keys())))
  output = model.forward(tensor)
  output = output[0]
  prediction = torch.argmax(output).item()
  return num_dict[prediction]

In [239]:
generate_text(rnn, "marley was ", num_dict, word_dict)

enough
him
calling


In [236]:
def generate_text(model, string, num_dict, word_dict):
  '''
  '''
  token = predict_next_word(model, string, num_dict, word_dict)
  n = 0
  while(token != 'eos' and n < 2):
    print(token)
    token = predict_next_word(model, f"{string} {token} ", num_dict, word_dict)
    n+=1
  print(token)