In [None]:
import matplotlib.pyplot as plt
import torch
import torch.nn.functional as F
from tqdm import tqdm

Comparing:
- character-level vs. word-level

### 1. Creating Dataset

In [None]:
# Load the corpus: one entry per line of the file, with surrounding whitespace
# removed and everything lower-cased.
data_file = "../data/data.txt"
with open(data_file, 'r') as f:
    data = [raw_line.strip().lower() for raw_line in f]

In [None]:
# Preview the first 20 cleaned lines of the corpus.
data[:20]

In [None]:
# Vocabulary lookup tables (index <-> word).
# The words are sorted so the indices are identical on every kernel restart
# (iterating a raw set of strings has no stable order). The empty string is
# pinned to index 0 because the dataset builder pads contexts with 0 and the
# sampler stops when it draws index 0.
corpus = [''] + sorted(set(' '.join(data).split(' ')) - {''})
itoc = dict(enumerate(corpus))
ctoi = {c: i for i, c in itoc.items()}
num_words = len(corpus)
print(num_words)  # index 0 is always ''

In [None]:
def generarate_dataset(data: list, context_size: int = 2, word_to_idx: dict = None):
    """Build next-word prediction examples from lines of text.

    Every word of every line yields one example: the indices of the
    `context_size` words that precede it (left-padded with 0 at the very
    start) and the index of the word itself. The context window is carried
    over from one line to the next; it is not reset at line boundaries.

    Params
    ------
    data: list of str
        Lines of text; each line is split on single spaces.
    context_size: int
        Number of preceding word indices used as input features.
    word_to_idx: dict, optional
        Mapping word -> integer index. Defaults to the notebook-level `ctoi`.

    Returns
    -------
    X: torch.Tensor, float32, shape (n_examples, context_size)
    y: torch.Tensor, int64, shape (n_examples,)

    Raises
    ------
    KeyError
        If a word is missing from the mapping.
    """
    if word_to_idx is None:
        word_to_idx = ctoi  # vocabulary built earlier in the notebook

    X, y = [], []
    context = [0] * context_size
    for line in data:
        for word in line.split(' '):
            idx = word_to_idx[word]
            X.append(context)
            y.append(idx)
            # slide the window; this builds a new list, so rows already
            # stored in X are never mutated
            context = context[1:] + [idx]

    # Explicit dtype/shape so that empty input still yields a
    # (0, context_size) float matrix and an int64 target vector.
    X = torch.tensor(X, dtype=torch.float32).reshape(len(y), context_size)
    y = torch.tensor(y, dtype=torch.long)
    return X, y

In [None]:
# Build the examples, then cut them in order (no shuffling):
# first 80% -> train, next 10% -> validation, remainder -> test.
context_size = 128
X, y = generarate_dataset(data, context_size)
n1, n2 = int(0.8 * len(X)), int(0.9 * len(X))
X_train, X_val, X_test = torch.tensor_split(X, (n1, n2))
y_train, y_val, y_test = torch.tensor_split(y, (n1, n2))

In [None]:
# Sanity check: size of the training inputs.
X_train.shape

### 2. Creating Models

In [None]:
class BasicRNNModel(torch.nn.Module):
    """Stacked tanh RNN followed by dropout and a linear layer producing logits.

    Params
    ------
    features_in: int
        Size of each input vector.
    features_out: int
        Number of output logits.
    n_hidden: int
        Hidden size of the recurrent layers.
    num_layers: int
        Number of stacked recurrent layers.
    device: torch.device
        Device the recurrent and linear layers are moved to.
    """

    def __init__(self, features_in: int, features_out: int, n_hidden: int, num_layers: int, device: torch.device):
        super().__init__()
        self.features_in = features_in
        self.n_hidden = n_hidden
        self.features_out = features_out
        self.device = device

        # Inter-layer dropout only exists between stacked layers; PyTorch warns
        # when it is requested for a single layer, so only ask for it when it
        # can take effect.
        self.l1 = torch.nn.RNN(features_in, hidden_size=n_hidden, num_layers=num_layers,
            nonlinearity='tanh', bidirectional=False,
            dropout=0.01 if num_layers > 1 else 0.0).to(self.device)
        self.dropout = torch.nn.Dropout(p=0.1)
        self.l2 = torch.nn.Linear(n_hidden, features_out).to(self.device)

    def forward(self, x):
        """Return logits for `x`.

        A 2-D input is read as (batch, features_in): every row is an
        independent example and is run as its own length-1 sequence, giving
        (batch, features_out). A 3-D input is passed to the RNN unchanged.
        """
        # torch.nn.RNN would interpret a 2-D tensor as ONE unbatched sequence,
        # leaking hidden state from each row into the next. Insert a
        # sequence axis of length 1 so rows stay independent.
        rows_are_examples = x.dim() == 2
        if rows_are_examples:
            x = x.unsqueeze(0)
        hidden, _ = self.l1(x)
        if rows_are_examples:
            hidden = hidden.squeeze(0)
        # the last output stays available as `self.out`, as before
        self.out = self.l2(self.dropout(hidden))
        return self.out

