In [1]:
# Hyperparameters shared by every model class in this notebook.
GPT_CONFIG_124M = dict(
    vocab_size=50257,       # rows of the token-embedding table / width of the output logits
    context_length=1024,    # rows of the positional-embedding table (max sequence length)
    emb_dim=768,            # embedding width used by every layer
    n_heads=12,             # attention heads per transformer block
    n_layers=12,            # number of stacked transformer blocks
    drop_rate=0.1,          # probability passed to every nn.Dropout
    qkv_bias=False,         # whether the query/key/value projections have a bias term
)

In [2]:
import torch
import torch.nn as nn

class DummyGPTModel(nn.Module):
    """GPT skeleton with real embeddings and output head but placeholder internals.

    The transformer blocks and the final normalisation are the identity stubs
    ``DummyTransformerBlock`` and ``DummyLayerNorm``; only the embedding tables,
    the embedding dropout and the output projection do actual work.

    Args:
        config: mapping with the keys ``vocab_size``, ``context_length``,
            ``emb_dim``, ``drop_rate`` and ``n_layers``.
    """

    def __init__(self, config):
        super().__init__()
        emb_dim = config['emb_dim']
        vocab_size = config['vocab_size']

        self.tok_emb = nn.Embedding(vocab_size, emb_dim)
        self.pos_emb = nn.Embedding(config['context_length'], emb_dim)
        self.drop_emb = nn.Dropout(config['drop_rate'])

        stub_blocks = [DummyTransformerBlock(config) for _ in range(config['n_layers'])]
        self.transformer_blocks = nn.Sequential(*stub_blocks)
        self.final_norm = DummyLayerNorm(emb_dim)
        self.out_head = nn.Linear(emb_dim, vocab_size, bias=False)

    def forward(self, in_idx):
        """Map token ids of shape (batch, num_tokens) to logits of shape (batch, num_tokens, vocab_size)."""
        # Unpacking also enforces that the input is 2-D.
        _, seq_len = in_idx.shape

        # Positions 0, 1, 2, ... created on the same device as the token ids.
        positions = torch.arange(seq_len, device=in_idx.device)

        # The (num_tokens, emb_dim) positional embeddings broadcast over the batch axis.
        hidden = self.tok_emb(in_idx) + self.pos_emb(positions)
        hidden = self.drop_emb(hidden)
        hidden = self.transformer_blocks(hidden)
        hidden = self.final_norm(hidden)
        return self.out_head(hidden)

class DummyTransformerBlock(nn.Module):
    """Placeholder transformer block: accepts a config and returns its input unchanged."""

    def __init__(self, config):
        # `config` is accepted for interface compatibility only; nothing is read from it.
        super().__init__()
    
    def forward(self, x):
        # Identity: the very same tensor object is handed back.
        return x

class DummyLayerNorm(nn.Module):
    """Placeholder layer normalisation: same constructor shape as a real one, identity forward."""

    def __init__(self, normalized_shape, eps=1e-5):
        # Both arguments are accepted for interface compatibility only and are ignored.
        super().__init__()
    
    def forward(self, x):
        # Identity: no normalisation is applied.
        return x

In [3]:
import tiktoken

# Encode two short prompts with the GPT-2 encoding and stack them into a single
# (2, num_tokens) tensor; torch.stack needs both prompts to encode to the same length.
tokenizer = tiktoken.get_encoding("gpt2")
txt1 = "Every effort moves you"
txt2 = "Every day holds a"
batch = torch.stack(
    [torch.tensor(tokenizer.encode(text)) for text in (txt1, txt2)],
    dim=0,
)
print(batch)

tensor([[6109, 3626, 6100,  345],
        [6109, 1110, 6622,  257]])


In [4]:
# Seed first so the randomly initialised weights (and the logits printed below) are reproducible.
torch.manual_seed(123)
model = DummyGPTModel(GPT_CONFIG_124M)
# One row of vocab_size logits for every token position of every sequence in `batch`.
logits = model(batch)
print("Output shape: ", logits.shape)
print(logits)

