In [1]:
import torch

# Check if MPS (Metal Performance Shaders) is available
print("MPS available:", torch.backends.mps.is_available())

# Check if CUDA is available (it won't be on Mac)
print("CUDA available:", torch.cuda.is_available())

device = (
    "cuda" if torch.cuda.is_available() 
    else "mps" if torch.backends.mps.is_available() 
    else "cpu"
)

print("Using device:", device)

MPS available: True
CUDA available: False
Using device: mps


  cpu = _conversion_method_template(device=torch.device("cpu"))


In [2]:
import torch.nn as nn


class Attention(nn.Module):
    """Attention mechanism.
    Parameters
    ----------
    dim : int
        Input and output dimension of each input vector.
    n_heads : int
        Number of attention heads.

    Attributes
    ----------
    scale : float
        Normalizing constant for the dot product.
    qkv : nn.Linear
        Linear projection for the query, key and value.
    proj : nn.Linear
        Linear mapping that takes in the concatenated output of all attention
        heads and maps it into a new space.
    """
    def __init__(self, dim, n_heads=12, masked_attention=False, context_size=None):
        super().__init__()
        self.n_heads = n_heads
        self.dim = dim
        self.head_dim = dim // n_heads
        self.scale = self.head_dim ** -0.5

        self.qkv = nn.Linear(dim, dim * 3)

        #  We create a linear layer that takes a vector which contains
        #  d input elements and create a resulting vector with size d*3
        #  in which we store k, q and v.

        #  This is identical to:
        #  k = nn.Linear(dim, dim)
        #  q = nn.Linear(dim, dim)
        #  v = nn.Linear(dim, dim)
        #  self.qkv = torch.cat(q,k,v dim=-1)


        # last output layer to aggregate the information of the merged heads
        self.proj = nn.Linear(dim, dim)

        # for masked/causal attention
        self.masked_attention = masked_attention
        if masked_attention:
            self.mask = torch.tril(torch.ones(context_size, context_size)) \
                            .view(1, 1, context_size, context_size) \
                            .to(device)            # sets 1 if attention is allowed, 0 otherwise.
            # 10000
            # 11000
            # 11100
            # 11110
            # 11111


    def forward(self, x):
        """Run forward pass.
        Parameters
        ----------
        x : torch.Tensor
            Shape `(batchsize, number of input vectors, dim)`.
        Returns
        -------
        torch.Tensor
            Shape `(batchsize, number of input vectors, dim)`.
        """
        batchsize, n_input_vectors, dim = x.shape

        # if linear layer receives input with three dimension, its weight matrix
        # is duplicated for each vector
        qkv = self.qkv(x)  # output has dimension (batchsize, number input vectors, 3 * dim)


        # qkv calculates for all input vectors their key query and value vectors
        # We need to "cut out" the queries, keys and values
        # Simultaneously we also "cut out" the heads for multihead attention
        qkv = qkv.reshape(
                batchsize, n_input_vectors, 3, self.n_heads, self.head_dim
        )  # (batchsize, number input vectors, 3, n_heads, head_dim)


        # Later we will do matrix multiplication with 4D-tensors
        # We intentionally skipped this part in lecture but we need to
        # change the order of the dimensions so we can calculate the dot product
        # of all heads at the same time
        qkv = qkv.permute(
                2, 0, 3, 1, 4
        )  # (3, batchsize, n_heads, number input vectors, head_dim)


        # q contains the queries of all heads of all input vectors
        # same for k and v

        q, k, v = qkv[0], qkv[1], qkv[2]

        # transpose the last two dimension of keys
        k_t = k.transpose(-2, -1)  # (batchsize, n_heads, head_dim, number input vectors)


        # @ is the symbol for matrix multiplication
        # if you want to know how 4d matmul works: https://pytorch.org/docs/stable/generated/torch.matmul.html#torch.matmul
        # self.scale = 1 / root(head_dim)
        s_dot_product = (q @ k_t) * self.scale # (batchsize, n_heads, number input vectors, number input vectors)


        # Only used in Decoders that use masked attention, This sets all values to -inf if i > j
        if self.masked_attention:
            s_dot_product = s_dot_product.masked_fill(self.mask[:,:,:n_input_vectors, :n_input_vectors]== 0, float('-inf'))


        attn = s_dot_product.softmax(dim=-1)  # (batchsize, n_heads, number input vectors, number input vectors)

        weighted_avg = attn @ v  # (batchsize, n_heads, number input vectors, head_dim)


        # reverse order of number input vectors and n_heads ()
        weighted_avg = weighted_avg.transpose(
                1, 2
        )  # (batchsize, number input vectors, n_heads, head_dim)

        # Merge the heads back into one input vector
        weighted_avg = weighted_avg.flatten(2)  # (batchsize, number input vectors, dim)


        # Final output layer to aggreagte information of heads
        x = self.proj(weighted_avg)  # (batchsize, number input vectors, dim)

        return x