In [None]:
n_hidden = 64

# Use the 'mps' device when PyTorch reports it as available; otherwise run on the CPU.
if torch.backends.mps.is_available():
    device = torch.device('mps')
else:
    device = torch.device('cpu')

model1 = BasicRNNModel(
    features_in=context_size,
    features_out=num_words,
    n_hidden=n_hidden,
    num_layers=3,
    device=device,
)


In [None]:
# l1 = torch.nn.RNN(context_size, hidden_size=n_hidden, num_layers=3, nonlinearity='tanh', 
#                       bidirectional=False, dropout=0.01)
# l2 = torch.nn.Linear(n_hidden, num_words, bias=False)

In [None]:
class LSTMModel(torch.nn.Module):
    """Stacked LSTM followed by dropout and a linear layer producing logits.

    Params
    ------
    features_in: int
        Size of each input vector.
    n_hidden: int
        Hidden size of the LSTM layers.
    features_out: int
        Number of output logits.
    num_layers: int
        Number of stacked LSTM layers.
    bias: bool
        Whether the LSTM layers use bias terms (the linear layer always does).
    device: torch.device, optional
        Device the LSTM and linear layers are moved to; CPU when omitted.
    """

    def __init__(self, features_in: int, n_hidden: int, features_out: int, num_layers: int, bias: bool = False, device: torch.device = None):
        # Module.__init__ must run before any sub-module is assigned.
        super().__init__()
        self.features_in = features_in
        self.features_out = features_out
        self.n_hidden = n_hidden
        self.num_layers = num_layers
        self.bias = bias
        # `device` follows a defaulted parameter, so it needs a default too.
        self.device = torch.device('cpu') if device is None else device

        self.lstm = torch.nn.LSTM(input_size=self.features_in, hidden_size=self.n_hidden, num_layers=self.num_layers,
                                  bias=self.bias).to(self.device)
        self.dropout = torch.nn.Dropout(p=0.2)
        self.linear = torch.nn.Linear(self.n_hidden, self.features_out).to(self.device)

    def forward(self, x):
        """Return logits for `x`.

        A 2-D input is read as (batch, features_in): every row is an
        independent example and is run as its own length-1 sequence, giving
        (batch, features_out). A 3-D input is passed to the LSTM unchanged.
        """
        # torch.nn.LSTM would interpret a 2-D tensor as ONE unbatched sequence,
        # carrying state from each row into the next. Insert a sequence axis
        # of length 1 so rows stay independent.
        rows_are_examples = x.dim() == 2
        if rows_are_examples:
            x = x.unsqueeze(0)
        hidden, _ = self.lstm(x)
        if rows_are_examples:
            hidden = hidden.squeeze(0)
        # the last output stays available as `self.out`
        self.out = self.linear(self.dropout(hidden))
        return self.out

In [None]:
# out.shape

In [None]:
# criterion = torch.nn.CrossEntropyLoss()
# criterion(out, y_train[[0]])

### 3. Training Models

In [None]:
num_epochs = 20000
batch_size = 32

