In [115]:
import numpy as np
from data_rnn import load_ndfa, load_brackets
import torch
import torch.nn as nn
from torch.utils.data import DataLoader

In [112]:
def pad_and_convert_to_tensor_part4(sequences: list, max_len: int, w2i: dict) -> torch.Tensor:
    pad_index = w2i['.pad']
    start_token = w2i['.start']
    end_token = w2i['.end']
    # initialize a matrix of shape (len(sequences), max_len) with the pad_value
    padded_sequences = torch.full((len(sequences), max_len), pad_index, dtype=torch.long)
    # add start and end tokens to the sequences
    for i in padded_sequences:
        i[0] = start_token
        i[-1] = end_token

    # for every sequence
    for i, sequence in enumerate(sequences):
        # get the length of the sequence
        length = min(max_len-2, len(sequence))
        
        padded_sequences[i, 1:length+1] = torch.tensor(sequence[:length], dtype=torch.long)

    return padded_sequences

def convert_to_target(padded_sequences, max_len):
    # create a clone of the entire padded_sequences tensor
    target_sequences = padded_sequences.clone()
    
    for i in range(len(target_sequences)):
        # shift tokens to the left by 1 position for each sequence
        target_sequences[i, :-1] = padded_sequences[i, 1:]
        # append zero at the end for each sequence
        target_sequences[i, -1] = 0
    
    return target_sequences


In [119]:
class AutoRegressive_lstm(nn.Module):
    def __init__(
        self,
        vocab_size: int,
        emb_size: int,
        hidden_size: int,
        num_layers
    ):
        super(AutoRegressive_lstm, self).__init__()
        # layer 1
        self.embedding = nn.Embedding(vocab_size, emb_size)

        # layer 2: LSTM layer
        self.lstm = nn.LSTM(emb_size, hidden_size,num_layers=num_layers,
                             batch_first=True)
        # layer 3: output layer
        self.linear3 = nn.Linear(hidden_size, vocab_size)
        

    def forward(self, x: torch.Tensor):  
        # layer 1
        x = self.embedding(x)  
        # layer 2
        x, _ = self.lstm(x)  
        # layer 3 (output)
        x = self.linear3(x)  
        return x


In [272]:
EPOCHS = 3
BATCH_SIZE = 32
MAX_LEN = 20
#x_train, (i2w, w2i) = load_ndfa(n=150_000)
x_train, (i2w, w2i) = load_brackets(n=150_000)

# pad and convert to tensor
X_train_padded = pad_and_convert_to_tensor_part4(x_train, MAX_LEN, w2i)
y_train = convert_to_target(X_train_padded, MAX_LEN)

vocab_size = len(w2i)
emb_size = 32
hidden_size = 16


model = AutoRegressive_lstm(vocab_size, emb_size, hidden_size, num_layers=1)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

# create data loaders
train_dataset = torch.utils.data.TensorDataset(X_train_padded, y_train)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

# train
model.train()
for epoch in range(EPOCHS):
    epoch_loss = 0.0
    for batch, labels in train_loader:
        optimizer.zero_grad()
        
        output = model(batch)
        # Flatten output and target to compute loss per token
        flat_output = output.view(-1, output.shape[-1])
        flat_target = labels.view(-1)
        
        loss = criterion(flat_output, flat_target)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    print(f"Epoch {epoch+1} Loss: {epoch_loss / len(train_loader):.4f}")


#evaluation 
seq = [w2i['.start'], w2i['('], w2i['('], w2i[')']]
pred = -1
model.eval()
print(f'initial sequence: {[i2w[index] for index in seq]}')

with torch.no_grad():
    while (pred != w2i['.end']) and (len(seq) < MAX_LEN):
        logit_pred = model(torch.tensor(seq))
        pred = torch.argmax(logit_pred[-1])
        seq.append(int(pred))

print(f'generated sequence: f{[i2w[index] for index in seq]}')

Epoch 1 Loss: 0.2374
Epoch 2 Loss: 0.1528
Epoch 3 Loss: 0.1600
initial sequence: ['.start', '(', '(', ')']
generated sequence: f['.start', '(', '(', ')', ')', '.pad', '.pad', '.pad', '.pad', '.pad', '.pad', '.pad', '.pad', '.pad', '.pad', '.pad', '.pad', '.pad', '.pad', '.end']


In [273]:
import torch.distributions as dist
import torch.nn.functional as F

def sample(lnprobs, temperature=1.0):
    """
    Sample an element from a categorical distribution
    :param lnprobs: Outcome logits
    :param temperature: Sampling temperature. 1.0 follows the given
    distribution, 0.0 returns the maximum probability element.
    :return: The index of the sampled element.
    """
    if temperature == 0.0:
        return lnprobs.argmax()
    p = F.softmax(lnprobs / temperature, dim=0)
    cd = dist.Categorical(p)
    return cd.sample()