Output shape:  torch.Size([2, 4, 50257])
tensor([[[-0.9289,  0.2748, -0.7557,  ..., -1.6070,  0.2702, -0.5888],
         [-0.4476,  0.1726,  0.5354,  ..., -0.3932,  1.5285,  0.8557],
         [ 0.5680,  1.6053, -0.2155,  ...,  1.1624,  0.1380,  0.7425],
         [ 0.0447,  2.4787, -0.8843,  ...,  1.3219, -0.0864, -0.5856]],

        [[-1.5474, -0.0542, -1.0571,  ..., -1.8061, -0.4494, -0.6747],
         [-0.8422,  0.8243, -0.1098,  ..., -0.1434,  0.2079,  1.2046],
         [ 0.1355,  1.1858, -0.1453,  ...,  0.0869, -0.1590,  0.1552],
         [ 0.1666, -0.8138,  0.2307,  ...,  2.5035, -0.3055, -0.3083]]],
       grad_fn=<UnsafeViewBackward0>)


In [5]:
# Small example used to illustrate layer normalisation by hand.
torch.manual_seed(123)
batch_example = torch.randn(2, 5)  # 2 samples with 5 features each
layer = nn.Sequential(nn.Linear(5, 6), nn.ReLU())  # 5 inputs -> 6 outputs, followed by ReLU
out = layer(batch_example)
print(out)

tensor([[0.2260, 0.3470, 0.0000, 0.2216, 0.0000, 0.0000],
        [0.2133, 0.2394, 0.0000, 0.5198, 0.3297, 0.0000]],
       grad_fn=<ReluBackward0>)


In [6]:
# Without keepdim=True the statistics would have shape (2,) (a flat vector with one entry per row)
# instead of the (2, 1) column that broadcasts against `out`.
mean = out.mean(dim=-1, keepdim=True) # dim=-1 reduces over the last axis: one mean per row (sample)
var = out.var(dim=-1, keepdim=True)
print("Mean: \n", mean)
print("Variance: \n", var)

Mean: 
 tensor([[0.1324],
        [0.2170]], grad_fn=<MeanBackward1>)
Variance: 
 tensor([[0.0231],
        [0.0398]], grad_fn=<VarBackward0>)


In [7]:
# Normalise every row of `out` to zero mean and unit variance (this keeps activations in a
# stable range, which helps against vanishing / exploding gradients).
#
# The input statistics are recomputed here under their own names rather than read from
# `mean` / `var`: this cell rebinds `mean` and `var` to the statistics of the *normalised*
# tensor below, so reading them would give a wrong result whenever the cell is re-run.
row_mean = out.mean(dim=-1, keepdim=True)
row_var = out.var(dim=-1, keepdim=True)
out_norm = (out - row_mean) / torch.sqrt(row_var)

# Statistics of the normalised tensor: expected to be ~0 and 1 for every row.
mean = out_norm.mean(dim=-1, keepdim=True)
var = out_norm.var(dim=-1, keepdim=True)
print("Normalized layer outputs: \n", out_norm)
print("Mean: \n", mean)
print("Variance: \n", var)

Normalized layer outputs: 
 tensor([[ 0.6159,  1.4126, -0.8719,  0.5872, -0.8719, -0.8719],
        [-0.0189,  0.1121, -1.0876,  1.5173,  0.5647, -1.0876]],
       grad_fn=<DivBackward0>)
Mean: 
 tensor([[9.9341e-09],
        [1.9868e-08]], grad_fn=<MeanBackward1>)
Variance: 
 tensor([[1.0000],
        [1.0000]], grad_fn=<VarBackward0>)


In [8]:
# Turn off scientific notation (a global print setting) so the near-zero means print as 0.0000.
torch.set_printoptions(sci_mode=False)
print("Mean: \n", mean)
print("Variance: \n", var)

Mean: 
 tensor([[    0.0000],
        [    0.0000]], grad_fn=<MeanBackward1>)
Variance: 
 tensor([[1.0000],
        [1.0000]], grad_fn=<VarBackward0>)


