In [28]:
import torch
from torch.utils.data import Dataset as TorchDataset, DataLoader
from torch.optim import AdamW
from torch.optim.lr_scheduler import CosineAnnealingLR
from datasets import load_dataset, Dataset
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training
import tiktoken
import random
import os

In [29]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
from dataclasses import dataclass
import numpy as np
from tqdm.auto import tqdm
from contextlib import nullcontext
import os

class LayerNorm(nn.Module):
    def __init__(self, ndim, bias):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(ndim))
        self.bias = nn.Parameter(torch.zeros(ndim)) if bias else None
    def forward(self, x):
        return F.layer_norm(x, self.weight.shape, self.weight, self.bias, 1e-5)

class CausalSelfAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        assert config.n_embd % config.n_head == 0
        self.c_attn = nn.Linear(config.n_embd, 3 * config.n_embd, bias=config.bias)
        self.c_proj = nn.Linear(config.n_embd, config.n_embd, bias=config.bias)
        self.attn_dropout = nn.Dropout(config.dropout)
        self.resid_dropout = nn.Dropout(config.dropout)
        self.n_head = config.n_head
        self.n_embd = config.n_embd
        self.flash = hasattr(F, 'scaled_dot_product_attention')
        if not self.flash:
            self.register_buffer("bias", torch.tril(torch.ones(config.block_size, config.block_size))
                                       .view(1, 1, config.block_size, config.block_size))

    def forward(self, x):
        B, T, C = x.size()
        q, k, v = self.c_attn(x).split(self.n_embd, dim=2)
        k = k.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
        q = q.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
        v = v.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)

        if self.flash:
            y = F.scaled_dot_product_attention(q, k, v, attn_mask=None, dropout_p=self.attn_dropout.p if self.training else 0.0, is_causal=True)
        else:
            att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1)))
            att = att.masked_fill(self.bias[:, :, :T, :T] == 0, float('-inf'))
            att = F.softmax(att, dim=-1)
            att = self.attn_dropout(att)
            y = att @ v

        y = y.transpose(1, 2).contiguous().view(B, T, C)
        y = self.resid_dropout(self.c_proj(y))
        return y

