In [1]:
## First, check to see if lightning is installed, if not, install it.
import pip
try:
  __import__("lightning")
except ImportError:
  pip.main(['install', "lightning"])  

import torch ## torch let's us create tensors and also provides helper functions
import torch.nn as nn ## torch.nn gives us nn.Module(), nn.Embedding() and nn.Linear()
import torch.nn.functional as F # This gives us the softmax() and argmax()
from torch.optim import Adam ## We will use the Adam optimizer, which is, essentially, 
                             ## a slightly less stochastic version of stochastic gradient descent.
from torch.utils.data import TensorDataset, DataLoader ## We'll store our data in DataLoaders

import lightning as L ## Lightning makes it easier to write, optimize and scale our code

In [2]:
class PositionEncoding(nn.Module):
    
    def __init__(self, d_model=2, max_len=6):
        
        super().__init__()
        
        pe = torch.zeros(max_len, d_model)
        
        position = torch.arange(start=0, end=max_len, step=1).float().unsqueeze(1)
        embedding_index = torch.arange(start=0, end=d_model, step=2).float()
        
        div_term = 1/torch.tensor(10000.0)**(embedding_index / d_model)
        

        pe[:, 0::2] = torch.sin(position * div_term) 
        pe[:, 1::2] = torch.cos(position * div_term) 
        
        self.register_buffer('pe', pe) 

        
    def forward(self, word_embeddings):
        
        return word_embeddings + self.pe[:word_embeddings.size(0), :] 

In [3]:
class Attention(nn.Module): 
    
    def __init__(self, d_model=2):
        
        super().__init__()
        
        self.W_q = nn.Linear(in_features=d_model, out_features=d_model, bias=False)
        self.W_k = nn.Linear(in_features=d_model, out_features=d_model, bias=False)
        self.W_v = nn.Linear(in_features=d_model, out_features=d_model, bias=False)
        
        self.row_dim = 0
        self.col_dim = 1

        
    def forward(self, encodings_for_q, encodings_for_k, encodings_for_v, mask=None):

        q = self.W_q(encodings_for_q)
        k = self.W_k(encodings_for_k)
        v = self.W_v(encodings_for_v)

        sims = torch.matmul(q, k.transpose(dim0=self.row_dim, dim1=self.col_dim))

        scaled_sims = sims / torch.tensor(k.size(self.col_dim)**0.5)

        if mask is not None:
            mask = mask.to(scaled_sims.device)  # Move mask to the same device as scaled_sims
            scaled_sims = scaled_sims.masked_fill(mask=mask, value=-1e9)
        
        attention_percents = F.softmax(scaled_sims, dim=self.col_dim)
        attention_scores = torch.matmul(attention_percents, v)
        
        return attention_scores

In [4]:
class DecoderOnlyTransformer(L.LightningModule):
    
    def __init__(self, num_tokens=4, d_model=2, max_len=6):
        
        super().__init__()
        
        L.seed_everything(seed=42)
        
        self.we = nn.Embedding(num_embeddings=num_tokens, 
                               embedding_dim=d_model)     
        self.pe = PositionEncoding(d_model=d_model, 
                                   max_len=max_len)
        self.self_attention = Attention(d_model=d_model)
        self.fc_layer = nn.Linear(in_features=d_model, out_features=num_tokens)
        
        self.loss = nn.CrossEntropyLoss()
        
        
    def forward(self, token_ids):
                
        word_embeddings = self.we(token_ids)        
        position_encoded = self.pe(word_embeddings)
        
        mask = torch.tril(torch.ones((token_ids.size(dim=0), token_ids.size(dim=0))))
        mask = mask == 0
        
        self_attention_values = self.self_attention(position_encoded, 
                                                    position_encoded, 
                                                    position_encoded, 
                                                    mask=mask)
                
        residual_connection_values = position_encoded + self_attention_values        
        fc_layer_output = self.fc_layer(residual_connection_values)
        
        return fc_layer_output
    
  
    def configure_optimizers(self): 
        return Adam(self.parameters(), lr=0.1)
    
    
    def training_step(self, batch, batch_idx): 
        input_tokens, labels = batch # collect input
        output = self.forward(input_tokens[0])
        loss = self.loss(output, labels[0])
                    
        return loss

