In [1]:
import torch 
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
from torch.utils.data import TensorDataset, DataLoader

import lightning as L

import pandas as pd

import logging
# Set the logging level to ERROR to suppress INFO and WARNING messages
logging.getLogger("pytorch_lightning").setLevel(logging.ERROR)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

df = pd.read_csv('llm_input.csv', usecols=["instruction", "response"])
data = list(tuple(zip(df["instruction"], df["response"])))


def build_vocab(data):
    vocab = {"<PAD>": 0, "<UNK>": 1, "<SOS>": 2, "<EOS>": 3}
    for question, answer in data:
        for word in question.split() + answer.split():
            if word not in vocab:
                vocab[word] = len(vocab)
    id_to_token = {v: k for k, v in vocab.items()} # Reverse token and id
    return vocab, id_to_token


vocab, id_to_token = build_vocab(data)


def encode_input(question, answer, vocab, max_len):
    """Encodes both question and answer into a single list of token IDs, 
       separated by a [EOS] token, padding to max_len.
    """
    
    question_encoded = [vocab.get(word, vocab["<UNK>"]) for word in question.split()]
    answer_encoded = [vocab.get(word, vocab["<UNK>"]) for word in answer.split()]
    
    # Combine question and answer with [EOS] token
    encoded = [vocab["<SOS>"]] + question_encoded + [vocab["<EOS>"]] + answer_encoded + [vocab["<EOS>"]]
    
    # Padding
    padding_lenth = max_len - len(encoded)
    if padding_lenth > 0:
        encoded += [vocab["<PAD>"]] * padding_lenth
    
    # Truncate if longer than max length AND remove the last token 
    # to prepare for teacher forcing
    return encoded[:max_len]


def encode_label(question, answer, vocab, max_len):
    """Encodes both question and answer into a single list of token IDs, 
       separated by a [EOS] token, padding to max_len.
    """
    
    question_encoded = [vocab.get(word, vocab["<UNK>"]) for word in question.split()]
    answer_encoded = [vocab.get(word, vocab["<UNK>"]) for word in answer.split()]
    
    # Combine question and answer with [EOS] token
    # Labels remove the first character of the sequences (pos 1) and keep the <SOS> character
    encoded = [vocab["<SOS>"]] + question_encoded[1:] + [vocab["<EOS>"]] + answer_encoded + [vocab["<EOS>"]]
    
    # Padding
    padding_lenth = max_len - len(encoded)
    if padding_lenth > 0:
        encoded += [vocab["<PAD>"]] * padding_lenth
    
    # Truncate if longer than max length AND remove the last token 
    # to prepare for teacher forcing
    return encoded[:max_len]


inputs = torch.tensor([encode_input(question, answer, vocab, 100) for question, answer in data])
labels = torch.tensor([encode_label(question, answer, vocab, 100) for question, answer in data]) 

dataset = TensorDataset(inputs, labels)
dataloader = DataLoader(dataset)



class PositionEncoding(nn.Module):
    
    def __init__(self, d_model=10, max_len=100):
        
        super().__init__()
        
        pe = torch.zeros(max_len, d_model)
        
        position = torch.arange(start=0, end=max_len, step=1).float().unsqueeze(1)
        
        embedding_index = torch.arange(start=0, end=d_model, step=2).float()
        
        div_term = 1/torch.tensor(10000.0)**(embedding_index/d_model)
        
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        self.register_buffer('pe', pe)
        

    def forward(self, word_embeddings):
        
        return word_embeddings + self.pe[:word_embeddings.size(0)]
    


class Attention(nn.Module):
    
    def __init__(self, d_model=2):
        
        super().__init__()

        self.d_model = d_model
        
        self.W_q = nn.Linear(in_features=d_model, out_features=d_model, bias=False)
        self.W_k = nn.Linear(in_features=d_model, out_features=d_model, bias=False)
        self.W_v = nn.Linear(in_features=d_model, out_features=d_model, bias=False)
        
        self.row_dim = 0
        self.col_dim = 1
        
    def forward(self, encodings_for_q, encodings_for_k, encodings_for_v, mask=None):
        
        q = self.W_q(encodings_for_q)
        k = self.W_k(encodings_for_k)
        v = self.W_v(encodings_for_v)
        
        sims = torch.matmul(q, k.transpose(dim0=self.row_dim, dim1=self.col_dim))

        scaled_sims = sims / torch.sqrt(torch.tensor(self.d_model))

        if mask is not None:
            
            scaled_sims = scaled_sims.masked_fill(mask=mask, value=-1e9)
            
        attention_percents = F.softmax(scaled_sims, dim=self.col_dim)
        
        attention_scores = torch.matmul(attention_percents, v)
        
        return attention_scores
    


