# Implementing a simple language model

In [None]:
import sys
import math
import time
import random
import urllib.request
import tqdm
import torch
from torch import nn, Tensor
import torch.nn.functional as F
from torch.nn import TransformerEncoder, TransformerEncoderLayer
from IPython.display import clear_output
import pandas as pd
from matplotlib import pyplot as plt

In [None]:
# ANSI SGR escape sequences used to colorize printed text.
# In each code "1" selects bold and "3x" selects the foreground color.
BLUE = "\033[1;34m"  # bold blue
GREEN = "\033[1;32m"  # bold green
RED = "\033[1;31m"  # bold red
# NOTE(review): this is bold black, not the SGR reset sequence ("\033[0m");
# text printed after NORMAL stays bold -- confirm that this is intended.
NORMAL = "\033[1;30m"

## Mapping

In [None]:
url = "https://www.gutenberg.org/cache/epub/174/pg174.txt"
local_filename = "./data.txt"

# Fetch the book and store the raw bytes locally; a failure is reported
# on stdout instead of being raised.
try:
    with urllib.request.urlopen(url) as response:
        with open(local_filename, 'wb') as out_file:
            data = response.read()
            out_file.write(data)
except Exception as e:
    print(f"Failed to download the file: {str(e)}")
else:
    print(f"File '{local_filename}' downloaded successfully.")


In [None]:
# Training configuration
BOOKFILEPATH = "./data.txt"
CONTEXT_SIZE = 256
LEARNING_RATE = 0.01
NUM_EPOCHS = 10

# Load the whole book into memory as one string
with open(BOOKFILEPATH, 'r') as book_file:
    content = book_file.read()

book_size = len(content)
print(f'books size: {book_size}')
# Give every distinct character a consecutive integer ID, in sorted character order
char_to_id = {symbol: index for index, symbol in enumerate(sorted(set(content)))}


def encode(text):
    """Map every character of ``text`` to its ID in ``char_to_id``.

    Returns a list of IDs, one per character; a character that is missing
    from ``char_to_id`` raises ``KeyError``.
    """
    return list(map(char_to_id.__getitem__, text))


def decode(ids):
    """Decode a list of IDs into a string.

    Arguments:
        ids: iterable of integer character IDs, as produced by ``encode``

    Returns:
        the string whose characters correspond to ``ids``

    Raises:
        ValueError: if an ID has no character in ``char_to_id``
    """
    # Invert the mapping once per call instead of rebuilding the key/value
    # lists and scanning them linearly for every single ID.
    id_to_char = {idx: char for char, idx in char_to_id.items()}
    try:
        return ''.join(id_to_char[i] for i in ids)
    except KeyError as err:
        # Keep raising ValueError, as the previous list.index() lookup did.
        raise ValueError(f'unknown character ID: {err.args[0]!r}') from None


# New code for sequence generator
def sequence_generator(filepath, context_size):
    """Read the file and yield non-overlapping (context, target) pairs.

    The text is cut into consecutive chunks of ``context_size + 1``
    characters, starting at a random character offset between 0 and
    ``context_size // 2`` (inclusive). For every chunk, ``context`` is the
    encoding of all characters but the last one and ``target`` is the
    encoding of all characters but the first one, i.e. the context shifted
    by one position. A trailing incomplete chunk is dropped.

    Arguments:
        filepath: path of the text file to read
        context_size: number of characters in every context / target

    Yields:
        tuples ``(context_ids, target_ids)``, two lists of ``context_size`` IDs
    """
    with open(filepath, 'r') as file:
        text = file.read()

    # Apply the random offset on the decoded text. Seeking a text-mode file
    # to an arbitrary position is undefined behaviour and may land in the
    # middle of a multi-byte character.
    offset = random.randint(0, context_size // 2)
    chunk_size = context_size + 1
    for start in range(offset, len(text) - chunk_size + 1, chunk_size):
        chunk = text[start:start + chunk_size]
        yield encode(chunk[:-1]), encode(chunk[1:])


# Report the vocabulary built above
print(f'number of unique characters: {len(char_to_id)}')
print(char_to_id)
# Sanity check: decoding must invert encoding
assert decode(encode('hello')) == 'hello'
print(f"encoded hello: {encode('hello')}")

## Model preparation

Reference: the PyTorch transformer tutorial, https://pytorch.org/tutorials/beginner/transformer_tutorial.html

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f'about to use device: {device} device')