In [9]:
class LayerNorm(nn.Module):
    """Layer normalisation over the last dimension with a learnable scale and shift.

    Args:
        emb_dim: size of the last dimension of the inputs; also the length of
            the ``scale`` and ``shift`` parameters.
        eps: small constant added to the variance before the square root to
            prevent division by zero. Defaults to ``1e-5``.
    """

    def __init__(self, emb_dim, eps=1e-5):
        super().__init__()
        self.eps = eps

        # scale and shift are trainable parameters with the same size as the
        # normalised dimension; they start as the identity transform (1 and 0)
        # and are adjusted during training.
        self.scale = nn.Parameter(torch.ones(emb_dim))
        self.shift = nn.Parameter(torch.zeros(emb_dim))

    def forward(self, x):
        """Return ``x`` normalised along its last dimension, then scaled and shifted."""
        mean = x.mean(dim=-1, keepdim=True)
        # unbiased=False: divide by n, not n - 1.
        var = x.var(dim=-1, keepdim=True, unbiased=False)
        norm_x = (x - mean) / torch.sqrt(var + self.eps)
        return self.scale * norm_x + self.shift

In [10]:
# Sanity check of the LayerNorm module on the 2x5 example batch.
ln = LayerNorm(emb_dim=5)
out_ln = ln(batch_example)
# Same reduction settings as inside LayerNorm.forward, so the check matches the module's own statistics.
mean = out_ln.mean(dim=-1, keepdim=True)
var = out_ln.var(dim=-1, keepdim=True, unbiased=False)
print("Mean: \n", mean)
print("Variance: \n", var)

Mean: 
 tensor([[    -0.0000],
        [     0.0000]], grad_fn=<MeanBackward1>)
Variance: 
 tensor([[1.0000],
        [1.0000]], grad_fn=<VarBackward0>)


In [11]:
class GELU(nn.Module):
    """GELU activation using the tanh approximation.

    Computes ``0.5 * x * (1 + tanh(sqrt(2 / pi) * (x + 0.044715 * x**3)))``
    element-wise. The module has no parameters.
    """

    # sqrt(2 / pi), computed once as a Python float. This avoids allocating a
    # tensor on every forward call and keeps full precision for float64 inputs
    # (a float32 tensor constant would cap the precision at float32).
    _SQRT_2_OVER_PI = (2.0 / torch.pi) ** 0.5

    def __init__(self):
        super().__init__()

    def forward(self, x):
        """Apply the activation element-wise; the result has the shape and dtype of ``x``."""
        inner = self._SQRT_2_OVER_PI * (x + 0.044715 * torch.pow(x, 3))
        return 0.5 * x * (1 + torch.tanh(inner))

In [12]:
class FeedForward(nn.Module):
    """Position-wise feed-forward network: expand to 4x the width, GELU, project back.

    Args:
        config: mapping providing ``emb_dim`` (768 in ``GPT_CONFIG_124M``).
    """

    def __init__(self, config):
        super().__init__()
        width = config['emb_dim']
        hidden_width = 4 * width

        # nn.Linear takes (input features, output features).
        expand = nn.Linear(width, hidden_width)
        project = nn.Linear(hidden_width, width)
        self.layers = nn.Sequential(expand, GELU(), project)

    def forward(self, x):
        """Apply the network to the last dimension of ``x``; the output shape equals the input shape."""
        return self.layers(x)

In [13]:
# The feed-forward block keeps the input shape: 768 -> 4 * 768 -> 768 on the last axis.
ffn = FeedForward(GPT_CONFIG_124M)
x = torch.rand(2, 3, 768) # one batch of 2 samples; the last axis matches emb_dim
out = ffn(x)
print(out.shape)

torch.Size([2, 3, 768])