## create 10 input vectors each with size 800.
dummy_input = torch.rand(1,10, 800)

## create Multihead Attention Block
mha = Attention(dim=800, n_heads=8)

## feed dummy input through multihead attention
dummy_result = mha(dummy_input)
print(f"input shape was {dummy_input.shape}, output shape should be identical {dummy_result.shape}")

input shape was torch.Size([1, 10, 800]), output shape should be identical torch.Size([1, 10, 800])


In [3]:
class MLP(nn.Module):
    """Multilayer perceptron.

    Called Fully Connected Layers in lecture  (blue block inside the transformer block)

    Contains two fully connected layers.
        - the first fully connected layer quadruples vector size
        - the second fully connected layer decreases vector size back to original input size

    Parameters
    ----------
    dim : int
        Size of vectors.


    Attributes
    ----------
    fc : nn.Linear
        The First linear layer.
    act : nn.GELU
        GELU activation function.
    fc2 : nn.Linear
        The second linear layer.

    """
    def __init__(self, dim):
        super().__init__()
        self.fc1 = nn.Linear(in_features=dim, out_features=4*dim)
        self.act = nn.GELU()
        self.fc2 = nn.Linear(in_features=4*dim, out_features=dim)

    def forward(self, x):
        """Run forward pass.
        Parameters
        ----------
        x : torch.Tensor
            Shape `(batchsize, number input vectors, in_features)`.
        Returns
        -------
        torch.Tensor
            Shape `(batchsize, number input vectors, in_features)`
        """
        x = self.fc1(x)  # (batchsize, number input vectors, 4*dim)
        x = self.act(x)  # (batchsize, number input vectors, 4*dim)
        x = self.fc2(x)  # (batchsize, number input vectors, dim)
        return x

## create 10 input vectors each with size 800.
dummy_input = torch.rand(1,10, 800)

## create MLP Block
FC = MLP(dim=800)

## feed dummy input through fully connected layers
dummy_result = FC(dummy_input)
print(f"input shape was {dummy_input.shape}, output shape should be identical {dummy_result.shape}")

input shape was torch.Size([1, 10, 800]), output shape should be identical torch.Size([1, 10, 800])


In [4]:
class Block(nn.Module):
    """Transformer block.
    Parameters
    ----------
    dim : int
        Embedding dimension.
    n_heads : int
        Number of attention heads.

    Attributes
    ----------
    norm1, norm2 : LayerNorm
        Layer normalization.
    attn : Attention
        Attention module.
    mlp : MLP
        MLP module.
    """
    def __init__(self, dim, n_heads, is_decoder=False, context_size=None):
        super().__init__()
        ## Layer norm is (a-µ)/sigma
        ## it is possible that the standard deviation == 0 so we always add a tiny value (eps)
        self.norm1 = nn.LayerNorm(dim, eps=1e-6)

        # Multihead attention (can be masked if decoder)
        self.attn = Attention(
                dim,
                n_heads=n_heads,
                masked_attention = is_decoder,
                context_size=context_size
        )
        self.norm2 = nn.LayerNorm(dim, eps=1e-6)

        # the two fully connected layers
        self.mlp = MLP(
                dim=dim,
        )

    def forward(self, x):
        """Run forward pass.
        Parameters
        ----------
        x : torch.Tensor
            Shape `(batchsize, number input vectors, dim)`.
        Returns
        -------
        torch.Tensor
            Shape `(batchsize, number input vectors, dim)`.
        """

        # Two skip connections
        # We use pre-normalization here and normalize at the beginning
        # instead of after the attention/mlp part
        x = x + self.attn(self.norm1(x))
        x = x + self.mlp(self.norm2(x))

        return x

## create 10 input vectors each with size 800.
dummy_input = torch.rand(1,10, 800)

## create transformer block
transformer_block_1 = Block(dim=800, n_heads=4)
## and lets create another transformer Block
transformer_block_2 = Block(dim=800, n_heads=8)

## feed dummy input through both stacked transformer blocks
dummy_result = transformer_block_2(transformer_block_1(dummy_input))

print(f"input shape was {dummy_input.shape}, output shape should be identical {dummy_result.shape}")

input shape was torch.Size([1, 10, 800]), output shape should be identical torch.Size([1, 10, 800])


