In [1]:
%pip install einops xformers np

Note: you may need to restart the kernel to use updated packages.


In [3]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.modules import ModuleList
from torch.nn.modules.normalization import LayerNorm
from torch import nn

import copy
import math

In [4]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cpu')

In [5]:
def _get_clones(module, n):
    return ModuleList([copy.deepcopy(module) for i in range(n)])

In [6]:
class Conv1D(nn.Module):
    def __init__(self, nx, nf):
        '''
        nx: Numero de datos de entrada
        nf: Numero de filtros. (Canales de salida).
        '''
        super().__init__()
        self.nf = nf
        #Inicializando matriz vacia de pesos del tamaño (nx)X(nf)
        w = torch.empty(nx,nf)
        #Calculando los pesos con una distribución normal.
        nn.init.normal_(w, std=0.02)
        #Calculando los pesos y sesgos encodeandos usando nn.Parameter.
        self.weight = nn.Parameter(w)
        self.bias = nn.Parameter(torch.zeros(nf))

    def forward(self, x):
        '''x:Tensor de entrada.'''
        #El tamaño de la salida es la suma de la segunda dimensionm de X y el numero de filtros nf.
        size_out = x.size()[:-1] + (self.nf,)
        #Producto punto Q,K(Transpuesta) y V
        x = torch.addmm(self.bias, x.view(-1, x.size(-1)), self.weight) # x.view ayuda a calcular la transpuesta.
        x = x.view(*size_out)
        return x

In [7]:
class FeedForward(nn.Module):
    def __init__(self, dropout, d_model=768, nx=768*4):
        super().__init__()
        self.c_fc     = Conv1D(d_model, nx)
        self.c_proj   = Conv1D(nx, d_model)
        self.act      = F.gelu
        self.dropout  = nn.Dropout(dropout)

    def forward(self, x):
        return self.dropout(self.c_proj(self.act(self.c_fc(x))))

In [8]:
class Attention(nn.Module):
    def __init__(self, d_model=768, n_head=12, n_ctx=1024, d_head=64, bias=True, scale=False):
        '''Función de construcción
        Params:
        d_model: Dimensión que necesita ser ingresada en el modelo.
        n_head: La cantidad de heads de atención
        n_ctx: Buffer para guardar los registros del sesgo.
        scale: Escalar y estabilidad númerica (sqrt(dk))
        '''
        super().__init__()
        self.n_head = n_head
        self.d_model = d_model
        self.c_attn = Conv1D(d_model, d_model*3)
        self.scale = scale
        self.softmax = nn.Softmax(dim=-1)
        self.register_buffer("bias", torch.tril(torch.ones(n_ctx, n_ctx)).view(1, 1, n_ctx, n_ctx))
        self.dropout = nn.Dropout(0.1)
        self.c_proj = Conv1D(d_model, d_model)

    def split_heads(self,x):
        """
        Dividiend en la cantidad de heads y retornando.
        return shape ['Barch', 'head', 'sequence', 'features']
        """
        new_shape = x.size()[:-1] + (self.n_head, x.size(-1)//self.n_head)
        x = x.view(*new_shape)
        return x.permute(0,2,1,3)

    def _attn(self, q, k, v, attn_mask=None):
        """Función de atención principal.
        Que calcula usando la formula de producto de punto de atención."""
        scores = torch.matmul(q, k.transpose(-2,-1)) #producto punto de Q*K(t)
        if self.scale: scores = scores/math.sqrt(v.size(-1)) #escalandolo por sqrt(dk)
        nd, ns = scores.size(-2), scores.size(-1)
        if attn_mask is not None: scores = scores + attn_mask
        scores = self.softmax(scores)
        scores = self.dropout(scores)
        output = torch.matmul(scores, v)
        return output

    def merge_heads(self,x):
        x = x.permute(0,2,1,3).contiguous()
        new_shape = x.size()[:-2] + (x.size(-2)*x.size(-1),)
        return x.view(*new_shape)

    def forward(self, x):
        '''Función de para calcular atención, separar los heads y combinarlos de nuevos.'''
        x = self.c_attn(x)
        query, key, value = x.split(self.d_model, dim=2)
        query, key, value = self.split_heads(query), self.split_heads(key), self.split_heads(value)
        out = self._attn(query, key, value)
        out = self.merge_heads(out)
        out = self.c_proj(out)
        return out

In [9]:
class TransformerBlock(nn.Module):
    def __init__(self, d_model=768, n_head=12, dropout=0.1):
        super(TransformerBlock, self).__init__()
        self.attn = Attention(d_model=768, n_head=12, d_head=64, n_ctx=1024, bias=True, scale=False)
        self.feedforward = FeedForward(dropout=0.1, d_model=768, nx=768*4)
        self.ln_1 = LayerNorm(d_model)
        self.ln_2 = LayerNorm(d_model)

    def forward(self, x):
        x = x + self.attn(self.ln_1(x))
        x = x + self.feedforward(self.ln_2(x))
        return x

In [None]:
class GPT2(nn.Module):
    def __init__(self, nlayers=12, n_ctx=1024, d_model=768, vob_size=50257): # GPT-3 usa nlayers=96, n_ctx=2048, d_model=12288
        '''nlayer: La cantidad de veces que queremos multiplicar el transformer.
        n_ctx: El contexto, la cantidad total de tokens que puede ver en el pasado de las palabras.
        d_model: Dimesiones del modelo
        vob_size: Tamaño del vocabulario usado en el entrenamiento.'''
        super(GPT2, self).__init__()
        self.nlayers = nlayers
        block = TransformerBlock(d_model=768, n_head=12, dropout=0.1)
        self.h = _get_clones(block, 12)
        self.wte = nn.Embedding(vob_size, d_model)
        self.wpe = nn.Embedding(n_ctx, d_model)
        self.drop = nn.Dropout(0.1)
        self.ln_f = LayerNorm(d_model)
        self.out = nn.Linear(d_model, vob_size, bias=False)
        self.loss_fn = nn.CrossEntropyLoss()
        self.init_weights()

    def init_weights(self):
        '''Incilizacion de los pesos'''
        self.out.weight = self.wte.weight
        self.apply(self._init_weights)

    def _init_weights(self, module):
        '''Iniciliazación con la medida y S.D.'''
        if isinstance(module, (nn.Linear, nn.Embedding, Conv1D)):
            module.weight.data.normal_(mean=0.0, std=0.02)
            if isinstance(module, (nn.Linear, Conv1D)) and module.bias is not None:
                '''Data Bias Zero'''
                module.bias.data.zero_()
        elif isinstance(module, nn.LayerNorm):
            module.bias.data.zero_()
            module.weight.data.fill_(1.0)

    def forward(self, src, labels=None, pos_ids=None):
        '''Añadir el embedding posicional, dropping y añadiendo los inputs usados por la función
        de perdida y finalmente añadiendo la salida y la peridida.'''

        if pos_ids is None:
            pos_ids = torch.arange(0, src.size(-1)).unsqueeze(0)

        pos_ids = pos_ids.to(src.device)

        inp = self.drop((self.wte(src) + self.wpe(pos_ids)))
        for i in range(self.nlayers): inp = self.h[i](inp)

        inp = self.ln_f(inp)
        logits = self.out(inp)

        outputs = (logits,) + (inp,)

        if labels is not None:
            shift_logits = logits[..., :-1, :].contiguous()
            shift_labels = labels[..., 1:].contiguous()
            loss = self.loss_fn(shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1))
            outputs = (loss,) + outputs
            return loss.mean()

        return outputs