class DecoderOnlyTransformer(L.LightningModule):
    
    def __init__(self, num_tokens=len(vocab), d_model=10, max_len=100):
        
        super().__init__()
        
        L.seed_everything(seed=42)
        
        self.we = nn.Embedding(num_embeddings=num_tokens,
                               embedding_dim=d_model)
        
        self.pe = PositionEncoding(d_model=d_model, 
                                   max_len=max_len)
        
        self.self_attention = Attention(d_model=d_model)
        
        self.fc_layer = nn.Linear(in_features=d_model, out_features=num_tokens)
        
        self.loss = nn.CrossEntropyLoss()

    
    def forward(self, token_ids):
        
        word_embeddings = self.we(token_ids)
        
        position_encodings = self.pe(word_embeddings)
        
        mask = torch.tril(torch.ones((token_ids.size(dim=0), token_ids.size(dim=0)), device=self.device))
        
        mask = mask == 0
        
        self_attention_values = self.self_attention(encodings_for_q=position_encodings,
                                                    encodings_for_k=position_encodings,
                                                    encodings_for_v=position_encodings,
                                                    mask=mask)
        
        residual_connection_values = position_encodings + self_attention_values
        
        fc_layer_output = self.fc_layer(residual_connection_values)
        
        return fc_layer_output


    def configure_optimizers(self):
        
        return Adam(self.parameters(), lr=1e-3)

    
    def training_step(self, batch, batch_idx):
        input_tokens, labels = batch
        input_tokens, labels = input_tokens.to(self.device), labels.to(self.device)
        output = self.forward(input_tokens[0])
        loss = self.loss(output, labels[0])
        return loss

Train And Save

In [2]:
model = DecoderOnlyTransformer(num_tokens=len(vocab), d_model=10, max_len=100)
model = model.to(device)

trainer = L.Trainer(max_epochs=50)
trainer.fit(model, train_dataloaders=dataloader)

Seed set to 42
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
You are using a CUDA device ('NVIDIA GeForce RTX 3050 Laptop GPU') that has Tensor Cores. To properly utilize them, you should set `torch.set_float32_matmul_precision('medium' | 'high')` which will trade-off precision for performance. For more details, read https://pytorch.org/docs/stable/generated/torch.set_float32_matmul_precision.html#torch.set_float32_matmul_precision





LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name           | Type             | Params | Mode 
------------------------------------------------------------
0 | we             | Embedding        | 155 K  | train
1 | pe             | PositionEncoding | 0      | train
2 | self_attention | Attention        | 300    | train
3 | fc_layer       | Linear           | 171 K  | train
4 | loss           | CrossEntropyLoss | 0      | train
------------------------------------------------------------
327 K     Trainable params
0         Non-trainable params
327 K     Total params
1.309     Total estimated model params size (MB)
e:\ROOTDIR\python\Lib\site-packages\lightning\pytorch\trainer\connectors\data_connector.py:424: The 'train_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=11` in the `DataLoader` to improve performance.


Training: |          | 0/? [00:00<?, ?it/s]

`Trainer.fit` stopped: `max_epochs=50` reached.


In [14]:
torch.save(model.state_dict(), 'llm.pth')

USE

In [2]:
model = DecoderOnlyTransformer()

# Load the state dictionary into the model
model_path = 'llm.pth'
model.load_state_dict(torch.load(model_path, weights_only=False))

# Set the model to evaluation mode
model.eval()

def text_to_tensor(input_text):
    tokens = input_text.split()
    token_ids = []
    for token in tokens:
        if token in vocab:
            token_ids.append(vocab[token])
        else:
            print(f"Warning: Token '{token}' not found in dictionary.")
    return torch.tensor(token_ids)


input_text = input("Prompt: ")
model_input = text_to_tensor(input_text)

input_length = model_input.size(dim=0)

predictions = model(model_input)
predicted_id = torch.tensor([torch.argmax(predictions[-1, :])])
predicted_ids = predicted_id

max_length = 100

for i in range(input_length, max_length):
    if (predicted_id == vocab['<EOS>']):
        break
    model_input = torch.cat((model_input, predicted_id))
    predictions = model(model_input)
    predicted_id = torch.tensor([torch.argmax(predictions[-1, :])])
    predicted_ids = torch.cat((predicted_ids, predicted_id))


print("Answer:", end=" ")    
for id in predicted_ids:
    print(id_to_token[id.item()], end=" ")

Seed set to 42


Answer: status of your account on the current status of your refund. To provide accurate information, could you please provide accurate information, I will help you with the necessary information, I will help me with the status of your compensation. Your satisfaction is our priority, and we will help me to assist you. Thank you for you. <EOS> 