In [5]:
class PatchEmbed(nn.Module):
    """Split image into patches and then embed them.
    Parameters
    ----------
    patch_size : int
        Size of the patch (it is a square).
    embed_dim : int
        The emmbedding dimension. --> how many values should our input vector have
    Attributes
    ----------
    proj : nn.Conv2d
        Convolutional layer that does both the splitting into patches
        and their embedding.
    """
    def __init__(self, patch_size, embed_dim=768):
        super().__init__()
        self.patch_size = patch_size

        """
        Notice: Stride == kernel size
        This cuts out a patch and then uses the convolutional Layer to create a d-size vector
        """
        self.proj = nn.Conv2d(
                in_channels=3, # 3 input channels because image is RGB
                out_channels=embed_dim,
                kernel_size=patch_size,
                stride=patch_size,
        )

    def forward(self, x):
        """Run forward pass.
        Parameters
        ----------
        x : torch.Tensor
            Shape `(batchsize, in_chans, img_size, img_size)`.
        Returns
        -------
        torch.Tensor
            Shape `(batchsize, n_patches, embed_dim)`.
        """
        x = self.proj(x)  # (batchsize, embed_dim, n_patches ** 0.5, n_patches ** 0.5)

        """
        Create a 1-D vector of each patch
        """
        x = x.flatten(2)  # (batchsize, embed_dim, n_patches)
        x = x.transpose(1, 2)  # (batchsize, n_patches, embed_dim)
        return x

In [6]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from torch.utils.data.dataloader import DataLoader
from torch.nn import functional as F
from urllib.request import urlopen
from tqdm import tqdm