class TransformerModel(nn.Module):
    """A Transformer language model with a residual connection and layer normalization.

    Token embeddings are scaled by ``sqrt(d_model)``, combined with positional
    encodings and passed through a stack of ``TransformerEncoderLayer`` blocks.
    The encoder output is added to its input, layer-normalized twice and
    projected to one logit per token.
    """
    def __init__(self, ntoken: int, d_model: int, nhead: int, d_hid: int, nlayers: int, dropout: float = 0.5):
        """
        Arguments:
            ntoken: size of the vocabulary (embedding rows and output logits)
            d_model: embedding dimension
            nhead: number of attention heads of every encoder layer
            d_hid: feedforward dimension of every encoder layer
            nlayers: number of stacked encoder layers
            dropout: dropout probability of the positional encoding and the encoder layers
        """
        super().__init__()
        self.model_type = 'Transformer'
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        encoder_layers = TransformerEncoderLayer(d_model, nhead, d_hid, dropout)
        self.transformer_encoder = TransformerEncoder(encoder_layers, nlayers)
        self.embedding = nn.Embedding(ntoken, d_model)
        self.d_model = d_model
        self.linear = nn.Linear(d_model, ntoken)

        # Layer normalization layers
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

        self.init_weights()

    def init_weights(self) -> None:
        """Initialize embedding and output weights uniformly in [-0.1, 0.1] and zero the output bias."""
        initrange = 0.1
        self.embedding.weight.data.uniform_(-initrange, initrange)
        self.linear.bias.data.zero_()
        self.linear.weight.data.uniform_(-initrange, initrange)

    def forward(self, src: Tensor, src_mask: Tensor = None) -> Tensor:
        """
        Arguments:
            src: Tensor, shape [seq_len, batch_size]
            src_mask: Tensor, shape [seq_len, seq_len]; when omitted, a causal
                mask is used so that a position cannot attend to later positions

        Returns:
            output Tensor of shape [seq_len, batch_size, ntoken]
        """
        if src_mask is None:
            # A next-token model must not see the tokens it has to predict:
            # block attention to later positions with -inf above the diagonal.
            seq_len = src.size(0)
            src_mask = torch.triu(
                torch.full((seq_len, seq_len), float('-inf'), device=src.device),
                diagonal=1,
            )

        src = self.embedding(src) * math.sqrt(self.d_model)
        src = self.pos_encoder(src)

        # Residual connection around the encoder stack, followed by two layer normalizations
        src = self.norm1(src + self.transformer_encoder(src, src_mask))
        output = self.norm2(src)

        output = self.linear(output)
        return output