In [11]:
import torch.nn.functional as F
import time
from transformers import GPT2Tokenizer

In [14]:
model = GPT2()

In [15]:
import urllib.request

url = "https://s3.amazonaws.com/models.huggingface.co/bert/gpt2-pytorch_model.bin"
output = "gpt2-pytorch_model.bin"
urllib.request.urlretrieve(url, output)
print("Descarga completada:", output)

Descarga completada: gpt2-pytorch_model.bin


In [16]:
model_dict = model.state_dict()
state_dict = torch.load("./gpt2-pytorch_model.bin", weights_only=False)

old_keys = []
new_keys = []

for key in state_dict.keys():
    if "mlp" in key:
        new_key = key.replace("mlp", "feedforward")
        new_keys.append(new_key)
        old_keys.append(key)

In [17]:
for old_key, new_key in zip(old_keys, new_keys):
    state_dict[new_key] = state_dict.pop(old_key)

In [18]:
pretrained_dict = {k: v for k, v in state_dict.items() if k in model_dict}

model_dict.update(pretrained_dict)
model.load_state_dict(model_dict)
model.eval()

GPT2(
  (h): ModuleList(
    (0-11): 12 x TransformerBlock(
      (attn): Attention(
        (c_attn): Conv1D()
        (softmax): Softmax(dim=-1)
        (dropout): Dropout(p=0.1, inplace=False)
        (c_proj): Conv1D()
      )
      (feedforward): FeedForward(
        (c_fc): Conv1D()
        (c_proj): Conv1D()
        (dropout): Dropout(p=0.1, inplace=False)
      )
      (ln_1): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
      (ln_2): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
    )
  )
  (wte): Embedding(50257, 768)
  (wpe): Embedding(1024, 768)
  (drop): Dropout(p=0.1, inplace=False)
  (ln_f): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
  (out): Linear(in_features=768, out_features=50257, bias=False)
  (loss_fn): CrossEntropyLoss()
)

In [19]:
total_params = sum(p.numel() for p in model.parameters())

In [20]:
size_bytes = total_params * 4
size_mb = size_bytes / (1024 ** 2)

print(f"El tamaño total de GPT2 sin alteraciones es: {size_bytes} bytes o {size_mb:.2f} MB")

El tamaño total de GPT2 sin alteraciones es: 497759232 bytes o 474.70 MB


In [21]:
tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
context = torch.tensor([tokenizer.encode("The planet earth is a beautiful")])

tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


vocab.json:   0%|          | 0.00/1.04M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/665 [00:00<?, ?B/s]

In [22]:
def generate(context, ntok=550):
    start_time = time.time()

    for _ in range(ntok):
        out = model(context)
        logits = out[0][:, -1, :]
        indices_to_remove = logits < torch.topk(logits, 10)[0][..., -1, None]
        logits[indices_to_remove] = -np.inf
        next_tok = torch.multinomial(F.softmax(logits, dim=-1), num_samples=1).squeeze(1)
        context = torch.cat([context, next_tok.unsqueeze(-1)], dim=1)

    end_time = time.time()
    inference_time = end_time - start_time
    return context, inference_time

In [23]:
out, inference_time = generate(context, ntok=40)
decoded_output = tokenizer.decode(out[0])

In [24]:
print(f"Inference Time: {inference_time:.4f} seconds")
print(f"Generated Output: {decoded_output}")

Inference Time: 3.1711 seconds
Generated Output: The planet earth is a beautiful place where people of your choice. Earth in your where they've a a, in you are a in an is Your

<|endoftext|>" a user "I say .the name"isplanet a


[REGRESAR](../01_Construccion_de_GPT-2.md)