class VisionAndLanguageTransformer(nn.Module):
    def __init__(
            self,
            context_size= 384,
            patch_size=16,
            n_classes=1000,
            embed_dim=768,
            depth=12,
            n_heads=12,
            is_ViT = True,
    ):
        super().__init__()

        self.is_ViT = is_ViT
        self.is_LM = not is_ViT


        if self.is_ViT:
            self.patch_embed = PatchEmbed(
                    patch_size=patch_size,
                    embed_dim=embed_dim,
            )

            # create a zeroth vector with the same size as the patch vectors
            # but with trainable values
            # initialize with zeros
            self.cls_token = nn.Parameter(torch.zeros(1, 1, embed_dim))

            # create positional embedding
            # start with zeros and learn what they should be
            n_patches = (context_size // patch_size)**2
            self.context_size = n_patches + 1 # each patch + one cls token

        if self.is_LM:
                # we predict each token, so that is our vocab size
                self.token_embed = nn.Embedding(n_classes, embed_dim)
                self.context_size = context_size

        self.pos_embed = nn.Parameter(
                torch.zeros(1, context_size, embed_dim)
        )

        # stack as many blocks as defined in input "depth"
        self.blocks = nn.ModuleList(
            [
                Block(
                    dim=embed_dim,
                    n_heads=n_heads,
                    is_decoder=self.is_LM,
                    context_size=context_size
                )
                for _ in range(depth)
            ]
        )

        # one final layer norm at the end of the transformer
        self.norm = nn.LayerNorm(embed_dim, eps=1e-6)

        # final fully connected layer that takes the first output vector
        # and predicts values for all classes
        self.head = nn.Linear(embed_dim, n_classes)


    def forward(self, x, targets=None):
        """Run the forward pass.
        Parameters
        ----------
        x : torch.Tensor
            Shape `(batchsize, in_chans, img_size, img_size)`.
        Returns
        -------
        logits : torch.Tensor
            Logits over all the classes - `(batchsize, n_classes)`.
        """
        batchsize = x.shape[0]

        # create patches from input image

        if self.is_ViT:
            x = self.patch_embed(x)

            # only necessary because we train with batches
            cls_token = self.cls_token.expand(
                    batchsize, -1, -1
            )  # (batchsize, 1, embed_dim)

            # put the the class token vector on the zeroth position
            x = torch.cat((cls_token, x), dim=1)  # (batchsize, 1 + n_patches, embed_dim)

        if self.is_LM:
            # (batchsize, n, embed_dim)
            x = self.token_embed(x)

        # add the positional encoding ontop of our input values

        number_tokens = x.shape[1]
        x = x + self.pos_embed[:,:number_tokens,:]  # (batchsize, 1 + n_patches, embed_dim)

        # feed input through all transformer blocks
        for block in self.blocks:
            x = block(x)

        # Final normalization of the output
        x = self.norm(x)

        if self.is_ViT:
            # take the vector that is on the zeroth position
            x = x[:, 0]
        else:
            # for generation we will take the last vector
            # but in pretraining we will use all vectors
            pass


        # feed it throuh a fully connected layer
        x = self.head(x)

        if targets is not None:
            loss = F.cross_entropy(x.view(-1, x.size(-1)), targets.view(-1), ignore_index=-1)
            return x, loss

        return x

    def generate(self, prompt):
        while prompt.shape[1] < self.context_size:
            logits = self(prompt)
            # take vector furthest right
            logits = logits[:,-1,:] / 0.8 # use 0.8 temperature
            # Top-K decoding take the 20 most likely predictions
            v, _ = torch.topk(logits, 20)

            # set prediction for all other logits to -inf
            logits[logits < v[:, [-1]]] = -float('Inf')

            probs = F.softmax(logits, dim=-1)

            # sample from distribution
            next_token = torch.multinomial(probs, num_samples=1)

            # append token to prompt

            prompt = torch.cat((prompt, next_token), dim=1)

        return prompt

In [7]:
# We won't use BPE so each symbol in text is a token

class TextDataset(Dataset):
    def __init__(self, english=True):
        # We can use the books of shakespeare for training
        # or books from german authors
        if english:
            resource_shakespeare  = urlopen("https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt")
            self.train_text =  resource_shakespeare.read().decode('utf-8')
            print(f"dataset has {len(self.train_text)} characters")
            print("random part of shakespeare \n",self.train_text[314825:315270])

        else:
            urls = [ 
                    # The Adventures of Sherlock Holmes
                    "https://www.gutenberg.org/files/1661/1661-0.txt"
                    # GÖTHE
                    #"https://gutenberg.org/files/2229/2229-0.txt", # Faust 1
                    #"https://www.gutenberg.org/cache/epub/2407/pg2407.txt", # Leiden des Werther
                    #"https://www.gutenberg.org/cache/epub/2230/pg2230.txt",  # Faust 2
                    #"https://www.gutenberg.org/cache/epub/2321/pg2321.txt", # Götz von Berlichingen
                    #"https://www.gutenberg.org/cache/epub/10425/pg10425.txt", # Torquato Tasso
                    #"https://www.gutenberg.org/cache/epub/2054/pg2054.txt", # Iphigenie auf Tauris
                    #"https://www.gutenberg.org/cache/epub/8565/pg8565.txt", # Viele deutsche Gedichte inkl. Erlkönig
                    # Schiller
                    #"https://www.gutenberg.org/cache/epub/47804/pg47804.txt", # Die Räuber
                    #"https://www.gutenberg.org/cache/epub/6498/pg6498.txt", # Kabale und Liebe
                    #"https://www.gutenberg.org/files/6518/6518-0.txt", # Wallensteins Lager
                    #"https://www.gutenberg.org/cache/epub/6525/pg6525.txt", # Piccolomini
                    #"https://www.gutenberg.org/cache/epub/6549/pg6549.txt", # Wallensteins Tod
                    ]
            self.train_text = ""


            for url in urls:
                book = urlopen(url)
                book = book.read().decode('utf-8')
                self.train_text += book

            # Holmes part
            print("Part of our dataset: \n",self.train_text[1501:2000])
            # Faust part
            #print("Part of our dataset: \n",self.train_text[16701:17073])

        letters = set(self.train_text)
        letters = sorted(letters)

        print("\nConvert these letters into tokens ", letters)
        self.vocab_size = len(letters)
        print(f"\nThe text contains {self.vocab_size} characters which we will use as vocabulary")


        self.letter_to_token = {k: v for v, k in enumerate(list(letters))}
        self.token_to_letter = {v: k for v, k in enumerate(list(letters))}

    def text_to_tokens(self, text):

        tokens = []
        for i in text:
            tokens.append(self.letter_to_token[i])
        return torch.LongTensor(tokens)


    def __getitem__(self, idx):
        # always return 256 tokens

        input = self.text_to_tokens(self.train_text[idx:idx+256])
        label = self.text_to_tokens(self.train_text[idx+1:idx+256+1])
        return input, label

    def __len__(self):
        return len(self.train_text) - 256 - 1

dataset = TextDataset(english=False)

Part of our dataset: 
 I. A SCANDAL IN BOHEMIA


I.

To Sherlock Holmes she is always _the_ woman. I have seldom heard him
mention her under any other name. In his eyes she eclipses and
predominates the whole of her sex. It was not that he felt any emotion
akin to love for Irene Adler. All emotions, and that one particularly,
were abhorrent to his cold, precise but admirably balanced mind. He
was, I take it, the most perfect reasoning and observing machine that
the world has seen, but as a lover he would h

Convert these letters into tokens  ['\n', '\r', ' ', '!', '#', '$', '%', '&', '(', ')', '*', ',', '-', '.', '/', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ':', ';', '?', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '[', ']', '_', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '£', '½', 'à', 'â', 'æ', 'è', 'é',

In [8]:
# Initialize your model and move it to the chosen device
gpt = VisionAndLanguageTransformer(
    context_size=256,   # Sequence length: how many characters the model sees at once
                        # Shorter sequences = less memory usage, faster training
    n_classes=dataset.vocab_size,  # Number of possible output tokens (characters)
    embed_dim=384,      # Embedding dimension: size of hidden representation for each token
                        # Smaller = fits on Mac RAM; larger = more expressive but slower
    is_ViT=False,       # Whether to use Vision Transformer-style patching (not needed for text-only)
    depth=3             # Number of transformer blocks/layers
).to(device)


dataloader = DataLoader(dataset, batch_size=128, # Changed batch size from 256 to 128
                        shuffle=True, num_workers=0)  # use 0 on Mac

In [9]:
def generate_text(text, model, dataset):
    device = next(model.parameters()).device  # get the device of the model
    model.eval()
    
    def token_to_text(tokens):
        if tokens.ndim > 1:
            tokens = tokens[0]  # take first batch
        return [dataset.token_to_letter[int(t)] for t in tokens]
    
    with torch.no_grad():
        # move input to same device as model
        input_tensor = dataset.text_to_tokens(text).unsqueeze(0).to(device)
        predicted_tokens = model.generate(input_tensor)
        predicted_text = token_to_text(predicted_tokens)
    
    return "".join(predicted_text)

In [None]:
class Trainer():
    def __init__(self, model, dataloader, device):

        self.device = device

        # Compile the model (PyTorch 2.0)
        compiled_model = torch.compile(model.to(device))
        compiled_model.train()

        optim = torch.optim.AdamW(compiled_model.parameters(), lr=5e-4)
        num_epochs = 1

        # Use GradScaler only for CUDA
        use_amp = device.startswith("cuda")
        scaler = torch.cuda.amp.GradScaler() if use_amp else None

        for epoch in range(num_epochs):
            pbar = tqdm(dataloader)
            for idx, (x, y) in enumerate(pbar):

                x = x.to(device)
                y = y.to(device)

                sliding_loss = 0

                # Mixed precision if CUDA, else normal float
                autocast_context = torch.autocast(device_type='cuda', dtype=torch.float16) if use_amp else torch.autocast(device_type=device)
                with autocast_context:
                    pred, loss = compiled_model(x, y)

                if use_amp:
                    scaler.scale(loss).backward()
                    scaler.step(optim)
                    scaler.update()
                else:
                    loss.backward()
                    optim.step()

                compiled_model.zero_grad(set_to_none=True)

                sliding_loss += loss.detach().item()
                if idx % 50 == 0:
                    sliding_loss *= 100
                    pbar.display(f'\n\n\nepoch: {epoch+1}, step: {idx}, loss {sliding_loss:.2f}\n'
                                 + "Generated text: " + generate_text("To Sherlock Holmes", compiled_model, dataloader.dataset))
                    sliding_loss = 0

        # Save model
        torch.save({
            'epoch': epoch,
            'model_state_dict': compiled_model.state_dict(),
            'optimizer_state_dict': optim.state_dict(),
            'loss': loss,
        }, "my_gpt.pt")


# Usage:
Trainer(gpt, dataloader, device)

  0%|          | 0/4637 [00:00<?, ?it/s]W0201 17:07:58.231000 53303 torch/_inductor/utils.py:1679] [0/0] Not enough SMs to use max_autotune_gemm mode



epoch: 1, step: 0, loss 474.83
  1%|          | 45/4637 [00:32<46:16,  1.65it/s]                                                                                                                                                                                                                                                                 

In [None]:
loaded_model = VisionAndLanguageTransformer(context_size=256,
                                     n_classes=dataset.vocab_size,
                                     embed_dim = 384,
                                    is_ViT=False,
                                    depth=4).to(device)

lm = torch.compile(loaded_model)
checkpoint = torch.load("my_gpt.pt")
lm.load_state_dict(checkpoint['model_state_dict'])

lm.eval()
text = "To Sherlock Holmes"
pred_text = generate_text(text, lm, dataset)
print(pred_text)

FAUST. nsn sl  d,iecrteh ucnie eh sersieidegcna ehns t
air inn emeheht l
ann,
sicsl za Gerrr ues erehen ahrhrnsan, ri di s tah is is ars icheneach t g.
Schcha hakun leml sit d da ml
eh  Stehnetde h
anisat d Wehenn anehid
Und,
Dirabeicndemdeheh ain 