class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal positional encodings to the input, then apply dropout."""
    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = CONTEXT_SIZE * 2):
        """
        Arguments:
            d_model: embedding dimension (even or odd)
            dropout: dropout probability applied after adding the encodings
            max_len: number of positions for which encodings are precomputed
        """
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        angles = position * div_term  # shape [max_len, ceil(d_model / 2)]
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(angles)
        # With an odd d_model there is one cosine column fewer than sine columns
        pe[:, 0, 1::2] = torch.cos(angles[:, :d_model // 2])
        # Registered as a buffer: part of the module state, but not a trainable parameter
        self.register_buffer('pe', pe)

    def forward(self, x: Tensor) -> Tensor:
        """
        Arguments:
            x: Tensor, shape ``[seq_len, batch_size, embedding_dim]``

        Returns:
            Tensor of the same shape as ``x``
        """
        # Only the first seq_len precomputed positions are needed
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)


def encode_for_model(text):
    """Encode a string into the tensor form that is accepted by the model.

    The character IDs are laid out as a column: one row per character of
    ``text`` and a single column.
    """
    return torch.tensor(encode(text)).reshape(-1, 1)


def generate_sequence(model, initial_sequence, new_length, temperature=1.0, top_k=12):
    """Generate and print characters that continue an initial text sequence.

    The initial sequence is printed in green and the generated characters
    are printed in red, one by one as they are sampled. Nothing is returned.

    Arguments:
        model: the model to sample from, called with a ``[seq_len, 1]`` tensor of IDs
        initial_sequence: text that the generation continues
        new_length: number of characters to generate
        temperature: divides the logits before sampling, modifies the randomness of generated text
        top_k: only the ``top_k`` highest-scoring characters can be sampled

    The model is put into evaluation mode while sampling and is returned to
    the mode (training or evaluation) it was in when the function was called.
    """
    print(GREEN + initial_sequence + RED, end='', flush=True)
    was_training = model.training
    model.eval()  # Set the model to evaluation mode
    try:
        with torch.no_grad():
            for _ in range(new_length):
                # Only the last CONTEXT_SIZE characters are used as context,
                # so there is no need to encode the whole (growing) text.
                input_data = encode_for_model(initial_sequence[-CONTEXT_SIZE:]).to(device)

                logits = model(input_data)[-1, :]  # scores that follow the last character
                logits = logits / temperature  # apply temperature
                # top-k filtering: every score below the k-th best one becomes -inf
                top_values, _ = torch.topk(logits, min(logits.size(-1), top_k), dim=-1)
                logits = logits.masked_fill(logits < top_values[:, [-1]], float('-inf'))
                # apply softmax and sample the next character
                probabilities = F.softmax(logits, dim=-1)
                predicted_char_id = torch.multinomial(probabilities, 1).item()

                new_char = decode([predicted_char_id])
                initial_sequence += new_char
                # flush the output to see the progress
                print(new_char, end='', flush=True)
    finally:
        # Also executed when sampling fails: reset the text color and
        # restore the mode the model was in before the call.
        print(NORMAL)
        model.train(was_training)

## Dataset preparation

In [None]:
# prepare dataset
print('preparing dataset...')
sequence_gen = sequence_generator(BOOKFILEPATH, CONTEXT_SIZE)
batch_x = []
batch_y = []
# every sequence consumes CONTEXT_SIZE + 1 characters of the book,
# which gives the expected number of sequences for the progress bar
for context, target in tqdm.tqdm(sequence_gen, total=book_size // (CONTEXT_SIZE + 1)):
    # inputs are columns of shape [CONTEXT_SIZE, 1], targets are flat vectors [CONTEXT_SIZE]
    batch_x.append(torch.tensor(context).reshape(-1, 1))
    batch_y.append(torch.tensor(target).reshape(-1))

# stack all the sequences along a new first dimension
batch_x = torch.stack(batch_x)
batch_y = torch.stack(batch_y)
print(f'batch_x shape: {batch_x.shape}')
print(f'batch_y shape: {batch_y.shape}')
# inputs and targets hold the same number of elements, hence the factor 2
print(f'size of the dataset: {(2 * batch_x.element_size() * batch_x.nelement() / 1024 / 1024):.2f} MB')
batch_x, batch_y = batch_x.to(device), batch_y.to(device)

## Training

In [None]:
plt.style.use('ggplot')


def plot_loss(losses: list):
    """Show the recorded loss values as a line chart in a wide figure."""
    plt.figure(figsize=(20, 5))
    plt.plot(losses)
    plt.ylabel('cross entropy loss')
    plt.xlabel('iteration')
    plt.title('loss')
    plt.show()

In [None]:
# Initialize the Transformer model
ntokens = len(char_to_id)  # size of vocabulary
emsize = 40  # embedding dimension
d_hid = 80  # dimension of the feedforward network model in ``nn.TransformerEncoder``
nlayers = 40  # number of ``nn.TransformerEncoderLayer`` in ``nn.TransformerEncoder`` -- 40 max
nhead = 40  # number of heads in ``nn.MultiheadAttention`` -- 40 max
dropout = 0.1  # dropout probability


model = TransformerModel(ntokens, emsize, nhead, d_hid, nlayers, dropout).to(device)
losses = []
# Number of training steps between two progress reports (about 100 reports per epoch).
# Clamped to at least 1: with fewer than 100 sequences the integer division gives 0
# and `i % print_step` in the training loop would raise ZeroDivisionError.
print_step = max(1, len(batch_x) // 100)

# if model exists, load it
try:
    # map_location lets a checkpoint saved on a GPU be loaded on a CPU-only machine
    state_dict = torch.load('./model.pt', map_location=device)
except FileNotFoundError:
    print('model not found, training from scratch...')
    # print size of the model in MB and number of parameters
    print(f'size of the model: {(sum(p.numel() for p in model.parameters()) * 4 / 1024 / 1024):.2f} MB')
    print(f'number of parameters: {sum(p.numel() for p in model.parameters())}')
else:
    model.load_state_dict(state_dict)
    print('model loaded...')
    # The loss history is optional: a missing file must not be reported as a missing model
    try:
        losses = pd.read_csv('./losses.csv').values.flatten().tolist()
    except FileNotFoundError:
        print('loss history not found, starting a new one...')

# Loss function, optimizer and learning-rate schedule.
# Every scheduler.step() call multiplies the learning rate by gamma.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(params=model.parameters(), lr=LEARNING_RATE, momentum=0.3)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1.0, gamma=0.99, verbose=True)


# training loop
print('training...')
model.train()
for epoch in range(NUM_EPOCHS):
    running_loss = 0
    current_time = time.time()
    # visit the sequences in a new random order in every epoch
    shuffled_order = torch.randperm(len(batch_x))
    batch_x, batch_y = batch_x[shuffled_order], batch_y[shuffled_order]
    for i, (inputs, targets) in enumerate(zip(batch_x, batch_y)):
        # one optimization step on a single sequence
        optimizer.zero_grad()
        logits = model(inputs).reshape(-1, len(char_to_id))
        loss = criterion(logits, targets)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

        # everything below is the periodic report, done every print_step steps
        if i % print_step != 0:
            continue

        model.eval()
        if i != 0:
            # record the mean loss since the last report, persist and plot the history,
            # then advance the learning-rate schedule
            clear_output(wait=True)
            losses.append(running_loss / print_step)
            pd.DataFrame(losses).to_csv('./losses.csv', index=False)
            plot_loss(losses)
            scheduler.step()
        print(f'progress: {(100 * i / len(batch_x)):.0f}% epoch: {epoch}/{NUM_EPOCHS} loss: {running_loss / print_step}, took: {time.time() - current_time:.2f} seconds')
        current_time = time.time()
        # sample a short continuation of three fixed prompts at different temperatures
        for prompt, sample_temperature in (
            ('As the first rays of sunlight gently illuminate the quiet ro', 1.1),
            ('The old lighthouse stood tall against the rugged coastline, ', 1.0),
            ('As the sun dipped below the horizon, John started painting t', 0.9),
        ):
            generate_sequence(model, prompt, 30, temperature=sample_temperature)
        print()
        running_loss = 0
        # checkpoint the weights, then go back to training mode
        torch.save(model.state_dict(), './model.pt')
        model.train()