In [5]:
## first, we create a dictionary that maps vocabulary tokens to id numbers...
token_to_id = {'what' : 0,
               'is' : 1,
               'apple' : 2,
               'healthy': 3,
               '<EOS>' : 4, ## <EOS> = end of sequence
              }
id_to_token = dict(map(reversed, token_to_id.items()))
inputs = torch.tensor([[token_to_id["what"], ## input #1: what is statquest <EOS> awesome
                        token_to_id["is"], 
                        token_to_id["apple"], 
                        token_to_id["<EOS>"],
                        token_to_id["healthy"]], 
                       
                       [token_to_id["apple"], # input #2: statquest is what <EOS> awesome
                        token_to_id["is"], 
                        token_to_id["what"], 
                        token_to_id["<EOS>"], 
                        token_to_id["healthy"]]])

## NOTE: Because we are using a Decoder-Only Transformer the outputs, or
##       the predictions, are the input questions (minus the first word) followed by 
##       <EOS> awesome <EOS>.  The first <EOS> means we're done processing the input question
##       and the second <EOS> means we are done generating the output.
##       See the illustration above for more details.
labels = torch.tensor([[token_to_id["is"], 
                        token_to_id["apple"], 
                        token_to_id["<EOS>"], 
                        token_to_id["healthy"], 
                        token_to_id["<EOS>"]],  
                       
                       [token_to_id["is"], 
                        token_to_id["what"], 
                        token_to_id["<EOS>"], 
                        token_to_id["healthy"], 
                        token_to_id["<EOS>"]]])

## Now let's package everything up into a DataLoader...
dataset = TensorDataset(inputs, labels) 
dataloader = DataLoader(dataset)

In [6]:
model = DecoderOnlyTransformer(num_tokens=len(token_to_id), d_model=2, max_len=6)

## Now create the input for the transformer...
model_input = torch.tensor([token_to_id["what"], 
                            token_to_id["is"], 
                            token_to_id["apple"], 
                            token_to_id["<EOS>"]])
input_length = model_input.size(dim=0)

Seed set to 42


In [8]:
trainer = L.Trainer(max_epochs=30)
trainer.fit(model, train_dataloaders=dataloader)

You are using the plain ModelCheckpoint callback. Consider using LitModelCheckpoint which with seamless uploading to Model registry.
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
C:\Users\Ltx\anaconda3\envs\dl_gpu\Lib\site-packages\lightning\pytorch\trainer\connectors\logger_connector\logger_connector.py:76: Starting from v1.9.0, `tensorboardX` has been removed as a dependency of the `lightning.pytorch` package, due to potential conflicts with other packages in the ML ecosystem. For this reason, `logger=True` will use `CSVLogger` as the default logger, unless the `tensorboard` or `tensorboardX` packages are found. Please `pip install lightning[extra]` or one of them to enable TensorBoard support by default
You are using a CUDA device ('NVIDIA GeForce RTX 3050 Laptop GPU') that has Tensor Cores. To properly utilize them, you should set `torch.set_float32_matmul_precision('medium' | 'high')` which will trade-off precis

