# Homework 4

Name: Stanly Gomes

Student ID: 801118166

GitHub Repository: https://github.com/stanlygomes/RealTimeML

In [30]:
from tqdm import tqdm
import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import Dataset, DataLoader, random_split, Subset

import numpy as np
import pandas as pd
import cv2
from matplotlib import pyplot as plt
from ptflops import get_model_complexity_info

import torchvision
from torchvision import transforms, datasets
from torchmetrics.classification import MulticlassAccuracy, BinaryAccuracy
from torchmetrics import ConfusionMatrix
from datetime import datetime

## Import and preprocess TimeMachine.txt Dataset

In [7]:
import urllib.request
import os
import re
import collections

# Plain-text edition of "The Time Machine" hosted by Project Gutenberg.
url = 'https://www.gutenberg.org/files/35/old/35.txt'
# Marker that precedes the body of the book; everything up to and including
# it is discarded below.
start_string = 'The Time Machine, by H. G. Wells [1898]\r\n\r\n'

with urllib.request.urlopen(url) as response:
    text = response.read().decode('utf-8')

# Find the index of the start string
start_idx = text.find(start_string)
if start_idx == -1:
    raise ValueError('Start string not found')

# Extract the text after the start string
text = text[start_idx+len(start_string):]

# Remove any remaining metadata at the end of the file (left untouched when
# the end marker is absent)
end_string = '*** END OF THIS PROJECT GUTENBERG EBOOK THE TIME MACHINE ***'
end_idx = text.find(end_string)
if end_idx != -1:
    text = text[:end_idx]

# Saving unprocessed dataset.
# The encoding is explicit so the write does not depend on the platform's
# default codec, and newline='' disables newline translation so the '\r\n'
# sequences already present in the text are written unchanged.
if not os.path.exists('timemachine.txt'):
    with open('timemachine.txt', 'w', encoding='utf-8', newline='') as f:
        f.write(text)
else:
    print('File exists already!')

# Preprocess the text: collapse every run of non-letters into one space,
# trim the ends and lowercase, one entry per '\n'-separated line
lines = [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in text.split('\n')]

print(f'# text lines: {len(lines)}')
print(lines[0])
print(lines[10])

File exists already!
# text lines: 3229

lights in the lilies of silver caught the bubbles that flashed and


In [32]:
class TimeMachine(Dataset):
    """Dataset wrapper around the Project Gutenberg text of "The Time Machine".

    Only the download step is implemented in this class body.
    """

    def _download(self):
        """Fetch the book, strip the surrounding metadata and cache it on disk.

        Returns:
            The book text between the title marker and the Project Gutenberg
            end-of-book marker (or the end of the file when that marker is
            missing).

        Raises:
            ValueError: If the title marker is not found in the download.
        """
        url = 'https://www.gutenberg.org/files/35/old/35.txt'
        start_string = 'The Time Machine, by H. G. Wells [1898]\r\n\r\n'

        with urllib.request.urlopen(url) as response:
            text = response.read().decode('utf-8')

        # Find the index of the start string
        start_idx = text.find(start_string)
        if start_idx == -1:
            raise ValueError('Start string not found')

        # Extract the text after the start string
        text = text[start_idx+len(start_string):]

        # Remove any remaining metadata at the end of the file
        end_string = '*** END OF THIS PROJECT GUTENBERG EBOOK THE TIME MACHINE ***'
        end_idx = text.find(end_string)
        if end_idx != -1:
            text = text[:end_idx]

        # Saving unprocessed dataset.
        # Explicit encoding avoids depending on the platform default codec;
        # newline='' keeps the '\r\n' sequences in the text from being
        # translated a second time on write.
        if not os.path.exists('timemachine.txt'):
            with open('timemachine.txt', 'w', encoding='utf-8', newline='') as f:
                f.write(text)
        else:
            print('File exists already!')

        return text