def train(model, X, y, verbose: bool = False, steps: int = None, batch: int = None):
    """
    Train `model` with RMSprop on randomly drawn mini-batches.

    Each step samples `batch` row indices uniformly with replacement,
    computes the cross-entropy between `model(X[idxs])` and `y[idxs]`,
    and applies one optimizer update.

    Params
    ------
    model: torch.nn.Module
        Model mapping a batch of inputs to logits.
    X: torch.Tensor
        Inputs, indexed along the first dimension.
    y: torch.Tensor
        Integer class targets, aligned with `X`.
    verbose: bool
        Print the loss every 1000 steps.
    steps: int, optional
        Number of update steps; defaults to the notebook-level `num_epochs`.
    batch: int, optional
        Mini-batch size; defaults to the notebook-level `batch_size`.

    Returns
    -------
    lossi: [float]
        Loss of every step, as plain Python floats.
    """
    n_steps = num_epochs if steps is None else steps
    n_batch = batch_size if batch is None else batch

    # optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    optimizer = torch.optim.RMSprop(model.parameters(), lr=0.01)
    criterion = torch.nn.CrossEntropyLoss()
    losses = []

    model.train()  # make sure dropout is active while fitting
    for i in range(n_steps):
        # batch size indexing
        idxs = torch.randint(0, len(X), (n_batch, ))
        xb, yb = X[idxs], y[idxs]

        # forward pass to make prediction (call the module, not .forward,
        # so registered hooks run)
        outputs = model(xb)
        loss = criterion(outputs, yb)
        # store a float, not the tensor: keeping the tensor would keep every
        # step's autograd graph alive
        losses.append(loss.item())

        # backward pass to compute gradient
        optimizer.zero_grad()
        loss.backward()

        # optimizer to update parameters
        optimizer.step()

        # print stats
        if verbose and i % 1000 == 0:
            print(f"Epoch {i+1}: {losses[-1]}")

    return losses

In [None]:
# Fit the RNN on the training split (tensors moved to `device`) and keep what train() returns.
losses = train(model1, X_train.to(device), y_train.to(device), verbose=True)

### 4. Evaluating the Model

### 5. Sampling from the model

In [None]:
def generate_sample(model: torch.nn.Module, context_size: int, device: torch.device, temperature: float = 1.0,
                    idx_to_word: dict = None, max_words: int = 1000):
    """Sample words from `model` until index 0 is drawn or `max_words` is reached.

    Starting from an all-zero context, the model's logits are divided by
    `temperature`, turned into probabilities with a softmax, and one index is
    drawn. The drawn word (including the final index-0 word) is appended to
    the result and the context window slides by one.

    Params
    ------
    model: torch.nn.Module
        Model mapping a (1, context_size) float tensor to (1, vocab) logits.
    context_size: int
        Length of the context window fed to the model.
    device: torch.device
        Device the context tensor is moved to.
    temperature: float
        Softmax temperature; must be positive.
    idx_to_word: dict, optional
        Mapping index -> word. Defaults to the notebook-level `itoc`.
    max_words: int
        Upper bound on the number of sampled words, so a model that never
        emits index 0 cannot loop forever.

    Returns
    -------
    str
        The sampled words joined with single spaces.

    Raises
    ------
    ValueError
        If `temperature` is not positive.
    """
    if temperature <= 0:
        raise ValueError("temperature must be positive")
    if idx_to_word is None:
        idx_to_word = itoc  # vocabulary built earlier in the notebook

    context = [0] * context_size
    res = []
    # generator = torch.Generator(device=device).manual_seed(420)

    # Sample with dropout disabled and without building autograd graphs;
    # the model's previous train/eval mode is restored afterwards.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            while len(res) < max_words:
                # predict next word
                tensorized_context = torch.tensor([context]).float().to(device)
                logits = model(tensorized_context)
                # softmax rather than a bare exp(): same distribution, but
                # large logits cannot overflow to inf
                probs = torch.softmax(logits / temperature, dim=-1)
                # idx = torch.multinomial(probs, 1, generator=generator).item()
                idx = torch.multinomial(probs, 1).item()
                res.append(idx_to_word[idx])

                # update context
                context = context[1:] + [idx]

                if idx == 0:
                    break
    finally:
        model.train(was_training)
    return ' '.join(res)

In [None]:
# Print 100 samples generated from the trained RNN at temperature 0.5.
for i in range(100):
    print(generate_sample(model1, context_size, temperature=0.5, device=device))