In [1]:
import math
import os
from tempfile import TemporaryDirectory
from typing import Tuple

In [2]:
import torch
from torch import nn, Tensor
from torch.nn import TransformerEncoder, TransformerEncoderLayer
from torch.utils.data import dataset

In [3]:
# torch.set_default_device('cuda')  # sets the default device for every tensor and model to the GPU

In [4]:
class TransformerModel(nn.Module):
    """Transformer-encoder language model.

    Pipeline: token embedding -> positional encoding -> stack of encoder
    layers -> linear projection to one logit per vocabulary entry.

    Args:
        ntoken: size of the vocabulary (also the width of the output layer).
        d_model: embedding width / model dimension.
        nhead: number of attention heads in each encoder layer.
        d_hid: width of the feed-forward sub-layer in each encoder layer.
        nlayers: number of stacked encoder layers.
        dropout: dropout probability passed to the positional encoding and
            to every encoder layer.
    """

    def __init__(self, ntoken, d_model, nhead, d_hid, nlayers, dropout = 0.5):
        super().__init__()
        self.model_type = 'Transformer'
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        # batch_first is left at its default, so inputs are sequence-first
        encoder_layer = TransformerEncoderLayer(d_model, nhead, d_hid, dropout)
        self.transformer_encoder = TransformerEncoder(encoder_layer, nlayers)
        self.embedding = nn.Embedding(ntoken, d_model)
        self.d_model = d_model
        # the final layer emits one unnormalised score (logit) per vocabulary entry
        self.linear = nn.Linear(d_model, ntoken)

        self.init_weights()

    def init_weights(self):
        """Initialise embedding and output weights uniformly in [-0.1, 0.1]; zero the output bias."""
        init_range = 0.1
        self.embedding.weight.data.uniform_(-init_range, init_range)
        self.linear.bias.data.zero_()
        self.linear.weight.data.uniform_(-init_range, init_range)

    def forward(self, src, src_mask=None):
        """Compute next-token logits for every position.

        Args:
            src: integer token ids, sequence dimension first (dim 0).
            src_mask: optional attention mask handed to the encoder. When
                omitted, a causal (upper-triangular -inf) mask is built so a
                position can only attend to itself and earlier positions;
                without it the model would see the tokens it has to predict.

        Returns:
            Tensor with the shape of ``src`` plus a trailing dimension of
            size ``ntoken``.
        """
        # scale embeddings by sqrt(d_model) before adding positional information
        src = self.embedding(src) * math.sqrt(self.d_model)
        src = self.pos_encoder(src)
        if src_mask is None:
            seq_len = src.size(0)
            src_mask = torch.triu(
                torch.full((seq_len, seq_len), float('-inf'), dtype=src.dtype, device=src.device),
                diagonal=1,
            )
        output = self.transformer_encoder(src, src_mask)
        output = self.linear(output)
        return output

In [5]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to a sequence-first input, then apply dropout.

    Args:
        d_model: size of the feature (last) dimension; may be even or odd.
        dropout: dropout probability applied after the encoding is added.
        max_len: number of positions for which encodings are precomputed.
    """

    def __init__(self, d_model, dropout =  0.1, max_len = 5000):
        super().__init__()
        self.dropout = nn.Dropout(dropout)

        position = torch.arange(max_len).unsqueeze(1)
        denm = torch.exp(torch.arange(0, d_model, 2) * -math.log(10000)/d_model)
        angles = position * denm

        # positional encoding is applied within the ordering of the feature dimension:
        # even feature indices carry the sine, odd feature indices the cosine
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(angles)
        # an odd d_model has one fewer odd index than even index, so trim the
        # angle table to the number of cosine columns (no-op for even d_model)
        pe[:, 0, 1::2] = torch.cos(angles[:, :d_model // 2])

        # a buffer follows the module across devices but is not a trainable parameter
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Add the encodings for the first ``x.size(0)`` positions to ``x`` and apply dropout."""
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)

In [6]:
# loading and batching data
# install extra packages first; presumably these are needed by the torchtext
# dataset loading in the next cell — TODO confirm and pin versions
# NOTE(review): `%pip install ...` targets the running kernel's environment,
# which `!pip` does not guarantee
!pip install portalocker
!pip install torchdata