# Instantiate the dataset wrapper and fetch the raw (unprocessed) book text.
data = TimeMachine()
raw_text = data._download()
# Notebook-style inspection: show the single character at index 10.
raw_text[10]

File exists already!


'\n'

In [25]:
def tokenize(lines, token='word'):
    """Flatten text lines into a single list of tokens.

    Args:
        lines: Iterable of strings.
        token: 'word' splits each line on whitespace; 'char' yields every
            character of each line.

    Returns:
        A flat list of tokens, or None (after printing an error message)
        when `token` is neither 'word' nor 'char'.
    """
    if token not in ('word', 'char'):
        print('Error: Unknown token type: ' + token)
        return None

    by_word = token == 'word'
    pieces = []
    for text_line in lines:
        if by_word:
            pieces.extend(text_line.split())
        else:
            pieces.extend(list(text_line))
    return pieces

# Word-level tokens of the whole preprocessed corpus.
tokens = tokenize(lines)
# Print the first 11 tokens as a sanity check.
for i in range(11):
    print(tokens[i])

i
the
time
traveller
for
so
it
will
be
convenient
to


In [26]:
def count_corpus(tokens):
    """Count how often each token occurs.

    `tokens` is either a flat list of tokens or a list of token lists; the
    nested form (and the empty list) is flattened before counting.

    Returns:
        A `collections.Counter` mapping token -> number of occurrences.
    """
    needs_flattening = len(tokens) == 0 or isinstance(tokens[0], list)
    if not needs_flattening:
        return collections.Counter(tokens)

    # Merge the per-line token lists into one flat list before counting
    flat = []
    for line in tokens:
        flat.extend(line)
    return collections.Counter(flat)

In [27]:
# Implement vocabulary
class Vocab:
    """Bidirectional mapping between tokens and integer indices.

    Attributes are populated in `__init__`:
        token_freqs:  list of (token, count) pairs, most frequent first.
        idx_to_token: sorted list of unique tokens; always contains '<unk>'.
        token_to_idx: dict mapping each token to its position in idx_to_token.
    """

    def __init__(self, tokens=None, min_freq=0, reserved_tokens=None):
        """Build the vocabulary.

        Args:
            tokens: Flat list of tokens or list of token lists (flattened
                before counting). Defaults to an empty corpus.
            min_freq: Tokens occurring fewer than this many times are left
                out of the vocabulary.
            reserved_tokens: Extra tokens that are always included.
        """
        # None sentinels replace the former mutable `[]` defaults
        tokens = [] if tokens is None else tokens
        reserved_tokens = [] if reserved_tokens is None else reserved_tokens

        if tokens and isinstance(tokens[0], list):
            tokens = [token for line in tokens for token in line]

        # Count token frequencies
        counter = collections.Counter(tokens)
        self.token_freqs = sorted(counter.items(), key=lambda x: x[1],
                                  reverse=True)
        # Sorted list of unique tokens; sorting makes the indices deterministic
        kept = (token for token, freq in self.token_freqs if freq >= min_freq)
        self.idx_to_token = sorted({'<unk>', *reserved_tokens, *kept})
        self.token_to_idx = {token: idx
                             for idx, token in enumerate(self.idx_to_token)}

    def unk(self):
        """Return the index of the unknown-token placeholder '<unk>'."""
        return self.token_to_idx['<unk>']

    def __len__(self):
        """Return the number of entries in the vocabulary."""
        return len(self.idx_to_token)

    def __getitem__(self, tokens):
        """Map a token, or a list/tuple of tokens, to indices.

        Tokens that are not in the vocabulary map to the '<unk>' index.
        """
        unk_idx = self.unk()
        if not isinstance(tokens, (list, tuple)):
            return self.token_to_idx.get(tokens, unk_idx)
        return [self.token_to_idx.get(token, unk_idx) for token in tokens]

    def to_tokens(self, indices):
        """Map an index, or a collection of indices, back to tokens.

        A list or tuple of any length (including empty or single-element)
        yields a list, mirroring `__getitem__`. Other sized objects with
        more than one element also yield a list; anything else is treated
        as a single index and yields one token.
        """
        is_sequence = isinstance(indices, (list, tuple))
        # Previously a one-element or empty list fell through to the scalar
        # branch and raised TypeError when used as a list index.
        if is_sequence or (hasattr(indices, '__len__') and len(indices) > 1):
            return [self.idx_to_token[int(index)] for index in indices]
        return self.idx_to_token[indices]