In [14]:
class ExampleDeepNeuralNetwork(nn.Module):
    """Stack of Linear + GELU layers with optional shortcut (residual) connections.

    Args:
        layer_sizes: sequence of widths ``[in, h1, h2, ..., out]``. One
            ``Linear + GELU`` layer is built for every consecutive pair, so a
            list of 6 sizes yields 5 layers.
        use_shortcut: when true, a layer's input is added to its output
            whenever the two have the same shape.

    Raises:
        ValueError: if ``layer_sizes`` has fewer than two entries.
    """

    def __init__(self, layer_sizes, use_shortcut):
        super().__init__()
        if len(layer_sizes) < 2:
            raise ValueError("layer_sizes needs at least an input and an output size")

        self.use_shortcut = use_shortcut
        # nn.Linear takes (input width, output width), i.e. the number of neurons on each side.
        # Layers are created in order, so seeded initialisation is the same as
        # writing the pairs out by hand.
        self.layers = nn.ModuleList([
            nn.Sequential(nn.Linear(n_in, n_out), GELU())
            for n_in, n_out in zip(layer_sizes[:-1], layer_sizes[1:])
        ])

    def forward(self, x):
        """Run ``x`` through every layer, adding shortcuts where enabled and shapes match."""
        for layer in self.layers:
            layer_output = layer(x)

            # A shortcut is only possible when input and output shapes agree
            # (e.g. not for a final layer that changes the width).
            if self.use_shortcut and x.shape == layer_output.shape:
                x = x + layer_output
            else:
                x = layer_output
        return x

In [15]:
layer_sizes = [3, 3, 3, 3, 3, 1] # 5 layers: input width 3, four layers with 3 neurons, final layer with 1
sample_input = torch.tensor([[1., 0., -1.]])
# Seed before construction so the layer weights are reproducible.
torch.manual_seed(123)
model_without_shortcut = ExampleDeepNeuralNetwork(layer_sizes, use_shortcut=False)

In [16]:
def print_gradients(model, x):
    """Print the mean absolute gradient of every weight parameter of ``model``.

    Runs one forward pass on ``x``, computes the MSE loss against a target of
    zero, back-propagates, and prints one line per parameter whose name
    contains ``'weight'``. The printed text is in Portuguese.

    Gradients left over from earlier backward passes are cleared first, so
    calling the function repeatedly on the same model reports the same values
    instead of an ever-growing accumulated sum.

    Args:
        model: an ``nn.Module``.
        x: input tensor accepted by ``model``.
    """
    # Start from clean gradients; .backward() accumulates into .grad otherwise.
    model.zero_grad()

    output = model(x)
    # Create the target next to the output so models on another device / dtype also work.
    target = torch.tensor([[0.]], device=output.device, dtype=output.dtype)

    loss_fn = nn.MSELoss()
    loss = loss_fn(output, target)

    # Backward pass to compute the gradients.
    loss.backward()

    for name, param in model.named_parameters():
        # Parameters that did not take part in the forward pass have no gradient.
        if 'weight' in name and param.grad is not None:
            print(f"{name} tem media de gradiente {param.grad.abs().mean().item()}")

In [17]:
print_gradients(model_without_shortcut, sample_input) # shows the vanishing-gradient problem: gradients shrink towards the early layers

layers.0.0.weight tem media de gradiente 0.00020173587836325169
layers.1.0.weight tem media de gradiente 0.00012011159560643137
layers.2.0.weight tem media de gradiente 0.0007152040489017963
layers.3.0.weight tem media de gradiente 0.0013988736318424344
layers.4.0.weight tem media de gradiente 0.005049645435065031


In [18]:
# Same seed as the model without shortcuts, so both start from identical weights
# and only the shortcut connections differ.
torch.manual_seed(123)
model_with_shortcut = ExampleDeepNeuralNetwork(layer_sizes, use_shortcut=True)
print_gradients(model_with_shortcut, sample_input)

layers.0.0.weight tem media de gradiente 0.22169798612594604
layers.1.0.weight tem media de gradiente 0.20694111287593842
layers.2.0.weight tem media de gradiente 0.3289700150489807
layers.3.0.weight tem media de gradiente 0.26657330989837646
layers.4.0.weight tem media de gradiente 1.3258544206619263