[0m

In [7]:
from torchtext.datasets import WikiText2
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

# build the vocabulary from the tokenized training split only
train_iter = WikiText2(split='train')
tokenizer = get_tokenizer('basic_english')
# '<unk>' is registered as a special token of the vocabulary
vocab = build_vocab_from_iterator(map(tokenizer, train_iter), specials=['<unk>'])
# use the index of '<unk>' as the vocabulary's default index
vocab.set_default_index(vocab['<unk>'])

def data_process(raw_text_iter):
    """Encode an iterable of raw text items into one flat tensor of token ids.

    Each item is tokenized and mapped through the vocabulary; items that
    yield no tokens are dropped and the rest are concatenated in order.
    """
    encoded_items = [
        torch.tensor(vocab(tokenizer(text)), dtype=torch.long)
        for text in raw_text_iter
    ]
    # keep only tensors that hold at least one token id, then join them end to end
    non_empty = tuple(ids for ids in encoded_items if ids.numel() > 0)
    return torch.cat(non_empty)

# fetch fresh iterators for the three splits (train_iter was already handed to
# build_vocab_from_iterator above) and encode each split into one flat id tensor
train_iter, val_iter, test_iter = WikiText2()
train_data = data_process(train_iter)
val_data = data_process(val_iter)
test_data = data_process(test_iter)

# batch the single long sequence of data into bsz separate sequences for cleaner processing

def batchify(data, bsz):
    """Split one flat token sequence into ``bsz`` parallel sequences, sequence dimension first.

    Trailing tokens that do not fill a complete row of ``bsz`` are dropped.

    Args:
        data: 1-D tensor of token ids.
        bsz: number of parallel sequences (batch size).

    Returns:
        Contiguous tensor of shape ``[len(data) // bsz, bsz]``; column ``k``
        holds the ``k``-th consecutive slice of ``data``.
    """
    seq_len = data.size(0) // bsz
    data = data[:seq_len * bsz]
    # transpose so dim 0 is the position within a sequence: get_batch slices
    # along dim 0 and the model treats dim 0 as the sequence axis
    data = data.view(bsz, seq_len).t().contiguous()
    return data


# number of parallel sequences for the training split
batch_size = 20
# number of parallel sequences for the validation and test splits
eval_size = 10

# replace each flat token tensor with its batched form
train_data = batchify(train_data, batch_size)
val_data = batchify(val_data, eval_size)
test_data = batchify(test_data, eval_size)

In [8]:
# The model learns from fixed-length chunks of the token stream and makes its
# predictions from the context available inside one chunk.

# For example, if the model is trained on chunks of length 30, it learns to
# predict the next word from at most the 30 preceding words. This chunk length
# is set by the bptt parameter.

In [9]:
# maximum number of positions (along dim 0) that get_batch slices out per step
bptt = 35

In [10]:
# returns the input, target pairs for the transformer to train on
# the target is nothing but the right shifted input. basically each word position  
# has to learn to predict the next word that will come there given the current word.
def get_batch(source, i, max_len=None):
    """Slice one (input, target) training pair out of ``source`` starting at row ``i``.

    Args:
        source: tensor whose first dimension is the position in the sequence.
        i: index of the first input row.
        max_len: maximum number of rows to take; defaults to the global ``bptt``.

    Returns:
        Tuple ``(data, target)``. ``data`` is ``source[i:i+n]``. ``target`` is
        the same window shifted one row forward, flattened to 1-D so it lines
        up with logits reshaped to ``[-1, ntokens]`` for the loss.
    """
    if max_len is None:
        max_len = bptt
    # leave room for the one-row shift so the target never runs past the end
    sequence_length = min(max_len, len(source) - 1 - i)
    data = source[i:i+sequence_length]
    target = source[i+1:i+sequence_length+1].reshape(-1)

    return data, target

In [11]:
# model hyperparameters
ntokens = len(vocab)  # vocabulary size
emsize = 200  # passed as d_model (embedding width)
d_hid = 200  # passed as d_hid (feed-forward width of each encoder layer)
nlayers = 2  # number of encoder layers
nhead = 2  # number of attention heads
dropout = 0.2  # dropout probability
model = TransformerModel(d_model=emsize, ntoken=ntokens, nhead=nhead, d_hid=d_hid, nlayers=nlayers, dropout=dropout)

In [12]:
import time

# loss, optimizer and learning-rate schedule shared by train() and evaluate()
criterion = nn.CrossEntropyLoss()
lr = 5.0  # initial learning rate
optimizer = torch.optim.SGD(model.parameters(), lr=lr)
# step_size 1.0 with gamma 0.95: the learning rate is multiplied by 0.95 on each scheduler.step()
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma = 0.95)

