In [7]:
import torch
from torch import nn
from torch.nn import functional as F
from einops import rearrange

In [1]:
# Read the whole corpus into memory as a single string.
with open('cleanmarco.txt', encoding='utf-8') as corpus_file:
    text = corpus_file.read()
# Peek at the first 100 characters as a quick sanity check.
print(text[:100])

Primavera
Funghi in città 

Il vento, venendo in città da lontano, le porta doni inconsueti, di cui 


## Tokenisation

In [None]:
# Character-level vocabulary: every distinct character of the corpus, shown in sorted order.
print(sorted(set(text)))
# The vocabulary size is the number of distinct characters.
vocab_size = len(set(text))

['\n', ' ', '!', '(', ')', ',', '-', '.', ':', ';', '>', '?', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', '\\', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '«', '»', 'È', 'à', 'è', 'é', 'ì', 'ò', 'ù', '’']


In [3]:
# Lookup tables between characters and integer token ids.
# Ids follow the sorted order of the distinct characters in the corpus.
decode_dict = dict(enumerate(sorted(set(text))))                 # id -> character
encode_dict = {char: idx for idx, char in decode_dict.items()}   # character -> id

In [5]:
def encode(x):
    """Map a string to its list of integer token ids, one id per character.

    Looks every character up in encode_dict, so a character that is not a key
    of encode_dict raises KeyError.
    """
    return [encode_dict[l] for l in x]


def decode(x):
    """Map an iterable of integer token ids back to the string they spell.

    Looks every id up in decode_dict, so an id that is not a key of
    decode_dict raises KeyError.
    """
    return "".join(decode_dict[n] for n in x)
# Round-trip sanity check: show the token ids, then the text recovered from them.
print(encode("i am a friendly tokeniser"),
      decode(encode("i am a friendly tokeniser")),
      sep="\n")


[43, 1, 35, 47, 1, 35, 1, 40, 52, 43, 39, 48, 38, 46, 59, 1, 54, 49, 45, 39, 48, 43, 53, 39, 52]
i am a friendly tokeniser


In [8]:
#WORK WITH TOKENISED DATA ON PYTORCH
#the whole corpus as a 1-D tensor of token ids
data = torch.tensor(encode(text))
#train val split: first 90% of the tokens for training, the remaining 10% for validation
train_data, val_data = torch.tensor_split(data, [int(0.9*len(data))])

## Helper functions for the training loop

In [9]:
#DATALOADER
batch_size = 32   #number of sequences sampled per batch
block_size = 64   #number of tokens in each sampled sequence

def dataloader(split_type):
    """
    Sample a random batch of (input, target) sequences from one data split.

    Args:
        split_type: 'train' to sample from train_data, 'val' to sample from val_data.

    Returns:
        (inputs, targets), each a tensor of shape batch_size x block_size.
        targets is the same window shifted one position to the right, so
        targets[b, t] is the token that follows inputs[b, t] in the split.

    Raises:
        ValueError: if split_type is neither 'train' nor 'val', or if the chosen
            split holds fewer than block_size + 1 tokens.
    """
    #select correct split (fail loudly on a typo instead of crashing later on len(None))
    if split_type == 'train':
        split = train_data
    elif split_type == 'val':
        split = val_data
    else:
        raise ValueError(f"split_type must be 'train' or 'val', got {split_type!r}")

    #a window needs block_size tokens plus one extra token for the shifted target
    n_starts = len(split) - block_size
    if n_starts <= 0:
        raise ValueError(
            f"split {split_type!r} has {len(split)} tokens, need at least {block_size + 1}"
        )

    #sample batch_size start positions; the upper bound is exclusive so loc+block_size+1 <= len(split)
    sample_locs = torch.randint(0, n_starts, (batch_size,)).tolist()
    inputs = torch.stack([split[loc:loc+block_size] for loc in sample_locs])
    targets = torch.stack([split[loc+1:loc+block_size+1] for loc in sample_locs])

    return inputs, targets
# Smoke test: draw one training batch to check that the loader runs.
xtest, ytest = dataloader('train')

In [10]:
#LOSS ESTIMATION FOR TRAIN-VAL
#key point: loop over many examples to reduce noise in estimate of the loss
@torch.no_grad()   #deactivates autograd
def loss_estimation(n_iter_estimation):
    """
    Estimate the train and validation loss of the global model myGPT.

    For each split, draws n_iter_estimation batches with dataloader, evaluates
    myGPT on each and averages the resulting losses.

    Args:
        n_iter_estimation: number of batches averaged per split.

    Returns:
        A list [train_loss, val_loss] of 0-dim tensors.

    The model is put in eval mode for the duration of the estimate and is
    afterwards returned to whichever mode (train or eval) it was in before
    the call, even if an exception is raised part-way through.
    """
    was_training = myGPT.training   #remember the caller's mode so it can be restored
    myGPT.eval()   #deactivates dropout
    final_losses = []
    try:
        for split_type in ['train', 'val']:
            split_loss = torch.zeros(n_iter_estimation)

            for i in range(n_iter_estimation):
                x, y = dataloader(split_type)
                _, loss = myGPT(x, y)
                split_loss[i] = loss.item()

            final_losses.append(split_loss.mean())
    finally:
        myGPT.train(was_training)   #back to the previous mode (re-activates dropout only if it was on)

    return final_losses


## GPT Architecture

In [None]:
#MULTI HEAD ATTENTION
# i want to define it as a child class to have the "good" nn.Module stuff carry over
class MultiheadCausalAttention(nn.Module):
    """
    multi-head causal self-attention.

    - takes input of Batch x seq_len x dim_embed
    and produces output of same dim.
    - seq_len can be anything up to block_size: the causal mask is cropped to the actual length.
    - each head is processed in parallel by considering it as an extra axis
    - does not include residual connections or layernorm.
    - dropout on attention matrix and output.

    raises ValueError at construction if dim_embed is not a multiple of n_heads,
    and in forward if the input is longer than block_size.
    """
    def __init__(self, dim_embed, block_size, n_heads, dropout):
        super().__init__()
        if dim_embed % n_heads != 0:
            raise ValueError(
                f"dim_embed ({dim_embed}) must be divisible by n_heads ({n_heads})"
            )
        self.dim_embed = dim_embed
        self.get_qkv = nn.Linear(dim_embed, 3*dim_embed, bias=False)  # setting dim_head x n_heads = dim_embed
        self.output = nn.Linear(dim_embed, dim_embed, bias=False)
        self.dim_head = dim_embed // n_heads
        self.n_heads = n_heads
        # lower-triangular matrix of ones: entries strictly above the diagonal are zero, the diagonal itself is kept
        self.register_buffer('mask', torch.tril(torch.ones(block_size, block_size)))
        self.dropout_att = nn.Dropout(dropout)
        self.dropout_output = nn.Dropout(dropout)

    def forward(self, x):
        t = x.shape[1]  #actual sequence length of this input
        if t > self.mask.shape[0]:
            raise ValueError(
                f"sequence length {t} exceeds block_size {self.mask.shape[0]}"
            )

        x = self.get_qkv(x)

        x = rearrange(x, 'b t (he emb) -> b he t emb', he = self.n_heads)  #divide into each head

        q, k, v = x.tensor_split(3, dim=-1)   #split on last axis cause we expanded on last axis -> emb // n_heads

        A = (q @ k.transpose(-2, -1))/ self.dim_head**0.5  #have to divide by sqrt of size of each head embedding, output is b x he x t x t

        #crop the mask to t x t so it broadcasts against A, then set "future" tokens attention scores to -inf (-> zero after softmax)
        A = A.masked_fill(self.mask[:t, :t] == 0, float('-inf'))

        A = F.softmax(A, dim = -1)  #normalise on rows

        A = self.dropout_att(A)   #reg1

        x = A @ v  #exchange info between tokens with attention matrix

        x = rearrange(x, ' b he t emb -> b t (he emb)', he = self.n_heads)  #concatenate heads

        x = self.output(x)  #output projection

        x = self.dropout_output(x)  #reg2

        return x


class MLPblock(nn.Module):
    """
    feed-forward block applied to the last axis: Linear (dim_embed -> 4*dim_embed), ReLU,
    Linear (4*dim_embed -> dim_embed), then dropout. output has the same shape as the input.
    """
    def __init__(self, dim_embed, dropout):
        super().__init__()
        hidden_dim = 4 * dim_embed  #width of the hidden layer
        self.l1 = nn.Linear(dim_embed, hidden_dim)
        self.nl = nn.ReLU()
        self.l2 = nn.Linear(hidden_dim, dim_embed)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        hidden = self.nl(self.l1(x))  #expand and apply the non-linearity
        return self.dropout(self.l2(hidden))  #project back, then dropout reg


class Block(nn.Module):
    """
    pre-norm transformer block: each sub-layer (attention first, then the MLP) reads a
    layer-normalised copy of its input and its output is added back through a residual connection.
    layernorm is the standard pytorch implementation; dropout is handled inside the two sub-layers.
    """
    def __init__(self, dim_embed, dim_seq, n_heads, dropout):
        super().__init__()
        self.attention_block = MultiheadCausalAttention(dim_embed, dim_seq, n_heads, dropout)
        self.MLP = MLPblock(dim_embed, dropout)
        #layernorm normalises over the last dim only, so no info is shared between tokens
        self.LN1 = nn.LayerNorm(dim_embed)
        self.LN2 = nn.LayerNorm(dim_embed)

    def forward(self, x):
        #attention sub-layer with residual connection
        attended = self.attention_block(self.LN1(x))
        x = x + attended
        #MLP sub-layer with residual connection
        mlp_out = self.MLP(self.LN2(x))
        return x + mlp_out

In [12]:
class GPT(nn.Module):
    """
    transformer language model built from a stack of Block modules.

    - token embedding plus positional embedding, summed
    - using lookup table for positional embedding, indexed by the actual length of the input
    - n_blocks transformer blocks, a final layernorm and a linear layer producing vocab_size logits
    """
    def __init__(self, vocab_size, dim_embed, dim_seq, n_heads, n_blocks, dropout):
        super().__init__()
        self.vsize = vocab_size
        self.dim_embed = dim_embed
        self.dim_seq = dim_seq
        self.n_heads = n_heads
        self.dropout = dropout
        self.max_context = dim_seq  #used in generation step to crop context window, set the same as the seq size during training (design choice)
        self.tok_embedding = nn.Embedding(self.vsize, self.dim_embed)
        self.pos_embedding = nn.Embedding(self.dim_seq, self.dim_embed)
        self.blocks = nn.ModuleList([Block(dim_embed=self.dim_embed, dim_seq=self.dim_seq, n_heads=self.n_heads, dropout = self.dropout) for _ in range(n_blocks)])
        self.final_LN = nn.LayerNorm(self.dim_embed)
        self.unembed = nn.Linear(self.dim_embed, self.vsize)   #final linear layer to get right output dimension (vocab_size) of logits. similar to lookup table in bigrams


    def forward(self, x, targets = None):
        """
        x: batch x t tensor of token indices; t must not exceed dim_seq (the size of the positional table).
        targets: optional batch x t tensor with the index of the next token at every position.

        returns (logits, loss):
        - logits for next token, shape batch x t x vocab_size
        - loss: cross entropy between logits and targets, or None when targets is not given

        raises ValueError if t > dim_seq.
        """
        t = x.shape[1]
        if t > self.dim_seq:
            raise ValueError(f"sequence length {t} exceeds dim_seq {self.dim_seq}")

        tok_emb = self.tok_embedding(x) #maps every token index to a vector of dim_embed -> b x t x emb
        #positions are built for the actual length t and on the same device as the input
        positions = torch.arange(t, device=x.device)
        pos_emb = self.pos_embedding(positions) # gives t x emb (think of t like a batch dimension for the embedding layer)
        x = tok_emb + pos_emb    #final embedding. broadcasting same pos_emb on each element of the batch
        #go through transformer blocks
        for block in self.blocks:
            x = block(x)
        #apply final layernorm
        x = self.final_LN(x)
        #unembed
        logits = self.unembed(x)

        if targets is None:
            loss = None
        else:
            #flatten batch and time so cross entropy sees (n_examples x vocab) logits and (n_examples) targets
            logits_ce = rearrange(logits, 'b t v -> (b t) v')
            targets_ce = rearrange(targets, 'b t -> (b t)')

            loss = F.cross_entropy(logits_ce, targets_ce)

        return logits, loss

    @torch.no_grad()   #sampling needs no gradients
    def generate(self, x, max_tokens):
        """
        samples max_tokens new tokens, one at a time, after the prompt x (batch x t token indices).

        returns a batch x (t + max_tokens) tensor: the prompt followed by the sampled tokens.
        sampling runs in eval mode (dropout off) without autograd; afterwards the model is put
        back in whichever mode (train or eval) it was in before the call.
        """
        was_training = self.training
        self.eval()   #deactivates dropout while sampling
        try:
            #loop until max tokens
            for _ in range(max_tokens):
                x_context = x[:, -self.max_context:] #crop sequence to use as context, only last max_context tokens
                logits, _ = self(x_context)
                #take last position and turn its logits into probabilities
                logits_final_tk = logits[:, -1, :]  # batch x vocab_size
                probs = F.softmax(logits_final_tk, dim = -1) #normalises on vocab dim
                x_pred = torch.multinomial(probs, num_samples=1) # batch x 1
                x = torch.cat((x, x_pred), dim = 1)
        finally:
            self.train(was_training)   #restore the caller's mode

        return x

## Training

In [20]:
#training initialisation and params
seed = 1337  #seed for torch's global random number generator
n_iters = 2000  #number of optimisation steps
n_estimation_wait = 200  #estimate train/val loss every this many steps
n_iter_estimation = 50  #number of batches averaged in each loss estimate
learning_rate = 3e-4
dropout = 0.2 #regularisation. using the usual value ive seen

#seed before building the model, so the weight initialisation and the random draws that follow start from a fixed state
torch.manual_seed(seed)

myGPT = GPT(vocab_size=vocab_size, dim_embed=128, dim_seq=block_size, n_heads=4, n_blocks=2, dropout=dropout)
optimiser = torch.optim.AdamW(myGPT.parameters(), lr=learning_rate)


In [25]:
#simple training loop
for step in range(n_iters):
    #every n_estimation_wait steps, report the averaged train and val loss
    if step % n_estimation_wait == 0:
        train_loss, val_loss = loss_estimation(n_iter_estimation)
        print("train loss is", train_loss, "val loss is", val_loss)

    #forward pass on a fresh random training batch
    xbatch, ybatch = dataloader('train')
    logits, loss = myGPT(xbatch, ybatch)

    #clear old gradients, backpropagate, update the weights
    optimiser.zero_grad(set_to_none=True)
    loss.backward()
    optimiser.step()

train loss is tensor(1.6083) val loss is tensor(1.7420)
train loss is tensor(1.6050) val loss is tensor(1.7381)
train loss is tensor(1.5934) val loss is tensor(1.7364)
train loss is tensor(1.5950) val loss is tensor(1.7355)
train loss is tensor(1.5866) val loss is tensor(1.7294)
train loss is tensor(1.5833) val loss is tensor(1.7119)
train loss is tensor(1.5779) val loss is tensor(1.7186)
train loss is tensor(1.5807) val loss is tensor(1.7230)
train loss is tensor(1.5626) val loss is tensor(1.7156)
train loss is tensor(1.5583) val loss is tensor(1.7100)


## Generation

In [26]:
#use a fresh training batch as prompts, one prompt per row
xtest, ytest = dataloader('train')

#sample 500 new tokens after every prompt, then print each row decoded back to text
gen = myGPT.generate(xtest, 500)
for token_row in gen:
    print(decode(token_row.tolist()))

vrebbe dovuto alzarsi, cercare di approdare, chiamare aiuto; ma intato, gli uomozzurnato e ruotto ti quattorgevano era uomini albera; la ragli. Marcovaldo 
barbe le dalla nerivoro, singero dive carrinotta nonvipre ente più chianta e 
trovalda, gliermatestò che d’ullui spettirio voltava del miniozzata cincesmo permaierci, non di sonne sullo lumici delli all’itrofintampira. 

- E voltava, tandò sun’appande bel capotare le vosermiva s’erono - per la magazzionte sonusmarsi un forse fresto e 
quascia aldoc- viscia tra mate e perciò ristedaia e binieterrolvisse, p
un 
giallo d’oro. 

Già da un pezzo, un corteo di motorette e auovo; al piazzato lumavato sula servarettore piogni delle aborché non 
viamentinze oce disquante a tartase giunno volle have. Ma non sardare boclitra di 
pedisero l’altro diceva giozio. 

«C’era L’altro sontranzo della; cha le si corrente di polo. 
Prancolo, la naffassi casceva cuamo tudsa un po’ a cielosò alle 
balbera, davistate mamino guardi, che affra in cortile con