In [19]:
class MultiHeadAttention(torch.nn.Module):
    """Causal multi-head self-attention.

    The query/key/value projections are computed once at full width ``d_out``
    and then split into ``num_heads`` heads of size ``d_out // num_heads``.

    Args:
        d_in: size of the last dimension of the input.
        d_out: total output width; must be divisible by ``num_heads``.
        context_length: side length of the stored causal mask, i.e. the
            longest sequence the mask covers.
        dropout: dropout probability applied to the attention weights.
        num_heads: number of attention heads.
        qkv_bias: whether the query/key/value projections use a bias.
    """

    def __init__(self, d_in, d_out, context_length, dropout, num_heads, qkv_bias=False):
        super().__init__()
        assert (d_out % num_heads == 0), \
            "d_out deve ser divisível por num_heads"

        self.d_out = d_out
        self.num_heads = num_heads
        self.head_dim = d_out // num_heads

        # Creation order (query, key, value, output projection) is kept fixed so
        # that seeded weight initialisation stays reproducible.
        self.W_query = torch.nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_key = torch.nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_value = torch.nn.Linear(d_in, d_out, bias=qkv_bias)
        # Linear layer that combines the outputs of the individual heads.
        self.out_proj = torch.nn.Linear(d_out, d_out)
        self.dropout = torch.nn.Dropout(dropout)

        # Ones strictly above the diagonal mark the future positions to hide.
        future_positions = torch.triu(torch.ones(context_length, context_length), diagonal=1)
        self.register_buffer("mask", future_positions)

    def forward(self, x):
        """Attend over ``x`` of shape (b, num_tokens, d_in); returns (b, num_tokens, d_out)."""
        # Unpacking also enforces that the input is 3-D.
        b, num_tokens, _ = x.shape

        def split_heads(projected):
            # (b, num_tokens, d_out) -> (b, num_tokens, num_heads, head_dim)
            #                        -> (b, num_heads, num_tokens, head_dim)
            # Grouping by head lets every head compute its own score matrix in parallel.
            per_head = projected.view(b, num_tokens, self.num_heads, self.head_dim)
            return per_head.transpose(1, 2)

        keys = split_heads(self.W_key(x))
        values = split_heads(self.W_value(x))
        queries = split_heads(self.W_query(x))

        # Dot product of every query with every key within a head:
        # (b, num_heads, num_tokens, num_tokens).
        attn_scores = queries @ keys.transpose(-2, -1)

        # Crop the stored mask to the current sequence length, then hide future tokens.
        causal_mask = self.mask[:num_tokens, :num_tokens].bool()
        attn_scores.masked_fill_(causal_mask, -torch.inf)

        # Scaled softmax over the key axis, followed by dropout on the weights.
        attn_weights = torch.softmax(attn_scores / self.head_dim ** 0.5, dim=-1)
        attn_weights = self.dropout(attn_weights)

        # (b, num_heads, num_tokens, head_dim) -> (b, num_tokens, num_heads, head_dim)
        per_head_context = (attn_weights @ values).transpose(1, 2)

        # Merge the heads: num_heads * head_dim == d_out. contiguous() is needed
        # because view() cannot merge the transposed axes in place.
        merged = per_head_context.contiguous().view(b, num_tokens, self.d_out)
        return self.out_proj(merged)

In [20]:
class TransformerBlock(nn.Module):
    """Pre-norm transformer block: attention and feed-forward, each with a shortcut.

    Each sub-layer is applied as ``x = dropout(sublayer(norm(x))) + x``.

    Args:
        config: mapping with the keys ``emb_dim``, ``context_length``,
            ``n_heads``, ``drop_rate`` and ``qkv_bias``.
    """

    def __init__(self, config):
        super().__init__()
        emb_dim = config['emb_dim']
        drop_rate = config['drop_rate']

        # Input and output width of the attention are both emb_dim, so the
        # shortcut addition in forward() is shape-compatible.
        attention_kwargs = dict(
            d_in=emb_dim,
            d_out=emb_dim,
            context_length=config['context_length'],
            num_heads=config['n_heads'],
            dropout=drop_rate,
            qkv_bias=config['qkv_bias'],
        )
        self.att = MultiHeadAttention(**attention_kwargs)
        self.ff = FeedForward(config)
        self.norm1 = LayerNorm(emb_dim)
        self.norm2 = LayerNorm(emb_dim)
        self.drop_shortcut = nn.Dropout(drop_rate)

    def forward(self, x):
        """Apply both sub-layers to ``x``; the output has the same shape as the input."""
        # Attention sub-layer with its shortcut connection.
        attended = self.drop_shortcut(self.att(self.norm1(x)))
        x = attended + x

        # Feed-forward sub-layer with its shortcut connection.
        transformed = self.drop_shortcut(self.ff(self.norm2(x)))
        return transformed + x