class MLP(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.c_fc = nn.Linear(config.n_embd, 4 * config.n_embd, bias=config.bias)
        self.gelu = nn.GELU()
        self.c_proj = nn.Linear(4 * config.n_embd, config.n_embd, bias=config.bias)
        self.dropout = nn.Dropout(config.dropout)
    def forward(self, x):
        return self.dropout(self.c_proj(self.gelu(self.c_fc(x))))

class Block(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.ln1 = LayerNorm(config.n_embd, config.bias)
        self.attn = CausalSelfAttention(config)
        self.ln2 = LayerNorm(config.n_embd, config.bias)
        self.mlp = MLP(config)
    def forward(self, x):
        x = x + self.attn(self.ln1(x))
        x = x + self.mlp(self.ln2(x))
        return x

@dataclass
class GPTConfig:
    block_size: int
    vocab_size: int
    n_layer: int
    n_head: int
    n_embd: int
    dropout: float = 0.0
    bias: bool = True

    # Add a get method to mimic dictionary-like access for peft
    def get(self, key, default=None):
        return getattr(self, key, default)

    # Add __contains__ and __getitem__ for dictionary-like behavior for PEFT
    def __contains__(self, key):
        return hasattr(self, key)

    def __getitem__(self, key):
        if hasattr(self, key):
            return getattr(self, key)
        raise KeyError(f"'{key}' not found in GPTConfig")

class GPT(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.config = config
        self.transformer = nn.ModuleDict(dict(
            wte=nn.Embedding(config.vocab_size, config.n_embd),
            wpe=nn.Embedding(config.block_size, config.n_embd),
            drop=nn.Dropout(config.dropout),
            h=nn.ModuleList([Block(config) for _ in range(config.n_layer)]),
            ln_f=LayerNorm(config.n_embd, config.bias),
        ))
        self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)
        self.transformer.wte.weight = self.lm_head.weight  # weight tying

        self.apply(self._init_weights)
        for pn, p in self.named_parameters():
            if pn.endswith('c_proj.weight'):
                nn.init.normal_(p, mean=0.0, std=0.02 / math.sqrt(2 * config.n_layer))

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None):
        device = idx.device
        b, t = idx.size()
        assert t <= self.config.block_size
        pos = torch.arange(0, t, dtype=torch.long, device=device)

        tok_emb = self.transformer.wte(idx)
        pos_emb = self.transformer.wpe(pos)
        x = self.transformer.drop(tok_emb + pos_emb)
        for block in self.transformer.h:
            x = block(x)
        x = self.transformer.ln_f(x)

        if targets is not None:
            logits = self.lm_head(x)
            loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1), ignore_index=-1)
            return logits, loss
        else:
            logits = self.lm_head(x[:, [-1], :])
            return logits, None

    @torch.no_grad()
    def generate(self, idx, max_new_tokens, temperature=1.0, top_k=None):
        """
        Generate tokens given a conditioning sequence.
        idx: Tensor of shape (B, T)
        """
        for _ in range(max_new_tokens):
            idx_cond = idx if idx.size(1) <= self.config.block_size else idx[:, -self.config.block_size:]
            logits, _ = self(idx_cond)
            logits = logits[:, -1, :] / temperature
            if top_k is not None:
                v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
                logits[logits < v[:, [-1]]] = -float('Inf')
            probs = F.softmax(logits, dim=-1)
            idx_next = torch.multinomial(probs, num_samples=1)
            idx = torch.cat((idx, idx_next), dim=1)
        return idx

# Creating Instruction Dataset

In [30]:
from datasets import load_dataset, Dataset
import random

In [31]:
# 1. TinyStories instructions (90% of data)
ds_tiny = load_dataset("roneneldan/TinyStories", split="train")

In [32]:
templates = [
    "Tell a short children's story about {topic}.",
    "Write a bedtime story featuring {topic}.",
    "Create a simple tale where {topic}.",
    "Make up a happy story about {topic}.",
    "Write a 200-word mini-story about {topic}",
    "Write a soothing, lyrical bedtime story about a {topic} preparing for sleep."

]

In [33]:
def make_tiny_instruction(example):
    story = example["text"].strip()
    topic = " ".join(story.split()[:12]).rsplit(".", 1)[0] + "."
    instruction = random.choice(templates).format(topic=topic)
    return {"instruction": instruction, "output": story}

tiny_pairs = ds_tiny.shuffle(seed=42).select(range(12000)).map(make_tiny_instruction, num_proc=4)

In [34]:
# 2. Safe Alpaca stories (10% of data)
alpaca = load_dataset("yahma/alpaca-cleaned", split="train")

story_keywords = ["story", "tale", "fable", "bedtime story", "short story", "children's story", "fairytale"]
def is_child_story(example):
    inst = example["instruction"].lower()
    output = example["output"]
    return (any(k in inst for k in story_keywords) and
            len(output.split()) < 400 and
            all(bad not in output.lower() for bad in ["sex", "kill", "die", "blood", "erotic", "adult", "murder", "horror"]))

alpaca_stories = alpaca.filter(is_child_story, num_proc=4).shuffle(seed=42).select(range(1300))  # ~10%

In [35]:
final_list = list(tiny_pairs) + list(alpaca_stories)
final_dataset = Dataset.from_list(final_list).shuffle(seed=42)

print(f"Final instruction-tuning dataset ready: {len(final_dataset)} examples")
print("Example:", final_dataset[5]["instruction"])

Final instruction-tuning dataset ready: 13300 examples
Example: Create a simple tale where Timmy was outside playing all by himself..