def train(model):
    """Run one pass over the global ``train_data``, updating ``model`` in place.

    Uses the module-level ``criterion``, ``optimizer``, ``scheduler``,
    ``bptt`` and ``ntokens``. Every ``log_interval`` batches the mean loss
    and perplexity since the last report are printed.
    """
    model.train()
    total_loss = 0
    log_interval = 200
    start_time = time.time()

    num_batches = len(train_data) // bptt

    for batch, i in enumerate(range(0, train_data.size(0) - 1, bptt)):
        data, targets = get_batch(train_data, i)
        output = model(data)
        output_flat = output.view(-1, ntokens)
        # the loss needs one class index per logit row, so flatten the targets
        # (a no-op when they are already 1-D)
        loss = criterion(output_flat, targets.reshape(-1))

        optimizer.zero_grad()
        loss.backward()
        # gradient clipping lives in torch.nn.utils, not torch.utils
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
        optimizer.step()

        total_loss += loss.item()
        if batch % log_interval == 0 and batch > 0:
            lr = scheduler.get_last_lr()[0]
            ms_per_batch = (time.time() - start_time) * 1000 / log_interval
            cur_loss = total_loss / log_interval
            ppl = math.exp(cur_loss)
            print(f"| batch {batch} / num_batches {num_batches} | lr : {lr} | ms/batch {ms_per_batch} | loss : {cur_loss} | ppl : {ppl}")
            # restart the running statistics for the next reporting window
            total_loss = 0
            start_time = time.time()
    
def evaluate(model, eval_data):
    """Return the mean per-position loss of ``model`` over ``eval_data`` as a float.

    Args:
        model: module called as ``model(data)``; its output is reshaped to
            ``[-1, ntokens]``.
        eval_data: tensor whose first dimension is the sequence position.

    Uses the module-level ``criterion``, ``bptt`` and ``ntokens``.
    """
    model.eval()
    total_loss = 0.0
    with torch.no_grad():
        for i in range(0, eval_data.size(0) - 1, bptt):
            data, targets = get_batch(eval_data, i)
            sequence_length = data.size(0)
            output = model(data)

            output_flat = output.view(-1, ntokens)
            loss = criterion(output_flat, targets.reshape(-1))

            # criterion averages within the chunk; weight by the chunk length
            # so the final division by the number of positions gives a true
            # mean (the last chunk is usually shorter than bptt)
            total_loss += sequence_length * loss.item()

    return total_loss / (eval_data.size(0) - 1)

            

In [1]:
# Train for a fixed number of epochs, keep the parameters with the lowest
# validation loss in a temporary file, and restore them at the end.
# start at +inf so the first epoch always writes a checkpoint
best_val_loss = float('inf')
epochs = 3

with TemporaryDirectory() as tempdir:
    best_model_path = os.path.join(tempdir, "best_model_params.pt")

    for epoch in range(1, epochs+1):
        epoch_start_time = time.time()
        train(model)
        val_loss = evaluate(model, val_data)
        val_ppl = math.exp(val_loss)
        elapsed = time.time() - epoch_start_time
        print("-" * 90)
        # report the elapsed seconds for the epoch, not the `time` module
        print(f"End of the epoch {epoch} | time : {elapsed} | valid loss : {val_loss} | ppl : {val_ppl}")
        print("-" * 90)

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), best_model_path)

        # decay the learning rate once per epoch
        scheduler.step()
    # reload the best checkpoint while the temporary directory still exists
    model.load_state_dict(torch.load(best_model_path))

NameError: name 'TemporaryDirectory' is not defined