In [21]:
# Shape check: a transformer block must return a tensor of the same shape as its input.
torch.manual_seed(123)
x = torch.rand(2, 4, 768)  # last axis matches emb_dim
block = TransformerBlock(GPT_CONFIG_124M)
output = block(x)
print("Input shape: ", x.shape)
print("Output shape: ", output.shape)

Input shape:  torch.Size([2, 4, 768])
Output shape:  torch.Size([2, 4, 768])


In [22]:
import torch
import torch.nn as nn

class GPTModel(nn.Module):
    """GPT-style model: embeddings, a stack of transformer blocks, final norm, output head.

    The output head has its own weight matrix; it is not shared with the
    token-embedding table.

    Args:
        config: mapping with the keys ``vocab_size``, ``context_length``,
            ``emb_dim``, ``drop_rate`` and ``n_layers``, plus whatever
            ``TransformerBlock`` reads from it.
    """

    def __init__(self, config):
        super().__init__()
        emb_dim = config['emb_dim']
        vocab_size = config['vocab_size']

        self.tok_emb = nn.Embedding(vocab_size, emb_dim)
        self.pos_emb = nn.Embedding(config['context_length'], emb_dim)
        self.drop_emb = nn.Dropout(config['drop_rate'])

        blocks = [TransformerBlock(config) for _ in range(config['n_layers'])]
        self.transformer_blocks = nn.Sequential(*blocks)
        self.final_norm = LayerNorm(emb_dim)
        self.out_head = nn.Linear(emb_dim, vocab_size, bias=False)

    def forward(self, in_idx):
        """Map token ids of shape (batch, num_tokens) to logits of shape (batch, num_tokens, vocab_size)."""
        # Unpacking also enforces that the input is 2-D.
        _, seq_len = in_idx.shape

        # Positions 0, 1, 2, ... created on the same device as the token ids.
        positions = torch.arange(seq_len, device=in_idx.device)

        # The (num_tokens, emb_dim) positional embeddings broadcast over the batch axis.
        hidden = self.tok_emb(in_idx) + self.pos_emb(positions)
        hidden = self.drop_emb(hidden)
        hidden = self.transformer_blocks(hidden)
        hidden = self.final_norm(hidden)
        return self.out_head(hidden)

In [23]:
# Seed first so the randomly initialised weights (and the logits printed below) are reproducible.
# Note: `model` and `out` are rebound here; earlier cells used these names for other objects.
torch.manual_seed(123)
model = GPTModel(GPT_CONFIG_124M)
out = model(batch)
print("Input batch:\n", batch)
print("Output shape: ", out.shape)
print(out)

Input batch:
 tensor([[6109, 3626, 6100,  345],
        [6109, 1110, 6622,  257]])
Output shape:  torch.Size([2, 4, 50257])
tensor([[[ 0.1381,  0.0077, -0.1963,  ..., -0.0222, -0.1060,  0.1717],
         [ 0.3865, -0.8408, -0.6564,  ..., -0.5163,  0.2369, -0.3357],
         [ 0.6989, -0.1829, -0.1631,  ...,  0.1472, -0.6504, -0.0056],
         [-0.4290,  0.1669, -0.1258,  ...,  1.1579,  0.5303, -0.5549]],

        [[ 0.1094, -0.2894, -0.1467,  ..., -0.0557,  0.2911, -0.2824],
         [ 0.0882, -0.3552, -0.3527,  ...,  1.2930,  0.0053,  0.1898],
         [ 0.6091,  0.4702, -0.4094,  ...,  0.7688,  0.3787, -0.1974],
         [-0.0612, -0.0737,  0.4751,  ...,  1.2463, -0.3834,  0.0609]]],
       grad_fn=<UnsafeViewBackward0>)