In [36]:
# ========================== 3. DATASET CLASS ==========================
enc = tiktoken.get_encoding("gpt2")
block_size = 128

class StoryDataset(TorchDataset):
    def __init__(self, data):
        self.data = data
    def __len__(self):
        return len(self.data)
    def __getitem__(self, i):
        item = self.data[i]
        text = f"Instruction: {item['instruction']}\n\nResponse: {item['output']}<|endoftext|>"
        tokens = enc.encode(text, allowed_special={"<|endoftext|>"})
        tokens = tokens[:block_size+1]
        if len(tokens) < block_size+1:
            tokens += [50256] * (block_size+1 - len(tokens))
        x = torch.tensor(tokens[:-1], dtype=torch.long)
        y = torch.tensor(tokens[1:], dtype=torch.long)
        return x, y

train_ds = StoryDataset(final_dataset)
train_loader = DataLoader(train_ds, batch_size=32, shuffle=True)

In [37]:
# ========================== 4. LOAD MODEL + LoRA ==========================
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
config = GPTConfig(
    vocab_size=50257,
    block_size=128,
    n_layer=6,
    n_head=6,
    n_embd=384,
    dropout=0.1,
    bias=True
)
model = GPT(config).to(device)

# Load your pretrained weights
model.load_state_dict(torch.load("/content/drive/MyDrive/SLM Model/best_model_params.pt", map_location=device))

# Apply LoRA
lora_config = LoraConfig(
    r=16, lora_alpha=32,
    target_modules=["c_attn", "c_proj"],
    lora_dropout=0.05, bias="none"
)
model = get_peft_model(model, lora_config)
model.print_trainable_parameters()

trainable params: 405,504 || all params: 30,400,896 || trainable%: 1.3339


In [38]:
optimizer = AdamW(model.parameters(), lr=1e-4)
scheduler = CosineAnnealingLR(optimizer, T_max=3*len(train_loader))
epochs = 3

for epoch in range(epochs):
    model.train()
    total_loss = 0
    for step, (x, y) in enumerate(train_loader):
        x, y = x.to(device), y.to(device)
        logits, loss = model(x, y)
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        scheduler.step()
        total_loss += loss.item()
        if step % 100 == 0:
            print(f"Epoch {epoch+1} | Step {step} | Loss {loss.item():.4f}")
    print(f"Epoch {epoch+1} finished — Avg loss: {total_loss/len(train_loader):.4f}")

Epoch 1 | Step 0 | Loss 3.4053
Epoch 1 | Step 100 | Loss 2.4948
Epoch 1 | Step 200 | Loss 2.1010
Epoch 1 | Step 300 | Loss 1.7999
Epoch 1 | Step 400 | Loss 2.0889
Epoch 1 finished — Avg loss: 2.2725
Epoch 2 | Step 0 | Loss 2.1801
Epoch 2 | Step 100 | Loss 2.3217
Epoch 2 | Step 200 | Loss 1.8854
Epoch 2 | Step 300 | Loss 2.1302
Epoch 2 | Step 400 | Loss 2.1854
Epoch 2 finished — Avg loss: 2.0016
Epoch 3 | Step 0 | Loss 2.1298
Epoch 3 | Step 100 | Loss 1.8315
Epoch 3 | Step 200 | Loss 1.9146
Epoch 3 | Step 300 | Loss 2.0930
Epoch 3 | Step 400 | Loss 1.6913
Epoch 3 finished — Avg loss: 1.9733


In [39]:
# ========================== 6. SAVE ==========================
save_dir = "/content/drive/MyDrive/SLM Model/story_slm_instruct_tuned"
model.save_pretrained(save_dir)
print(f"Instruction-tuned model saved to {save_dir}")
print("Done! Now update model_inference.py to load this folder with PeftModel.")

Instruction-tuned model saved to /content/drive/MyDrive/SLM Model/story_slm_instruct_tuned
Done! Now update model_inference.py to load this folder with PeftModel.