In [28]:
# Build the vocabulary over the word-level corpus tokens.
vocab = Vocab(tokens)
# Round-trip the first ten tokens: tokens -> indices -> tokens.
indices = vocab[tokens[:10]]
print('indices:', indices)
print('words:', vocab.to_tokens(indices))

indices: [1995, 4045, 4112, 4171, 1568, 3694, 2185, 4492, 319, 795]
words: ['i', 'the', 'time', 'traveller', 'for', 'so', 'it', 'will', 'be', 'convenient']


words: i
indices: 1995
words: to
indices: 4122


## Problem 1: Train RNN, LSTM, and GRU with varying hyperparameters

In [4]:
# Implement GRU architecture from lecture/D2L textbook
class GRUScratch(nn.Module):
    """Gated recurrent unit implemented from individual weight matrices.

    Args:
        num_inputs: Size of the feature vector at each time step.
        hidden_states: Size of the hidden state.
        sigma: Standard deviation of the Gaussian used to initialise the
            weight matrices; biases start at zero.
    """

    def __init__(self, num_inputs, hidden_states, sigma=0.01):
        super().__init__()
        # Kept on the instance because forward() needs the hidden size to
        # build the initial state.
        self.num_inputs = num_inputs
        self.hidden_states = hidden_states
        self.sigma = sigma

        # Draw the weights from a Gaussian distribution with standard
        # deviation sigma, setting the bias to 0
        initial_weight = lambda *shape: nn.Parameter(torch.randn(*shape) * sigma)
        triple = lambda: (initial_weight(num_inputs, hidden_states),
                          initial_weight(hidden_states, hidden_states),
                          nn.Parameter(torch.zeros(hidden_states)))

        self.W_xz, self.W_hz, self.b_z = triple()  # Update gate
        self.W_xr, self.W_hr, self.b_r = triple()  # Reset gate
        self.W_xh, self.W_hh, self.b_h = triple()  # Candidate hidden state

    def forward(self, inputs, H=None):
        """Run the GRU over a sequence.

        Args:
            inputs: Tensor iterated along its first axis, one step per
                slice; each slice is multiplied by the
                (num_inputs, hidden_states) weights, i.e. the layout is
                (num_steps, batch_size, num_inputs).
            H: Optional initial hidden state of shape
                (batch_size, hidden_states); zeros when omitted.

        Returns:
            A tuple `(outputs, H)`: the list of hidden states, one per
            step, and the final hidden state.
        """
        if H is None:
            # Initial state with shape: (batch_size, num_hiddens)
            H = torch.zeros((inputs.shape[1], self.hidden_states),
                            device=inputs.device)
        outputs = []
        for X in inputs:
            # Update gate: how much of the previous state is carried over
            Z = torch.sigmoid(torch.matmul(X, self.W_xz) +
                              torch.matmul(H, self.W_hz) + self.b_z)
            # Reset gate: how much of the previous state feeds the candidate
            R = torch.sigmoid(torch.matmul(X, self.W_xr) +
                              torch.matmul(H, self.W_hr) + self.b_r)
            H_tilde = torch.tanh(torch.matmul(X, self.W_xh) +
                                 torch.matmul(R * H, self.W_hh) + self.b_h)
            # Blend the old state and the candidate according to Z
            H = Z * H + (1 - Z) * H_tilde
            outputs.append(H)
        return outputs, H