In [24]:
# Count every element of every parameter tensor of the model.
total_params = sum(p.numel() for p in model.parameters())
print(f"Número total de parâmetros: {total_params:,}") # more than 124M: out_head has its own weight matrix instead of reusing the token-embedding weights

Número total de parâmetros: 163,009,536


In [25]:
# Parameter count if the output head shared (tied) its weights with the token embedding:
# the separate out_head parameters are simply subtracted from the total.
total_params_gpt2 = total_params - sum(p.numel() for p in model.out_head.parameters())
print(f"Número de parâmetros treináveis considerando weight tying: {total_params_gpt2:,}")

Número de parâmetros treináveis considerando weight tying: 124,412,160


In [26]:
# Memory taken by the weights. Each parameter's actual element size is used
# (4 bytes per value for float32) instead of a hard-coded 4, so the figure stays
# correct if the model is converted to another dtype. Reading the parameters
# directly also removes the dependency on `total_params` from an earlier cell.
total_size_bytes = sum(p.numel() * p.element_size() for p in model.parameters())
total_size_mb = total_size_bytes / (1024 * 1024)
print(f"Tamanho total do modelo: {total_size_mb:.2f} MB")

Tamanho total do modelo: 621.83 MB


In [27]:
def generate_text_simple(model, idx, max_new_tokens, context_size):
    """Greedily extend ``idx`` by ``max_new_tokens`` tokens.

    Args:
        model: callable mapping token ids of shape (batch, n_tokens) to logits
            of shape (batch, n_tokens, vocab_size).
        idx: tensor of token ids for the current context, shape (batch, n_tokens).
        max_new_tokens: number of tokens to append.
        context_size: maximum number of trailing tokens passed to the model.

    Returns:
        Tensor of shape (batch, n_tokens + max_new_tokens). ``idx`` itself is
        not modified; it is returned as-is when ``max_new_tokens`` is 0.
    """
    sequence = idx

    for _ in range(max_new_tokens):
        # Keep only the most recent `context_size` tokens, e.g. with a context
        # size of 5 and 10 tokens so far, only the last 5 are used.
        window = sequence[:, -context_size:]

        with torch.no_grad():
            all_logits = model(window)

        # Only the prediction at the last position matters:
        # (batch, n_tokens, vocab_size) -> (batch, vocab_size)
        last_logits = all_logits[:, -1, :]

        # Softmax over the vocabulary, then take the most probable entry.
        probabilities = torch.softmax(last_logits, dim=-1)
        next_token = probabilities.argmax(dim=-1, keepdim=True)

        # Append the chosen token: (batch, n_tokens + 1)
        sequence = torch.cat((sequence, next_token), dim=1)

    return sequence
        

In [28]:
# Encode the prompt and add a leading batch axis so it has the (batch, n_tokens) shape
# that generate_text_simple expects.
start_context = "Hello, I am"
encoded = tokenizer.encode(start_context)
print("encoded:", encoded)
encoded_tensor = torch.tensor(encoded).unsqueeze(0)
print("encoded tensor shape:", encoded_tensor.shape)

encoded: [15496, 11, 314, 716]
encoded tensor shape: torch.Size([1, 4])


In [31]:
model.eval() # evaluation mode: turns off the nn.Dropout layers since we are not training (the custom LayerNorm has no train/eval branch)
out = generate_text_simple(
    model=model, 
    idx=encoded_tensor, 
    max_new_tokens=6, 
    context_size=GPT_CONFIG_124M["context_length"]
)
print("out:", out) # length 10: the 4 prompt tokens plus the 6 newly generated ones

out: tensor([[15496,    11,   314,   716, 27018, 24086, 47843, 30961, 42348,  7267]])


In [34]:
# Drop the batch axis and turn the token ids back into text.
# The model has not been trained, so the continuation is not meaningful text.
decoded_text = tokenizer.decode(out.squeeze(0).tolist())
print(decoded_text)

Hello, I am Featureiman Byeswickattribute argue
