<a href="https://colab.research.google.com/github/prabal5ghosh/Deep-Learning-summer-school-2025-university-of-cote-d-Azur/blob/main/Lab_decoder___subject.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<center><img width=60% src="http://www.i3s.unice.fr/~lingrand/efeliaUnica.png"><br/><br/>
<font size=+3><b>Decoder only</b></font><br/><br/>
<font size=+1>Célia D'Cruz, Diane Lingrand, and Frédéric Precioso<br/><br/>
    2025 - June/July</font><br/>
    <img width=14% src="http://www.i3s.unice.fr/~lingrand/cc-long.png">
    </center>

<div> This notebook introduces the decoder architecture, including its causal multi-head attention mechanism, and shows a method to pretrain it and to generate text.
</div>

## Imports and device

<font color="red">Use a GPU to speed up computations.</font>
If your laptop does not have a GPU, you can use Google Colab or Kaggle.

To enable the GPU backend in Google Colab for your notebook:

1.   Runtime (top left corner) -> Change runtime type
2.   Select GPU as "Hardware accelerator"
3.   Save

In [1]:
# all the imports needed in the lab
!pip install bertviz
import bertviz

import torch
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModel, pipeline
import requests
from collections import Counter


Check that your GPU is recognized by running the code below:

In [2]:
# making the code device agnostic

if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")
device

device(type='cpu')

# GPT2 decoder model

In this lab, we will use the pretrained [GPT2 decoder model](https://huggingface.co/openai-community/gpt2).

Note the large vocabulary size of the GPT2 tokenizer.

In [3]:
tokenizer = AutoTokenizer.from_pretrained("openai-community/gpt2")
print(f"tokenizer vocab size = {tokenizer.vocab_size}")

tokenizer vocab size = 50257


We load the GPT2 decoder model.

In [4]:
GPT2_model = AutoModel.from_pretrained("openai-community/gpt2", output_attentions = True)


We retrieve a few configuration elements of the GPT2 tokenizer and model.

In [5]:
config_model = {
    "vocab_size" : tokenizer.vocab_size, # vocabulary size
    "emb_dim" : GPT2_model.config.n_embd, # dimension of the token embeddings
    "context_length" : GPT2_model.config.n_positions, # maximum length of sequences
}

print(config_model)

{'vocab_size': 50257, 'emb_dim': 768, 'context_length': 1024}


# Causal Multihead Attention

## Self Attention

In the previous lab, we implemented a simple version of the self-attention. Here, we add keys, queries, values, and a scaling factor.

\begin{equation}
\mathrm{Attention}(Q, K, V) = \mathrm{softmax}\left(\frac{Q K^{T}}{\sqrt{\mathrm{dim}}}\right) V
\end{equation}

The Query matrix Q, the Key matrix K, and the Value matrix V are linear transformations of the input embedding.

Taking as input an embedding, compute the Q, K and V matrices, and implement the self-attention mechanism following the aforementioned formula.

- Hint 1: linear transformations can be done using [torch.nn.Linear](https://docs.pytorch.org/docs/stable/generated/torch.nn.Linear.html).
- Hint 2: the transpose of the K matrix can be done using [torch.Tensor.transpose](https://docs.pytorch.org/docs/stable/generated/torch.Tensor.transpose.html) or [torch.transpose](https://docs.pytorch.org/docs/stable/generated/torch.transpose.html#torch.transpose).
- Hint 3: the dimensions of a matrix can be retrieved using [torch.Tensor.shape](https://docs.pytorch.org/docs/stable/generated/torch.Tensor.shape.html) or [torch.Tensor.size](https://docs.pytorch.org/docs/stable/generated/torch.Tensor.size.html#torch.Tensor.size).
- Hint 4: the @ operator can be used for matrix multiplication.
- Hint 5: you can choose the dimension to which the [torch.softmax](https://docs.pytorch.org/docs/stable/generated/torch.softmax.html) is applied with the "dim" parameter.

In [17]:
# your code
import math
import torch.nn.functional as F

class SelfAttention(torch.nn.Module):

    def __init__(self, d_in, d_out, qkv_bias=False):
        super().__init__()
        self.W_query = torch.nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_key   = torch.nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_value = torch.nn.Linear(d_in, d_out, bias=qkv_bias)

    def forward(self, x): # x: input embeddings of shape [batch_size, num_tokens, d_in]
        keys = self.W_key(x) # linear transformation of the input embeddings
        queries = self.W_query(x) # linear transformation of the input embeddings
        values = self.W_value(x) # linear transformation of the input embeddings

        # Q @ K^T: queries times keys transposed in the last 2 dimensions # [batch_size, num_tokens, num_tokens]
        attn_scores = torch.matmul(queries, keys.transpose(-2, -1))
        # scores rescaled by the square root of the key dimension, then normalized
        # with a softmax over the last dimension so that each row sums to 1
        attn_weights = F.softmax(attn_scores / math.sqrt(keys.shape[-1]), dim=-1)

        context_vec = torch.matmul(attn_weights, values) # weighted average of the values, using the attention weights
        return context_vec # contextualized embeddings # [batch_size, num_tokens, d_out]
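
The choice of the softmax dimension matters: the normalization has to run over the keys (the last dimension of the score tensor) so that the weights of each query token sum to 1. The following is a minimal sketch on small random tensors; the sizes and the names `scores`, `weights_last` and `weights_first` are chosen here for illustration only.

```python
import math
import torch

torch.manual_seed(0)

# Toy queries and keys: [batch_size=2, num_tokens=4, dim=8]
queries = torch.rand(2, 4, 8)
keys = torch.rand(2, 4, 8)

# Scaled attention scores: [batch_size, num_tokens, num_tokens]
scores = queries @ keys.transpose(-2, -1) / math.sqrt(queries.shape[-1])

# Normalizing over the last dimension: one distribution per query token
weights_last = torch.softmax(scores, dim=-1)
# Normalizing over the batch dimension: not what attention needs
weights_first = torch.softmax(scores, dim=0)

print(weights_last.sum(dim=-1))   # every entry is 1 (up to rounding)
print(weights_first.sum(dim=-1))  # rows generally do not sum to 1
```

With `dim=-1`, each row of the weight matrix is a probability distribution over the tokens that a given query attends to.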

For simplicity, we generate random embeddings with the shape of [batch_size, num_tokens, emb_dim]

In [18]:
random_embeddings = torch.rand(3, 11, config_model["emb_dim"])
print(random_embeddings.shape)

torch.Size([3, 11, 768])


In [19]:
self_attention_block = SelfAttention(config_model["emb_dim"], config_model["emb_dim"])
contextualized_embeddings = self_attention_block(random_embeddings)
print(f"contextualized embeddings of shape {contextualized_embeddings.shape}")
print(contextualized_embeddings - random_embeddings) # contextualized embeddings are different from the initially non contextualized embedding

contextualized embeddings of shape torch.Size([3, 11, 768])
tensor([[[ 0.1665,  1.7948,  0.5903,  ...,  0.6676, -0.5569, -0.9453],
         [-0.2074,  1.1920,  0.6658,  ...,  0.3312, -1.2694, -0.2969],
         [ 0.3152,  1.5022,  1.2811,  ...,  0.7240, -1.3026, -0.4614],
         ...,
         [ 0.1490,  1.3958,  0.8943,  ...,  0.3246, -1.0314, -0.6124],
         [ 0.3803,  1.0387,  0.5496,  ...,  0.2170, -0.4680, -0.7108],
         [ 0.2422,  1.0641,  1.3300,  ...,  1.1255, -0.8299, -0.1949]],

        [[ 0.5289,  1.5645,  1.4235,  ...,  0.2874, -0.9026, -0.2129],
         [-0.4320,  1.6427,  0.6476,  ...,  0.5956, -0.8328, -0.0728],
         [ 0.4413,  1.3821,  1.0507,  ...,  0.1128, -0.3071, -0.4360],
         ...,
         [-0.1875,  1.5299,  1.4705,  ...,  0.6848, -0.6144, -0.8049],
         [ 0.3746,  1.8716,  1.4524,  ...,  0.7550, -0.3269, -0.3200],
         [-0.0887,  1.7264,  0.6117,  ...,  0.1574, -0.4797, -0.3937]],

        [[-0.5415,  1.3125,  1.4629,  ...,  0.6647, -1.1



Now that we have created the core of the attention mechanism in transformers, we can extend it to multiple "heads" that operate independently. This is called the multi-head attention mechanism. For the sake of simplicity, we provide you with the implementation.

In [20]:
class MultiHeadAttention(torch.nn.Module):
    def __init__(self, d_in, d_out, dropout, num_heads, qkv_bias=False, output_attention_weights = True):
        super().__init__()
        assert d_out % num_heads == 0, "d_out must be divisible by num_heads"

        self.d_out = d_out
        self.num_heads = num_heads
        self.head_dim = d_out // num_heads # Reduce the projection dim to match desired output dim
        self.output_attention_weights = output_attention_weights

        self.W_query = torch.nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_key = torch.nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_value = torch.nn.Linear(d_in, d_out, bias=qkv_bias)
        self.out_proj = torch.nn.Linear(d_out, d_out)  # Linear layer to combine head outputs
        self.dropout = torch.nn.Dropout(dropout)

    def forward(self, x): # embeddings (batch_size, num_tokens, d_in)
        num_tokens = x.shape[-2]

        keys = self.W_key(x) # Shape: (batch_size, num_tokens, d_out)
        queries = self.W_query(x)
        values = self.W_value(x)

        # We implicitly split the matrix by adding a `num_heads` dimension
        # Unroll last dim: (batch_size, num_tokens, d_out) -> (batch_size, num_tokens, num_heads, head_dim)
        keys = keys.view(-1, num_tokens, self.num_heads, self.head_dim)
        values = values.view(-1, num_tokens, self.num_heads, self.head_dim)
        queries = queries.view(-1, num_tokens, self.num_heads, self.head_dim)

        # Transpose: (batch_size, num_tokens, num_heads, head_dim) -> (batch_size, num_heads, num_tokens, head_dim)
        keys = keys.transpose(-2, -3)
        queries = queries.transpose(-2, -3)
        values = values.transpose(-2, -3)

        # Compute scaled dot-product attention (aka self-attention)
        attn_scores = queries @ keys.transpose(-1, -2)  # Dot product for each head

        attn_weights = torch.softmax(attn_scores / self.head_dim ** 0.5, dim=-1)
        attn_weights = self.dropout(attn_weights)

        # Shape: (batch_size, num_tokens, num_heads, head_dim)
        context_vec = (attn_weights @ values).transpose(-2, -3)

        # Combine heads, where self.d_out = self.num_heads * self.head_dim
        context_vec = context_vec.contiguous().view(-1, num_tokens, self.d_out)
        context_vec = self.out_proj(context_vec) # optional projection

        return (context_vec, attn_weights) if self.output_attention_weights else (context_vec,)
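
The `view` and `transpose` calls used to split and merge the heads can be checked on a small tensor. This is only a sketch with toy sizes (`head_dim = 4` is an arbitrary choice, not the GPT2 value): it shows that head 0 receives the first `head_dim` features of every token and that the split can be undone exactly.

```python
import torch

batch_size, num_tokens, num_heads, head_dim = 3, 11, 2, 4
d_out = num_heads * head_dim

x = torch.arange(batch_size * num_tokens * d_out, dtype=torch.float32)
x = x.view(batch_size, num_tokens, d_out)

# Split the last dimension into heads, then move the head axis forward
heads = x.view(-1, num_tokens, num_heads, head_dim).transpose(-2, -3)
print(heads.shape)  # torch.Size([3, 2, 11, 4])

# Head 0 holds the first head_dim features of every token
same_slice = torch.equal(heads[:, 0], x[..., :head_dim])

# Undo the split: move the head axis back and merge the last two dimensions
merged = heads.transpose(-2, -3).contiguous().view(-1, num_tokens, d_out)
round_trip = torch.equal(merged, x)
print(same_slice, round_trip)  # True True
```

The call to `contiguous()` is needed because `transpose` returns a non-contiguous view, which `view` cannot reshape directly.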


We instantiate the MultiHeadAttention class and provide it with the random embeddings as input.

In [21]:
multihead_attention_block = MultiHeadAttention(config_model["emb_dim"], config_model["emb_dim"], 0.1, num_heads = 2, output_attention_weights = True)
contextualized_embeddings, attn_weights = multihead_attention_block(random_embeddings)

print(f"contextualized embeddings of shape {contextualized_embeddings.shape}") # [batch_size, num_tokens, emb_dim]
print(f"attention weights of shape {attn_weights.shape}") # [batch_size, n_heads, num_tokens, num_tokens]

contextualized embeddings of shape torch.Size([3, 11, 768])
attention weights of shape torch.Size([3, 2, 11, 11])


We visualize the attention weight matrices with the [interactive BertViz tool](https://github.com/jessevig/bertviz). Associated with our random initial embeddings, we randomly chose a sequence of "Hello" tokens. Note that each token (on the left) attends to both preceding and following tokens in the sequence.

In [22]:
bertviz_attention_weight = (attn_weights[0].unsqueeze(0), ) # for 1st sequence
bertviz_token_string = ["Hello"] * 11
print(f"attentions: {len(bertviz_attention_weight)} encoder layers, each with an attention matrix of shape {bertviz_attention_weight[0].shape}")
bertviz.model_view(bertviz_attention_weight, bertviz_token_string)
bertviz.head_view(bertviz_attention_weight, bertviz_token_string)

attentions: 1 encoder layers, each with an attention matrix of shape torch.Size([1, 2, 11, 11])


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

However, in a decoder model (unlike an encoder), tokens should only attend to past tokens (those that come earlier in the sequence). To achieve it, we will implement a masking mechanism on the attention scores, so that only past tokens are considered.

For simplicity and illustration purposes, we generate a random attention score matrix of size [1, 1, num_tokens, num_tokens] with the [torch.rand](https://pytorch.org/docs/main/generated/torch.rand.html) function, representing the attention scores for a single sequence and a single attention head.

In [23]:
# your code

num_tokens = 6
attn_scores = torch.rand(1, 1, num_tokens, num_tokens) # [batch_size, num_heads, num_tokens, num_tokens]
print(attn_scores)

tensor([[[[0.5526, 0.3047, 0.9691, 0.3901, 0.8281, 0.0293],
          [0.3053, 0.1510, 0.5907, 0.6060, 0.9772, 0.8764],
          [0.3011, 0.9663, 0.6859, 0.6373, 0.8758, 0.6487],
          [0.7700, 0.0427, 0.9903, 0.1065, 0.5760, 0.1946],
          [0.7336, 0.9236, 0.2698, 0.0011, 0.5414, 0.1738],
          [0.9532, 0.6829, 0.5524, 0.6439, 0.7926, 0.3811]]]])


In [26]:
attn_scores.shape

torch.Size([1, 1, 6, 6])

As the first step in creating our mask, we use the [torch.ones](https://pytorch.org/docs/main/generated/torch.ones.html) function to generate a matrix of ones with a size of [num_tokens, num_tokens].

In [24]:
# your code

matrix_ones = torch.ones(num_tokens, num_tokens)
print(matrix_ones)

tensor([[1., 1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1., 1.]])


In [27]:
matrix_ones.shape

torch.Size([6, 6])

Next, we apply the [torch.triu](https://pytorch.org/docs/stable/generated/torch.triu.html) function on our matrix of ones to set the lower triangular part—including the main diagonal—to zeros while keeping the ones in the upper triangular part. Use the "diagonal" parameter of this function to ensure that the main diagonal is also set to zero.

In [32]:
# your code

mask_int = torch.triu(matrix_ones, diagonal=1) # diagonal=1 also sets the main diagonal to zero
print(mask_int)

tensor([[0., 1., 1., 1., 1., 1.],
        [0., 0., 1., 1., 1., 1.],
        [0., 0., 0., 1., 1., 1.],
        [0., 0., 0., 0., 1., 1.],
        [0., 0., 0., 0., 0., 1.],
        [0., 0., 0., 0., 0., 0.]])


Then, based on our mask tensor of ones and zeros, we generate a boolean mask with True and False values.

In [33]:
mask_bool = mask_int.bool()
print(mask_bool)

tensor([[False,  True,  True,  True,  True,  True],
        [False, False,  True,  True,  True,  True],
        [False, False, False,  True,  True,  True],
        [False, False, False, False,  True,  True],
        [False, False, False, False, False,  True],
        [False, False, False, False, False, False]])


Next, we mask the upper triangular part of the attention score matrix by replacing these values with "negative infinity" (using -torch.inf). To do this, we apply the [masked_fill](https://pytorch.org/docs/stable/generated/torch.Tensor.masked_fill.html) function.

In [38]:
# your code

attn_masked = torch.masked_fill(attn_scores, mask_bool, -torch.inf)
print(attn_masked)

tensor([[[[0.5526,   -inf,   -inf,   -inf,   -inf,   -inf],
          [0.3053, 0.1510,   -inf,   -inf,   -inf,   -inf],
          [0.3011, 0.9663, 0.6859,   -inf,   -inf,   -inf],
          [0.7700, 0.0427, 0.9903, 0.1065,   -inf,   -inf],
          [0.7336, 0.9236, 0.2698, 0.0011, 0.5414,   -inf],
          [0.9532, 0.6829, 0.5524, 0.6439, 0.7926, 0.3811]]]])


When we apply the softmax function to this masked attention score matrix, the -inf values become zeros, and each row sums up to 1. Since a token's contextualized embedding is computed as a weighted average of other token embeddings (using the attention weights), the future tokens will have a weight of zero. This means that a given token's embedding will not incorporate information from future tokens. Each token still attends to itself because the main diagonal is not masked: masking it as well would leave the first row entirely at -inf, and the softmax would return NaN for that row.

In [39]:
attn_weights = torch.softmax(attn_masked, dim=-1) # note that in the multi-head implementation, a scaling factor would have been applied to the masked attention scores.
print(attn_weights)

tensor([[[[1.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000],
          [0.5385, 0.4615, 0.0000, 0.0000, 0.0000, 0.0000],
          [0.2265, 0.4406, 0.3329, 0.0000, 0.0000, 0.0000],
          [0.3082, 0.1489, 0.3841, 0.1587, 0.0000, 0.0000],
          [0.2413, 0.2918, 0.1518, 0.1160, 0.1991, 0.0000],
          [0.2182, 0.1665, 0.1461, 0.1602, 0.1858, 0.1231]]]])
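
The masking steps can be gathered into one small helper. The sketch below assumes, as described in the text, a mask that is True strictly above the main diagonal (`diagonal=1`), so that every token keeps at least its own position; the name `causal_softmax` is hypothetical and not part of the lab.

```python
import torch

def causal_softmax(scores):
    """Softmax over the last dimension with future positions masked out."""
    num_tokens = scores.shape[-1]
    # True strictly above the main diagonal, i.e. at future positions
    ones = torch.ones(num_tokens, num_tokens, device=scores.device)
    future = torch.triu(ones, diagonal=1).bool()
    return torch.softmax(scores.masked_fill(future, -torch.inf), dim=-1)

torch.manual_seed(0)
scores = torch.rand(1, 1, 6, 6)
weights = causal_softmax(scores)

print(weights[0, 0])              # lower-triangular matrix of weights
print(weights.sum(dim=-1))        # every row sums to 1
```

Because exp(-inf) is 0, the masked positions contribute nothing to the softmax denominator, and the first row reduces to a single weight of 1 on the first token.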


Reusing our implementation of the mask, you can now code our decoder's CausalMultiHeadAttention class that masks future tokens.

In [41]:
# your code

class CausalMultiHeadAttention(torch.nn.Module):
    def __init__(self, d_in, d_out, dropout, num_heads, qkv_bias=False, output_attention_weights = True):
        super().__init__()
        assert d_out % num_heads == 0, "d_out must be divisible by num_heads"

        self.d_out = d_out
        self.num_heads = num_heads
        self.head_dim = d_out // num_heads # Reduce the projection dim to match desired output dim
        self.output_attention_weights = output_attention_weights

        self.W_query = torch.nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_key = torch.nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_value = torch.nn.Linear(d_in, d_out, bias=qkv_bias)
        self.out_proj = torch.nn.Linear(d_out, d_out)  # Linear layer to combine head outputs
        self.dropout = torch.nn.Dropout(dropout)

    def forward(self, x): # embeddings (batch_size, num_tokens, d_in)

        # your code

        # you can copy paste the forward implementation from MultiHeadAttention
        # then insert the implementation of the mask at the right place
        num_tokens = x.shape[-2]

        keys = self.W_key(x) # Shape: (batch_size, num_tokens, d_out)
        queries = self.W_query(x)
        values = self.W_value(x)

        # We implicitly split the matrix by adding a `num_heads` dimension
        # Unroll last dim: (batch_size, num_tokens, d_out) -> (batch_size, num_tokens, num_heads, head_dim)
        keys = keys.view(-1, num_tokens, self.num_heads, self.head_dim)
        values = values.view(-1, num_tokens, self.num_heads, self.head_dim)
        queries = queries.view(-1, num_tokens, self.num_heads, self.head_dim)

        # Transpose: (batch_size, num_tokens, num_heads, head_dim) -> (batch_size, num_heads, num_tokens, head_dim)
        keys = keys.transpose(-2, -3)
        queries = queries.transpose(-2, -3)
        values = values.transpose(-2, -3)

        # Compute scaled dot-product attention (aka self-attention)
        attn_scores = queries @ keys.transpose(-1, -2)  # Dot product for each head


        # your code
        # Mask future tokens: True strictly above the main diagonal (diagonal=1),
        # so that each token can still attend to itself
        matrix_ones = torch.ones(num_tokens, num_tokens, device=attn_scores.device)
        mask_bool = torch.triu(matrix_ones, diagonal=1).bool()
        attn_masked = attn_scores.masked_fill(mask_bool, -torch.inf)

        # Same scaling factor as in MultiHeadAttention, applied before the softmax
        attn_weights = torch.softmax(attn_masked / self.head_dim ** 0.5, dim=-1)

        attn_weights = self.dropout(attn_weights)

        # Shape: (batch_size, num_tokens, num_heads, head_dim)
        context_vec = (attn_weights @ values).transpose(-2, -3)

        # Combine heads, where self.d_out = self.num_heads * self.head_dim
        context_vec = context_vec.contiguous().view(-1, num_tokens, self.d_out)
        context_vec = self.out_proj(context_vec) # optional projection

        return (context_vec, attn_weights) if self.output_attention_weights else (context_vec,)


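
One way to check that an attention layer is really causal is to modify the last token and verify that the outputs at earlier positions do not change. The sketch below uses a simplified single-head function (`causal_attention`, a hypothetical helper with plain weight matrices instead of `torch.nn.Linear` layers), not the class defined in this lab.

```python
import torch

torch.manual_seed(0)

def causal_attention(x, w_q, w_k, w_v):
    """Single-head causal self-attention on x of shape [num_tokens, dim]."""
    queries, keys, values = x @ w_q, x @ w_k, x @ w_v
    scores = queries @ keys.T / keys.shape[-1] ** 0.5
    num_tokens = x.shape[0]
    future = torch.triu(torch.ones(num_tokens, num_tokens), diagonal=1).bool()
    weights = torch.softmax(scores.masked_fill(future, -torch.inf), dim=-1)
    return weights @ values

dim = 8
w_q, w_k, w_v = (torch.rand(dim, dim) for _ in range(3))

x = torch.rand(5, dim)
x_changed = x.clone()
x_changed[-1] = torch.rand(dim)  # modify only the last token

out = causal_attention(x, w_q, w_k, w_v)
out_changed = causal_attention(x_changed, w_q, w_k, w_v)

# Earlier positions are unaffected; only the last position changes
print(torch.allclose(out[:-1], out_changed[:-1]))  # True
print(torch.allclose(out[-1], out_changed[-1]))    # False
```

The same test applied to an unmasked attention layer would fail on the first check, since every position would see the modified token.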


We create an instance of our CausalMultiHeadAttention class, we input the random embeddings and we save the outputs containing the attention weight matrix.

In [42]:
multihead_attention_block = CausalMultiHeadAttention(config_model["emb_dim"], config_model["emb_dim"], 0.1, num_heads = 2, output_attention_weights = True)
contextualized_embeddings, attn_weights = multihead_attention_block(random_embeddings)

print(f"contextualized embeddings of shape {contextualized_embeddings.shape}") # [batch_size, num_tokens, emb_dim]
print(f"attention weights of shape {attn_weights.shape}") # [batch_size, n_heads, num_tokens, num_tokens]

contextualized embeddings of shape torch.Size([3, 11, 768])
attention weights of shape torch.Size([3, 2, 11, 11])


We visualize the attention weight matrices with the [interactive BertViz tool](https://github.com/jessevig/bertviz). Associated with our random initial embeddings, we randomly chose a sequence of "Hello" tokens. Note that, unlike the encoder example where tokens attend to all positions, the decoder ensures that tokens (on the left) DO NOT attend to future tokens in the sequence.

In [43]:
bertviz_attention_weight = (attn_weights[0].unsqueeze(0), ) # for 1st sentence
bertviz_token_string = ["Hello"] * 11
print(f"attentions: {len(bertviz_attention_weight)} decoder layers, each with an attention matrix of shape {bertviz_attention_weight[0].shape}")
bertviz.model_view(bertviz_attention_weight, bertviz_token_string)
bertviz.head_view(bertviz_attention_weight, bertviz_token_string)

attentions: 1 decoder layers, each with an attention matrix of shape torch.Size([1, 2, 11, 11])


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

We don't observe much variation between the different attention heads since we did not train the model (the weights are randomly initialized).

Now, let's visualize the attention weight matrices of the already pretrained GPT2 decoder model. Note again that a given token does not look at future tokens.

In [44]:
sentence = "He turned off the fan whenever he felt too cold."
inputs = tokenizer(sentence, add_special_tokens = False, return_tensors = "pt") # tokenization of the sentence
attention = GPT2_model(**inputs).attentions # get the attention matrices
token_ids = inputs['input_ids'][0].tolist() # extract the input ids at batch index 0
tokens = tokenizer.batch_decode(token_ids) # convert the token ids to their string representation

bertviz.model_view(attention, tokens)
bertviz.head_view(attention, tokens)



<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

# Simple greedy text generation with a decoder model

In this section, we will implement a simple method to generate text with a decoder model.

First, we define a simple list of sequences that have been truncated. The goal is to use a decoder model to generate new tokens that extend these sequences.

In [45]:
simple_text_list = [
    "I love learning about science and",
    "The 2025 Deep Learning School is",
    "There are a lot of interesting",
]

Let's tokenize that text list and get the token IDs.

In [46]:
input_ids = tokenizer(simple_text_list, add_special_tokens = False, return_tensors = "pt")["input_ids"] # token IDs

print(f"\nencoded text list of shape {input_ids.shape} =\n{input_ids}")


encoded text list of shape torch.Size([3, 6]) =
tensor([[   40,  1842,  4673,   546,  3783,   290],
        [  464, 32190, 10766, 18252,  3961,   318],
        [ 1858,   389,   257,  1256,   286,  3499]])


We define a DecoderModelForGeneration class that takes a batch of tokenized sequences as input and processes it with a decoder model (here, the pretrained GPT2 decoder), which outputs the contextualized embedding of each token. A linear layer (the generation head) then maps these embeddings to logits of size [batch_size, sequence_size, vocab_size]. These logits are used to predict the next tokens, both for pretraining and for generating text.

In [47]:
class DecoderModelForGeneration(torch.nn.Module):
    def __init__(self,
        base_model_name,
        vocab_size,
    ):
        super().__init__()
        self.base_model_name = base_model_name
        self.vocab_size = vocab_size
        self.transformer = AutoModel.from_pretrained(base_model_name)
        self.embedding_size = self.transformer.config.hidden_size
        self.generation_head = torch.nn.Linear(self.embedding_size, self.vocab_size, bias=False)

    def forward(self, idx): # idx => [batch_size, num_tokens]
        embeddings = self.transformer(idx).last_hidden_state # contextualized embedding for each token # [batch_size, num_tokens, embed_dim]
        logits = self.generation_head(embeddings) # [batch_size, num_tokens, vocab_size]
        return logits

We instantiate our DecoderModelForGeneration class with the base decoder model GPT2.

In [48]:
my_model = DecoderModelForGeneration(
    "openai-community/gpt2",
    tokenizer.vocab_size,
)

In [49]:
logits = my_model(input_ids)
print(logits.shape, logits)

torch.Size([3, 6, 50257]) tensor([[[ 2.3764, -0.0508, -2.2615,  ...,  2.2776,  3.3297,  0.7150],
         [ 6.0042, -1.9020, -5.8365,  ...,  4.3961,  6.9874, -1.2886],
         [ 7.3887, -1.7156, -7.6025,  ...,  5.0728,  8.9435, -0.4900],
         [ 6.9218, -2.3605, -7.3171,  ...,  4.7727,  7.8541, -1.6848],
         [ 6.4064, -0.5488, -5.2556,  ...,  5.6959,  7.4150, -0.3297],
         [ 7.1372, -1.5758, -7.7306,  ...,  4.7083,  8.7375, -1.9347]],

        [[ 1.9975, -0.2222, -2.2529,  ...,  1.6956,  3.2501,  0.4185],
         [ 5.5651, -2.2276, -5.7188,  ...,  4.2225,  7.8695, -0.5245],
         [ 4.5653, -1.0286, -3.9342,  ...,  4.0277,  5.7227,  1.2001],
         [ 6.8358, -2.0269, -7.0507,  ...,  4.3734,  8.1286, -1.2051],
         [ 7.5430, -1.5299, -5.9701,  ...,  5.2575,  8.7922, -0.3348],
         [ 8.5466, -1.9091, -8.0782,  ...,  5.5132, 10.5152, -0.9297]],

        [[ 2.3071,  0.0868, -2.3590,  ...,  2.0294,  3.0098,  0.4980],
         [ 8.7540, -2.1452, -9.1842,  ...,  5.9

We want to generate new tokens to extend the truncated simple text list. To do so, we first keep only the logits of the last token of each sequence in the batch, which will later be used to predict the next token.

Get those last-token logits. Logits were of dimension [batch_size, num_tokens, vocab_size], the last-token logits will be of dimension [batch_size, vocab_size].

In [None]:
# your code
last_token_logits = ...
print("last_token_logits:", last_token_logits.shape, last_token_logits)
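
As an illustration of the indexing involved, here is a sketch on toy random logits (the `toy_` names and sizes are made up for this example). Indexing the token dimension with -1 keeps the last position of every sequence and drops that dimension.

```python
import torch

torch.manual_seed(0)

# Toy logits: [batch_size=3, num_tokens=6, vocab_size=10]
toy_logits = torch.rand(3, 6, 10)

# Index -1 on the token dimension keeps the last position of every sequence
toy_last = toy_logits[:, -1, :]
print(toy_last.shape)  # torch.Size([3, 10])
```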

Then, use the [torch.argmax](https://pytorch.org/docs/main/generated/torch.argmax.html) function on the logits of the last token of each sequence to get the predicted most probable next token ID for each sequence in the batch.

In [None]:
# your code
idx_next = ...
print("idx_next:", idx_next.shape, idx_next)

Append the predicted most probable token to each sequence. As `input_ids` was of dimension [3, 6], and we append at the end the predicted next token, we get a dimension of [3, 7]. You can use [torch.cat](https://docs.pytorch.org/docs/stable/generated/torch.cat.html) to concatenate the input token IDs and the next predicted token IDs.

In [None]:
# your code
idx = ...
print("input_ids", input_ids.shape, input_ids)
print("idx", idx.shape, idx)
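
The selection and concatenation steps can be sketched on hand-written toy values (the `toy_` tensors below are illustrative, not model outputs). Passing `keepdim=True` to `torch.argmax` keeps a token dimension of size 1, so that the result can be concatenated to the token IDs along dimension 1.

```python
import torch

# Toy batch: 2 sequences of 3 token IDs, vocabulary of 5 entries
toy_ids = torch.tensor([[0, 1, 2],
                        [3, 4, 0]])
toy_last_logits = torch.tensor([[0.1, 0.2, 0.9, 0.3, 0.0],
                                [0.5, 0.1, 0.2, 0.3, 0.8]])

# keepdim=True keeps a token dimension of size 1: [batch_size, 1]
toy_next = torch.argmax(toy_last_logits, dim=-1, keepdim=True)

# Concatenate along the token dimension: [batch_size, num_tokens + 1]
toy_extended = torch.cat((toy_ids, toy_next), dim=1)
print(toy_extended)
# tensor([[0, 1, 2, 2],
#         [3, 4, 0, 4]])
```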

We now decode the token IDs to get the beginning of the sentence and the next predicted subwords. Note that the generation head of our DecoderModelForGeneration instance has not been trained yet, therefore it outputs tokens that are not meaningful.

In [None]:
print(tokenizer.batch_decode(idx))

Reusing the code above, we define a simple generation function to produce tokens iteratively. Note that we use a for loop to generate not just a single token, but several successive tokens for each sequence. This is an autoregressive way to generate tokens at inference time. The decoding strategy that we use when selecting the next token is called "greedy", because we take the most probable next token at each step.

Note: A more complete version of this function would add a stopping criterion to stop the generation when encountering an EOS token (the End Of Sequence token). For the sake of simplicity, we do not implement it.

In [None]:
# your code

def generate_text_simple(model, idx, max_new_tokens, context_size, device):
    model.eval()
    model = model.to(device)
    idx = idx.to(device)

    for _ in range(max_new_tokens):

        # Crop current context if it exceeds the supported context size
        # E.g., if LLM supports only 5 tokens, and the context size is 10
        # then only the last 5 tokens are used as context
        idx_cond = idx[:, -context_size:]

        # Get the logits
        with torch.inference_mode():
            logits = ...

        # Focus only on the last token in each sequence
        # (batch_size, num_tokens, vocab_size) becomes (batch_size, vocab_size)
        last_token_logits = ...

        # Get the idx of the vocab entry with the highest logits value
        idx_next = ...  # (batch_size, 1)

        # Append sampled index to the running sequence
        idx = ...  # (batch_size, num_tokens + 1)

    return idx
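
To see the autoregressive loop run end to end without downloading a model, here is one possible sketch. It relies on a hypothetical `ToyNextTokenModel` whose logits always favor the token `(last + 1) % vocab_size`, and on a simplified `greedy_generate` function without device handling; both are stand-ins for illustration, not the lab's reference solution.

```python
import torch

class ToyNextTokenModel(torch.nn.Module):
    """Hypothetical stand-in for a decoder: favors token (current + 1) % vocab_size."""
    def __init__(self, vocab_size):
        super().__init__()
        self.vocab_size = vocab_size

    def forward(self, idx):  # idx: [batch_size, num_tokens]
        next_ids = (idx + 1) % self.vocab_size
        # One-hot logits: [batch_size, num_tokens, vocab_size]
        return torch.nn.functional.one_hot(next_ids, self.vocab_size).float()

def greedy_generate(model, idx, max_new_tokens, context_size):
    model.eval()
    for _ in range(max_new_tokens):
        idx_cond = idx[:, -context_size:]  # crop to the supported context
        with torch.inference_mode():
            logits = model(idx_cond)               # [batch_size, num_tokens, vocab_size]
            last_token_logits = logits[:, -1, :]   # [batch_size, vocab_size]
            # Greedy choice: highest logit for each sequence, [batch_size, 1]
            idx_next = torch.argmax(last_token_logits, dim=-1, keepdim=True)
        idx = torch.cat((idx, idx_next), dim=1)    # [batch_size, num_tokens + 1]
    return idx

start = torch.tensor([[0, 1], [7, 8]])
out = greedy_generate(ToyNextTokenModel(vocab_size=10), start,
                      max_new_tokens=3, context_size=4)
print(out)
# tensor([[0, 1, 2, 3, 4],
#         [7, 8, 9, 0, 1]])
```

Each iteration feeds the sequence extended by the previous prediction back into the model, which is what makes the generation autoregressive.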

We generate several next tokens for each sequence in the batch.

In [None]:
max_new_tokens = 10 # to generate 10 more tokens
generated_ids = generate_text_simple(my_model, input_ids, max_new_tokens, config_model["context_length"], device)
print("generated_ids:", generated_ids.shape, generated_ids)

We decode the now longer sequences (where more tokens have been generated). Note that the function outputs nonsensical next tokens because the generation head of the DecoderModelForGeneration model has not been trained yet.

In [None]:
decoded_ids = tokenizer.batch_decode(generated_ids)
print(decoded_ids)

# Decoder pretraining

In this section, we will pretrain the DecoderModelForGeneration model to generate new tokens. Note that the "generation head" has not been trained at all, whereas the GPT2 model inside DecoderModelForGeneration has already been pretrained.

We download the text of "[The Project Gutenberg eBook of The Adventures of Sherlock Holmes, by Arthur Conan Doyle](https://www.gutenberg.org/files/1661/1661-0.txt)"

In [None]:
url = 'https://www.gutenberg.org/files/1661/1661-0.txt'
resp = requests.get(url)
raw_text = resp.text.replace("\r\n", " ")

In [None]:
print(raw_text[ : 2000])

Split the text into a training part and a validation part. You can apply a very basic strategy: for instance, the first 80% of the text is the training text, and the last 20% of the text is the validation text.

In [None]:
# your code
train_ratio = 0.80 # Train/validation ratio

train_text = ...
val_text = ...

We use a Dataset where the text is tokenized and divided into multiple chunks of size "max_length", with a stride of "stride". The targets are the inputs shifted one position forward, since we want to predict the next token.

In [None]:
class DatasetPretraining(Dataset):
    def __init__(self, txt, tokenizer, max_length, stride):
        self.input_ids = []
        self.target_ids = []

        # Tokenize the entire text
        token_ids = tokenizer(txt, truncation = False, padding = False, add_special_tokens = False)["input_ids"]
        # Use a sliding window to chunk the book into overlapping sequences of max_length
        for i in range(0, len(token_ids) - max_length, stride):
            input_chunk = token_ids[i:i + max_length]
            target_chunk = token_ids[i + 1: i + max_length + 1]
            self.input_ids.append(torch.tensor(input_chunk))
            self.target_ids.append(torch.tensor(target_chunk))

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, idx):
        return self.input_ids[idx], self.target_ids[idx]
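To make the chunking concrete, here is a small sketch that applies the same sliding-window indexing to a toy list of integers standing in for token IDs. The helper name `sliding_window_pairs` and the toy values are illustrative assumptions, not part of the lab code.

```python
def sliding_window_pairs(token_ids, max_length, stride):
    """Return (input, target) chunks; each target is its input shifted by one position."""
    pairs = []
    for i in range(0, len(token_ids) - max_length, stride):
        input_chunk = token_ids[i:i + max_length]
        target_chunk = token_ids[i + 1:i + max_length + 1]
        pairs.append((input_chunk, target_chunk))
    return pairs


toy_ids = list(range(10))  # stand-in for real token IDs: 0, 1, ..., 9
toy_pairs = sliding_window_pairs(toy_ids, max_length=4, stride=4)
for input_chunk, target_chunk in toy_pairs:
    print(input_chunk, "->", target_chunk)
```

With a stride equal to "max_length", as in this toy call, the input chunks do not overlap; a smaller stride produces overlapping chunks and therefore more training examples from the same text.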

We define a few key training elements, which may not be optimal but will still be effective for our task.

In [None]:
my_model = DecoderModelForGeneration(
    "openai-community/gpt2",
    tokenizer.vocab_size,
)

traning_max_length = 1024
training_stride = traning_max_length
batch_size = 4
nb_epochs = 4
learning_rate = 5e-4
optimizer = torch.optim.AdamW(my_model.parameters(), lr = learning_rate)


We create our training and validation datasets.

In [None]:
train_dataset = DatasetPretraining(train_text, tokenizer, traning_max_length, training_stride)
val_dataset = DatasetPretraining(val_text, tokenizer, traning_max_length, training_stride)

Note that the targets are the inputs but shifted one position forward.

In [None]:
print(len(train_dataset))
x, y = train_dataset[0]
print(x.shape, x[:10])
print(y.shape, y[:10])

We create our training and validation dataloaders to automatically handle the batching process.

In [None]:
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, drop_last=False)

Now, we train our model to predict the next token. Remember that the defined training parameters are not ideal, and that we do not use a big enough dataset. However, you can still see that the model is learning.

In [None]:
def train(model, train_dataloader, val_dataloader, nb_epochs, device, optimizer):
    training_validation_loss_history = {"training_loss" : [], "validation_loss" : []}
    model = model.to(device)
    initial_validation_loss = epoch_validation(model, val_dataloader, -1, device)
    for epoch in range(nb_epochs):
        training_loss = epoch_training(model, train_dataloader, epoch, device, optimizer)
        validation_loss = epoch_validation(model, val_dataloader, epoch, device)
        training_validation_loss_history["training_loss"].append(training_loss)
        training_validation_loss_history["validation_loss"].append(validation_loss)
    return training_validation_loss_history

def epoch_training(model, dataloader, epoch, device, optimizer):
    model.train()
    loss_epoch_list = []
    with tqdm(dataloader, unit="batch") as tqdm_dataloader:
        tqdm_dataloader.set_description(f"Epoch {epoch}: Training")
        for input, target in tqdm_dataloader:
            # load tensor to GPU if enabled
            input = input.to(device)
            target = target.to(device)
            # forward pass
            logits = model(input)
            # get the loss
            loss = torch.nn.functional.cross_entropy(logits.view(-1, logits.size(-1)), target.view(-1))
            loss_epoch_list.append(loss.item())
            loss_epoch_mean = sum(loss_epoch_list) / len(loss_epoch_list)
            tqdm_dataloader.set_postfix(loss = loss_epoch_mean)
            # backward pass, optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
    return loss_epoch_mean

def epoch_validation(model, dataloader, epoch, device):
    model.eval()
    loss_epoch_list = []
    with tqdm(dataloader, unit="batch") as tqdm_dataloader, torch.inference_mode():
        tqdm_dataloader.set_description(f"Epoch {epoch}: Validation")
        for input, target in tqdm_dataloader:
            # load tensor to GPU if enabled
            input = input.to(device)
            target = target.to(device)
            # forward pass
            logits = model(input)
            # get the loss
            loss = torch.nn.functional.cross_entropy(logits.view(-1, logits.size(-1)), target.view(-1))
            loss_epoch_list.append(loss.item())
            loss_epoch_mean = sum(loss_epoch_list) / len(loss_epoch_list)
            tqdm_dataloader.set_postfix(loss = loss_epoch_mean)
    return loss_epoch_mean
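To help interpret the loss values reported during training, the sketch below computes the cross-entropy of a single position by hand, in plain Python. The helper `toy_cross_entropy` is a hypothetical name used only here; the training code above averages the same quantity over all positions of the batch. A model that spreads its probability uniformly over the GPT-2 vocabulary (50257 tokens) gets a loss equal to the natural logarithm of the vocabulary size, which gives a reference point for the initial validation loss.

```python
import math


def toy_cross_entropy(logits, target):
    """Cross-entropy for one position: -log(softmax(logits)[target])."""
    max_logit = max(logits)  # subtract the max for numerical stability
    log_sum_exp = max_logit + math.log(sum(math.exp(z - max_logit) for z in logits))
    return log_sum_exp - logits[target]


gpt2_vocab_size = 50257
uniform_logits = [0.0] * gpt2_vocab_size  # every token equally likely
uniform_loss = toy_cross_entropy(uniform_logits, target=123)
print(f"loss of a uniform guess: {uniform_loss:.2f}")

# A confident and correct prediction gives a much lower loss
confident_loss = toy_cross_entropy([0.0, 10.0, 0.0], target=1)
print(f"loss of a confident correct guess: {confident_loss:.4f}")
```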

In [None]:
train(my_model, train_dataloader, val_dataloader, nb_epochs, device, optimizer)

Let's test our trained model on the previous simple truncated text list. We will make the model generate new tokens. We notice that this time, the generation is much better than when the model was not trained. Note that the generation could have been better with a bigger dataset and better training parameters.

In [None]:
print("Initial simple truncated text list:")
print(simple_text_list)

In [None]:
max_new_tokens = 10 # to generate 10 more tokens
generated_ids = generate_text_simple(my_model, input_ids, max_new_tokens, config_model["context_length"], device)
decoded_ids = tokenizer.batch_decode(generated_ids)
print("text list completed with new generated tokens:")
print(decoded_ids)

Now, let's use the already fully pretrained GPT-2 model, where the "generation head" has also been fully trained on a huge corpus of texts, to generate the next few tokens completing the sentences. You can see that the generated tokens make much more sense.

In [None]:
max_new_tokens = 10 # to generate a maximum of 10 more tokens
generator = pipeline('text-generation', model='gpt2', device = device)
print(generator(simple_text_list, truncation=True, max_new_tokens=max_new_tokens, num_return_sequences=1, pad_token_id=tokenizer.eos_token_id, do_sample=False))

# Non-greedy decoding strategy: sampling with top_k and temperature

Earlier, we used the "greedy" text generation strategy, where the token selected at each generation step is the one with the largest probability score among all tokens in the vocabulary. This means that the LLM will always generate the same output, even if we run the preceding generation function multiple times on the same start context.

Let’s look at other common text generation strategies (also called decoding strategies) to generate more original text.

We will look at two techniques:

- Top-k sampling is a technique that leverages the probability distribution generated by the language model to select a token from the k most likely options.

- We can further control the distribution and selection process via a concept called temperature scaling.

## Sampling with temperature

Let’s now look at temperature scaling, a technique used together with a probabilistic selection process for the next-token generation task. Previously, we always selected the token with the highest probability as the next token using torch.argmax, which is known as greedy decoding. To generate text with more variety, we can replace torch.argmax with a function that samples from a probability distribution (here, the probability scores the LLM generates for each token ID at each token generation step).

To illustrate probabilistic sampling with a concrete example, we first use a very small vocabulary and write by hand the next-token logits that an LLM could hypothetically output.

In [None]:
vocab = {
    "Welcome": 0,
    "to": 1,
    "the": 2,
    "amazing": 3,
    "2025": 4,
    "Deep": 5,
    "Learning": 6,
    "School": 7,
    "!": 8,
}

inverse_vocab = {v: k for k, v in vocab.items()}

next_token_logits = torch.tensor([4.51, 0.89, -1.90, 6.75, 1.63, -1.62, -1.89, 6.28, 1.79])


In the greedy decoding strategy (without sampling), we convert the logits into probabilities via the softmax function and obtain the token ID of the generated token via the argmax function, which we can then map back into text via the inverse vocabulary.

For a given sequence start, the generated next token is always the same, no matter how many times we run the code: the procedure is deterministic, since no randomness is introduced.

In [None]:
# greedy decoding
probas = torch.softmax(next_token_logits, dim=-1)
print("probas:", probas)
next_token_id = torch.argmax(probas, dim = -1).item()
print("next_token_id:", next_token_id)
print("next token:", inverse_vocab[next_token_id]) # always the same predicted next token

To implement a probabilistic sampling process, we can now replace torch.argmax with the [torch.multinomial](https://pytorch.org/docs/stable/generated/torch.multinomial.html) function in PyTorch.

Run the following piece of code several times, and observe that the generated next token changes.

In [None]:
probas = torch.softmax(next_token_logits, dim=-1)
next_token_id = torch.multinomial(probas, num_samples=1).item()
print("next token:", inverse_vocab[next_token_id])

To better visualize the probabilities for the next token, let’s repeat this sampling "number_of_tests" = 1,000 times. We display how many times each word in the vocabulary has been selected as the next token.

We also add the temperature scaling. Temperature scaling is just dividing the logits by a number greater than 0. A temperature of 1 divides the logits by 1 before passing them to the softmax function to compute the probability scores. In other words, using a temperature of 1 is the same as not using any temperature scaling.
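Written out as a formula, with $z_i$ denoting the logit of token $i$ and $T > 0$ the temperature (symbols introduced here only for illustration), the probability of token $i$ after temperature scaling is:

```latex
p_i(T) = \frac{\exp(z_i / T)}{\sum_{j} \exp(z_j / T)}, \qquad T > 0
```

Setting $T = 1$ recovers the ordinary softmax.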

Test the next token distribution with different values for the temperature (for instance: 0.1, 1, 5, etc.). How does the distribution of the next token change depending on the temperature? What does it mean to have a low temperature? And a high temperature?

In [None]:
temperature = 1.0
number_of_tests = 1000

scaled_next_token_logits = next_token_logits / temperature
probas = torch.softmax(scaled_next_token_logits, dim=-1)
next_token_id_list = torch.multinomial(probas, num_samples = number_of_tests, replacement = True).tolist()
next_token_list = [inverse_vocab[next_token_id] for next_token_id in next_token_id_list]

print(Counter(next_token_list))

Temperatures greater than 1 result in more uniformly distributed token probabilities, and temperatures smaller than 1 will result in more confident (sharper or more peaky) distributions. We illustrate this by plotting the probabilities scaled with different temperature values.

In [None]:
temperatures = [1, 0.1, 5]
scaled_probas = [torch.softmax(next_token_logits / T, dim=-1) for T in temperatures]

x = torch.arange(len(vocab))
bar_width = 0.15
fig, ax = plt.subplots(figsize=(5, 3))
for i, T in enumerate(temperatures):
    rects = ax.bar(x + i * bar_width, scaled_probas[i],
    bar_width, label=f'Temperature = {T}')
ax.set_ylabel('Probability')
ax.set_xticks(x)
ax.set_xticklabels(vocab.keys(), rotation=90)
ax.legend()
plt.tight_layout()
plt.show()

## top_k sampling

We previously implemented a probabilistic sampling approach coupled with temperature scaling to increase the diversity of the outputs. This method allows the generation process to explore less likely but potentially more interesting and creative paths. However, one downside of this approach is that it sometimes leads to grammatically incorrect or completely nonsensical output.

Top-k sampling, when combined with probabilistic sampling and temperature scaling, can improve the text generation results. In top-k sampling, we can restrict the sampled tokens to the top-k most likely tokens and exclude all other tokens from the selection process by masking their probability scores.
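To see why restricting the candidates can help, the sketch below measures how much probability falls on tokens outside the three most likely ones, using the same toy logit values as in the temperature section. It is written in plain Python, and the helper names `toy_softmax` and `mass_outside_top_k` are assumptions made for this illustration.

```python
import math


def toy_softmax(logits, temperature=1.0):
    """Softmax of the logits divided by the temperature."""
    scaled = [z / temperature for z in logits]
    max_scaled = max(scaled)  # subtract the max for numerical stability
    exps = [math.exp(s - max_scaled) for s in scaled]
    total = sum(exps)
    return [e / total for e in exps]


def mass_outside_top_k(logits, k, temperature):
    """Total probability of all tokens that are not among the k most likely ones."""
    probs = sorted(toy_softmax(logits, temperature), reverse=True)
    return sum(probs[k:])


toy_logits = [4.51, 0.89, -1.90, 6.75, 1.63, -1.62, -1.89, 6.28, 1.79]
for T in (1.0, 5.0):
    tail = mass_outside_top_k(toy_logits, k=3, temperature=T)
    print(f"T = {T}: probability mass outside the top 3 = {tail:.3f}")
```

At a high temperature, a substantial share of the samples would come from these unlikely tokens; top-k masking removes them from the selection before sampling.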

Using top-k sampling with k = 3, we focus on the three tokens associated with the highest logits.

In [None]:
top_k = 3
top_logits, top_pos = torch.topk(next_token_logits, top_k)
print("Initial logits:", next_token_logits)
print("Top logits:", top_logits)
print("Top positions:", top_pos)

We then mask out all other tokens with negative infinity (-inf). In other words, we apply the [torch.where](https://pytorch.org/docs/stable/generated/torch.where.html) function to set to -inf the logit values of the tokens that are below the lowest logit value within our top-three selection.

In [None]:
new_logits = torch.where(
    condition=next_token_logits < top_logits[-1], # Identifies logits less than the minimum in the top k
    input=torch.tensor(float('-inf')), # Assigns –inf to these lower logits
    other=next_token_logits # Retains the original logits for all other tokens
)

print(new_logits)

Lastly, we apply the softmax function to turn these new logits into next-token probabilities. As we can see, the result of this top-three approach is three non-zero probability scores.

In [None]:
topk_probas = torch.softmax(new_logits, dim=-1)
print(topk_probas)

We can now apply temperature scaling and the multinomial function for probabilistic sampling to select the next token among these three non-zero probability scores. We do this next by modifying the text generation function.

## Combining top_k sampling with temperature for next token generation

We combine temperature sampling and top-k sampling for a more advanced text generation function.

In [None]:
def generate_advanced(model, idx, max_new_tokens, context_size, device, temperature=0.0, top_k=None):
    model.eval()
    model = model.to(device)
    idx = idx.to(device)

    # For-loop is the same as before: Get logits, and only focus on last time step
    for _ in range(max_new_tokens):
        idx_cond = idx[:, -context_size:]
        with torch.no_grad():
            logits = model(idx_cond)
        logits = logits[:, -1, :]

        # New: Filter logits with top_k sampling
        if top_k is not None:
            # Keep only top_k values
            top_logits, top_pos = torch.topk(logits, top_k)
            min_val = top_logits[:, -1] # minimum logit values to be retained
            logits = torch.where(
                condition = logits < min_val.unsqueeze(dim = -1), # Identifies logits less than the minimum in the top k
                input = torch.tensor(float("-inf")).to(device), # Assigns –inf to these lower logits
                other = logits # Retains the original logits for all other tokens
            )

        # New: Apply temperature scaling
        if temperature > 0.0:
            logits = logits / temperature

            # Apply softmax to get probabilities
            probs = torch.softmax(logits, dim=-1)  # (batch_size, vocab_size)

            # Sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1)  # (batch_size, 1)

        # Otherwise same as before: get idx of the vocab entry with the highest logits value
        else:
            idx_next = torch.argmax(logits, dim=-1, keepdim=True)  # (batch_size, 1)

        # Same as before: append sampled index to the running sequence
        idx = torch.cat((idx, idx_next), dim=1)  # (batch_size, num_tokens+1)

    return idx

You can try different temperature and top_k values and see the generated tokens.

In [None]:
# your code

temperature = ...
top_k = ...
max_new_tokens = 10 # to generate 10 more tokens
generated_ids = generate_advanced(my_model, input_ids, max_new_tokens, config_model["context_length"], device, temperature = temperature, top_k = top_k)
decoded_ids = tokenizer.batch_decode(generated_ids)
print(decoded_ids)