<a href="https://colab.research.google.com/github/nlp-en-es/nlp-de-cero-a-cien/blob/main/3_transformers_1/multi_head_attention.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Módulo de Multi-Head Attention

In [None]:
import torch
from torch import nn
from torch import Tensor
import math
from typing import Optional

## Attention

Una función de atención puede ser decrita como un mapeo de una consulta (query) y un conjunto de parejas llave-valor (key-value) a una salida, donde consultas, llaves, valores y salidas son todos vectores. La salida se calcula como una suma ponderada de los valores, donde el peso asignado a cada uno de los valores es calculado por una función de compatibilidad entre cada consulta y la correspodiente llave.

En Transformers, dicha función atención se denomina "Scaled Dot-Product Attention". La entrada consiste en consultas y llaves de dimensión $d_k$, y valores de dimensión $d_v$. Calculamos el producto punto de la consulta con todas las llave, divimos cada producto por $\sqrt{d_k}$, y aplicamos una función softmax para obtener los pesos sobre los valores.

En la práctica, calculamos la función de atención sobre un conjunto de consultas de manera simultanea, acopladas en una matriz $Q$. Las llaves y valores también se acoplan en matricez $K$ y $V$ respectivamente. Calculamos la matriz de salidas de la siguiente manera:

$Attention(Q, K, V) = softmax(\frac{QK^T}{\sqrt{d_k}})V$

In [None]:
class MultiHeadedAttention(nn.Module):
    '''
    BLoque de MultiHeadedAttention que permita al modelo atender de manera
    conjunta a información de diferentes subespacios de representación.

    Args:
        num_heads (int): número de cabezas por capa
        d_model (int): dimensión total del modelo
        dropout (float): Una capa de dropout sobre attention_probs. Default: 0.0. 
    '''
    def __init__(self, num_heads: int, d_model: int, dropout: float = 0.0):
        super(MultiHeadedAttention, self).__init__()
        if d_model % num_heads != 0:
            raise ValueError(
                f"The hidden size ({d_model}) is not a multiple of the number of attention "
                f"heads ({num_heads})"
            )
        # Número de features por cabeza, se asume que d_v = d_k
        self.d_k = d_model // num_heads
        self.d_model = d_model
        self.num_heads = num_heads

        self.query = nn.Linear(d_model, d_model)
        self.key = nn.Linear(d_model, d_model)
        self.value = nn.Linear(d_model, d_model)

        self.dropout = nn.Dropout(p=dropout)

    def transpose_for_scores(self, x):
        new_x_shape = x.size()[:-1] + (self.num_heads, self.d_k)
        x = x.view(*new_x_shape)
        return x.permute(0, 2, 1, 3)

    def forward(
        self,
        query: Tensor,
        key: Tensor,
        value: Tensor,
        mask: Optional[Tensor] = None,
        output_attentions: Optional[bool] = False
    ):
        '''
        Args:
            query, key, value: Se mapea el query y un conjunto de parejas key-value a una salida output.
            mask: máscara que previene la atención en ciertas posiciones.
            output_attentions: Indica si se quiere regresar la matriz de pesos de atención
        '''
        if mask is not None:
            # Se aplica la misma máscara para todas las cabezas
            mask = mask.unsqueeze(1)
        

        query_layer = self.transpose_for_scores(self.query(query)) # (batch, num_heads, seq_len, d_k)
        key_layer = self.transpose_for_scores(self.key(key))
        value_layer = self.transpose_for_scores(self.value(value))

        # Se realiza el producto punto entre "query" y "key" para obtener los scores de atención crudos/sin procesar
        attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2))
        attention_scores = attention_scores / math.sqrt(self.d_k)

        # Se aplica máscara
        if mask is not None:
            attention_scores = attention_scores.masked_fill(mask == 0, float('-inf'))

        # Se normalizan los scores de atención a probabilidades
        attention_probs = nn.Softmax(dim=-1)(attention_scores)

        attention_probs = self.dropout(attention_probs)

        context_layer = torch.matmul(attention_probs, value_layer)

        context_layer = context_layer.permute(0, 2, 1, 3).contiguous() # (batch, seq_len, num_heads, d_k)
        new_context_layer_shape = context_layer.size()[:-2] + (self.d_model,) # (batch, seq_len, d_model)
        context_layer = context_layer.view(*new_context_layer_shape)

        outputs = (context_layer, attention_probs) if output_attentions else (context_layer,)

        return outputs

## Ejemplo uso

Instanciamos el módulo de Multi-Head Attention

In [None]:
att = MultiHeadedAttention(8, 768, 0.1)

Creamos un entrada con tamaño de lote de 8 y secuencias de 300 elementos aleatorios

In [None]:
# Se prepara la entrada
x = torch.rand(8, 300, 768)
mask = torch.ones((8, 300))

# Ejecutar módulo, se regresa la matriz de atención
output = att(query=x, key=x, value=x, mask=mask, output_attentions=True)

Embeddings de salida

In [None]:
print(output[0].shape)

torch.Size([8, 300, 768])


Matriz de atención

In [None]:
print(output[1].shape)

torch.Size([8, 8, 300, 300])


## Visualización de self-attention

In [None]:
%%capture
!pip install transformers
!pip install bertviz

In [None]:
from transformers import BertModel, BertTokenizer
from bertviz import head_view

In [None]:
bert = BertModel.from_pretrained('dccuchile/bert-base-spanish-wwm-cased')
bert_embeddings_layer = bert.embeddings
tokenizer = BertTokenizer.from_pretrained('dccuchile/bert-base-spanish-wwm-cased')

att = MultiHeadedAttention(8, 768, 0.1)

att.query.load_state_dict(bert.encoder.layer[0].attention.self.query.state_dict())
att.key.load_state_dict(bert.encoder.layer[0].attention.self.key.state_dict())
att.value.load_state_dict(bert.encoder.layer[0].attention.self.value.state_dict())

In [None]:
text_input = tokenizer(["El perro va caminando sobre el pasto"], return_tensors='pt')

In [None]:
input_ids = text_input['input_ids']
x = bert_embeddings_layer(input_ids)
mask = text_input['attention_mask']

output, attention_scores = att(query=x, key=x, value=x, mask=mask, output_attentions=True)

In [None]:
attention_scores.shape

torch.Size([1, 8, 10, 10])

In [None]:
input_id_list = input_ids.tolist()[0]
tokens = tokenizer.convert_ids_to_tokens(input_id_list)

In [None]:
head_view((attention_scores,)*12, tokens)

<IPython.core.display.Javascript object>