Training: |                                                                                      | 0/? [00:00<…

`Trainer.fit` stopped: `max_epochs=30` reached.


In [23]:
model_input = torch.tensor([token_to_id["apple"], 
                            token_to_id["what"], 
                            token_to_id["is"], 
                            token_to_id["<EOS>"]])
input_length = model_input.size(dim=0)

predictions = model(model_input) 
predicted_id = torch.tensor([torch.argmax(predictions[-1,:])])
predicted_ids = predicted_id
max_length = 6
for i in range(input_length, max_length):
    if (predicted_id == token_to_id["<EOS>"]): # if the prediction is <EOS>, then we are done
        break
    
    model_input = torch.cat((model_input, predicted_id))
    
    predictions = model(model_input) 
    predicted_id = torch.tensor([torch.argmax(predictions[-1,:])])
    predicted_ids = torch.cat((predicted_ids, predicted_id))
        
print("Predicted Tokens:\n") 
for id in predicted_ids: 
    print("\t", id_to_token[id.item()])

Predicted Tokens:

	 healthy
	 <EOS>


In [25]:
!pip install transformers

Collecting transformers
  Downloading transformers-4.50.0-py3-none-any.whl.metadata (39 kB)
Collecting regex!=2019.12.17 (from transformers)
  Downloading regex-2024.11.6-cp312-cp312-win_amd64.whl.metadata (41 kB)
Collecting tokenizers<0.22,>=0.21 (from transformers)
  Downloading tokenizers-0.21.1-cp39-abi3-win_amd64.whl.metadata (6.9 kB)
Collecting safetensors>=0.4.3 (from transformers)
  Downloading safetensors-0.5.3-cp38-abi3-win_amd64.whl.metadata (3.9 kB)
Downloading transformers-4.50.0-py3-none-any.whl (10.2 MB)
   ---------------------------------------- 0.0/10.2 MB ? eta -:--:--
   ----------------------- ---------------- 6.0/10.2 MB 33.5 MB/s eta 0:00:01
   ---------------------------------------- 10.2/10.2 MB 35.2 MB/s eta 0:00:00
Downloading regex-2024.11.6-cp312-cp312-win_amd64.whl (273 kB)
Downloading safetensors-0.5.3-cp38-abi3-win_amd64.whl (308 kB)
Downloading tokenizers-0.21.1-cp39-abi3-win_amd64.whl (2.4 MB)
   ---------------------------------------- 0.0/2.4 MB ? et



In [9]:
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer

# Load a tokenizer (example: BERT tokenizer)
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

# Read large text data
with open("data.txt", "r", encoding="utf-8") as f:
    lines = f.readlines()


In [10]:
# Tokenize and convert to tensor
tokenized_inputs = tokenizer(lines, padding="max_length", truncation=True, return_tensors="pt")
    

In [11]:
class TextDataset(Dataset):
    def __init__(self, tokenized_data):
        self.data = tokenized_data["input_ids"]  # Tokenized inputs
        self.labels = self.data.clone()  # Usually, labels are the same for next-word prediction

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]

# Create dataset and dataloader
dataset = TextDataset(tokenized_inputs)
dataloader = DataLoader(dataset, batch_size=8, shuffle=True)


In [12]:
print(dataloader)

<torch.utils.data.dataloader.DataLoader object at 0x0000022953239CA0>


In [16]:
model = DecoderOnlyTransformer(num_tokens=len(token_to_id), d_model=2, max_len=6)


Seed set to 42


RuntimeError: CUDA error: device-side assert triggered
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.


In [14]:
trainer = L.Trainer(max_epochs=30, accelerator="cpu")
trainer.fit(model, train_dataloaders=dataloader)

You are using the plain ModelCheckpoint callback. Consider using LitModelCheckpoint which with seamless uploading to Model registry.
GPU available: True (cuda), used: False
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
C:\Users\Ltx\anaconda3\envs\dl_gpu\Lib\site-packages\lightning\pytorch\trainer\setup.py:177: GPU available but not used. You can set it by doing `Trainer(accelerator='gpu')`.


RuntimeError: CUDA error: device-side assert triggered
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.


In [15]:
print(f"Input shape: {inputs.shape}")  # Expected: (batch_size, seq_len)
print(f"Labels shape: {labels.shape}")  # Expected: (batch_size, seq_len)


Input shape: torch.Size([2, 5])
Labels shape: torch.Size([2, 5])
