In [None]:
import os

# Hub identifier of the checkpoint used throughout the notebook.
model_name = "HuggingFaceTB/SmolLM2-135M-Instruct"

# Hugging Face access token. It is read from the HF_TOKEN environment variable
# so that no credential is ever typed into (or saved with) the notebook.
# When the variable is unset or empty the value is None rather than "".
token = os.environ.get("HF_TOKEN") or None


# Saving tokenizer files for hugging face app
import shutil
import os
from transformers import AutoTokenizer, AutoModelForCausalLM

# Folder that receives the tokenizer and model files
save_dir = "/kaggle/working/smollm_model"
os.makedirs(save_dir, exist_ok=True)  # no error if the folder is already there

# Fetch tokenizer and model for `model_name`
tokenizer_saved = AutoTokenizer.from_pretrained(model_name, use_auth_token=token)
model_saved = AutoModelForCausalLM.from_pretrained(model_name, use_auth_token=token)

# Write both artifacts (tokenizer first, then model) into the same folder
for artifact in (tokenizer_saved, model_saved):
    artifact.save_pretrained(save_dir)

print(f">>> Model and tokenizer saved to {save_dir}")

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from dataclasses import dataclass
import math
import tiktoken
import time
from transformers import AutoTokenizer, AutoModelForCausalLM
import pytorch_lightning as pl
from pytorch_lightning import Trainer
from torch.optim import AdamW
from torch.optim.lr_scheduler import LambdaLR


# ============================================================================
# TECHNIQUE: Set matmul precision for performance
# ============================================================================
# Process-wide setting, applied once at import time of this cell.
# NOTE(review): per the PyTorch docs 'high' lets float32 matmuls use faster,
# lower-precision internal formats where the hardware supports them - confirm
# the accuracy trade-off is acceptable for this training run.
torch.set_float32_matmul_precision('high')

# ============================================================================
# Configuration Class for SmolLM2 Model
# ============================================================================
@dataclass
class SMOLConfig:
    """Hyper-parameters consumed by the SMOLL model and its sub-modules.

    Raises:
        ValueError: if ``n_head`` is not positive, if ``n_embed`` is not an
            exact multiple of ``n_head``, or if the resulting per-head
            dimension is not a positive power of two.
    """
    block_size: int = 1024          # rows of the position embedding / max sequence length accepted by SMOLL.forward
    vocab_size: int = 49152         # rows of the token embedding and width of the output logits
    n_layer: int = 30               # number of Block modules stacked in the model
    n_head: int = 9                 # attention heads per block
    n_embed: int = 576              # model (embedding) width
    rope_theta: float = 10000.0     # base passed to RotaryEmbedding
    rms_norm_eps: float = 1.0e-5    # eps passed to the normalisation layers
    intermediate_size: int = 1536   # hidden width of the MLP (falsy -> 4 * n_embed)

    def __post_init__(self):
        # The attention module reshapes n_embed into (n_head, head_dim); a
        # non-exact division would only surface later as a view() error.
        if self.n_head <= 0 or self.n_embed % self.n_head != 0:
            raise ValueError("n_embed must be a positive multiple of n_head!")

        # Ensure head dimension is a power of 2
        head_dim = self.n_embed // self.n_head
        if head_dim <= 0 or (head_dim & (head_dim - 1)) != 0:
            raise ValueError("Head dimension must be a power of 2!")


# ============================================================================
# RMSNorm: Root Mean Square Normalization (Better than LayerNorm)
# ============================================================================
class RMSNorm(nn.Module):
    """Root-mean-square normalisation over the last dimension.

    Each vector is divided by ``sqrt(mean(x**2) + eps)`` and then multiplied
    element-wise by the learnable ``scale`` (initialised to ones).

    Args:
        dim: size of the last dimension of the inputs.
        eps: constant added to the mean square before the square root.
    """

    def __init__(self, dim, eps=1e-8):
        super().__init__()
        self.eps = eps
        self.scale = nn.Parameter(torch.ones(dim))

    def forward(self, x):
        # Use the mean (not the sum) of squares: dividing by the plain L2 norm
        # would shrink the output by an extra factor of sqrt(dim).
        mean_square = x.pow(2).mean(dim=-1, keepdim=True)
        return x * torch.rsqrt(mean_square + self.eps) * self.scale


# ============================================================================
# RoPE (Rotary Positional Embedding) with RoPE-Theta
# ============================================================================
class RotaryEmbedding(nn.Module):
    """Rotary positional embedding (RoPE) for attention queries and keys.

    Args:
        dim: per-head feature size the rotation is applied to (must be even).
        base_theta: base of the geometric frequency progression.
    """

    def __init__(self, dim, base_theta=10000):
        super().__init__()
        self.dim = dim
        self.base_theta = base_theta

        # Precompute RoPE frequencies: one inverse frequency per rotated pair
        inv_freq = 1.0 / (base_theta ** (torch.arange(0, dim, 2).float() / dim))
        self.register_buffer("inv_freq", inv_freq)

    def forward(self, seq_len, device):
        """Return ``(cos, sin)`` tables of shape ``(1, 1, seq_len, dim / 2)``."""
        t = torch.arange(seq_len, device=device).float()
        freqs = torch.einsum("i,j->ij", t, self.inv_freq)  # angle = position * inv_freq
        cos = torch.cos(freqs).unsqueeze(0).unsqueeze(0)  # Shape: (1, 1, seq_len, dim/2)
        sin = torch.sin(freqs).unsqueeze(0).unsqueeze(0)  # Shape: (1, 1, seq_len, dim/2)
        return cos, sin

    def _rotate_half(self, x):
        """Rotate last dimension by half (for even dimensions only)."""
        x1, x2 = x[..., :x.shape[-1] // 2], x[..., x.shape[-1] // 2:]
        return torch.cat((-x2, x1), dim=-1)

    def apply_rotary(self, q, k):
        """Rotate ``q`` and ``k`` according to their position along axis 2.

        Args:
            q, k: tensors indexed as (..., seq_len, head_dim) with the sequence
                on axis 2 and ``head_dim == self.dim``.

        Returns:
            ``(q_rot, k_rot)`` with the same shapes as the inputs.
        """
        seq_len = q.size(2)
        cos, sin = self(seq_len, q.device)

        # _rotate_half pairs element i with element i + dim/2, so both members
        # of a pair must see the same angle: repeat the dim/2 table twice to
        # cover the whole head. Every feature is rotated, which keeps vector
        # norms intact and makes q.k depend only on the position offset.
        cos = torch.cat((cos, cos), dim=-1)
        sin = torch.cat((sin, sin), dim=-1)

        q_rot = (q * cos) + (self._rotate_half(q) * sin)
        k_rot = (k * cos) + (self._rotate_half(k) * sin)

        return q_rot, k_rot




# ============================================================================
# Causal Self-Attention Module (Flash Attention + RoPE + RMSNorm)
# ============================================================================
class CausalSelfAttention(nn.Module):
    """Multi-head causal self-attention with rotary position encoding.

    The input is normalised, projected to queries/keys/values with one fused
    linear layer, rotated with RoPE, attended with a causal mask and finally
    projected back to the model width.

    NOTE(review): Block already applies rms_1 to the tensor it passes in, so
    the input ends up normalised twice - confirm this is intended.
    """

    def __init__(self, config):
        super().__init__()
        embed_dim, num_heads = config.n_embed, config.n_head
        self.n_head = num_heads
        self.n_embed = embed_dim
        self.head_dim = embed_dim // num_heads

        # Fused Q/K/V projection followed by the output projection
        self.c_attn = nn.Linear(embed_dim, 3 * embed_dim)
        self.c_proj = nn.Linear(embed_dim, embed_dim)

        # Position encoding operates on one head at a time
        self.rope = RotaryEmbedding(self.head_dim, base_theta=config.rope_theta)

        # Normalisation applied to the input of this module
        self.norm = RMSNorm(embed_dim, eps=config.rms_norm_eps)

    def forward(self, x):
        """Map ``x`` of shape (B, T, C) to an output of the same shape."""
        batch, seq_len, width = x.size()

        def to_heads(t):
            # (B, T, C) -> (B, n_head, T, head_dim)
            return t.view(batch, seq_len, self.n_head, self.head_dim).transpose(1, 2)

        fused = self.c_attn(self.norm(x))
        q, k, v = (to_heads(part) for part in fused.split(self.n_embed, dim=2))

        # Inject positions into queries and keys only
        q, k = self.rope.apply_rotary(q, k)

        # is_causal=True masks out attention to later positions
        attended = F.scaled_dot_product_attention(q, k, v, attn_mask=None, dropout_p=0.0, is_causal=True)

        # (B, n_head, T, head_dim) -> (B, T, C), then mix the heads
        merged = attended.transpose(1, 2).contiguous().view(batch, seq_len, width)
        return self.c_proj(merged)


# ============================================================================
# Multi-Layer Perceptron (MLP) Block with RMSNorm and Configurable Size
# ============================================================================
class MLP(nn.Module):
    """Position-wise feed-forward network: Linear -> GELU(tanh) -> Linear -> RMSNorm."""

    def __init__(self, config):
        super().__init__()
        width = config.n_embed
        # A falsy intermediate_size (None or 0) selects the 4x-width default
        hidden_size = config.intermediate_size or 4 * width

        self.c_fc = nn.Linear(width, hidden_size)        # expand to the hidden width
        self.gelu = nn.GELU(approximate="tanh")          # tanh approximation of GELU
        self.c_proj = nn.Linear(hidden_size, width)      # contract back to the model width

        # Normalisation applied to the block's output
        self.rms_norm = RMSNorm(width, eps=config.rms_norm_eps)

    def forward(self, x):
        """Return the normalised feed-forward transform of ``x`` (same shape as ``x``)."""
        hidden = self.c_fc(x)
        activated = self.gelu(hidden)
        projected = self.c_proj(activated)
        return self.rms_norm(projected)


# ============================================================================
# Transformer Block with Residual Scaling, RMSNorm, and RoPE
# ============================================================================
class Block(nn.Module):
    """One transformer layer: pre-normalised attention and MLP sub-layers,
    each added back to the residual stream after scaling by 1/sqrt(2)."""

    def __init__(self, config):
        super().__init__()
        eps = config.rms_norm_eps

        # Attention sub-layer and the norm that feeds it
        self.rms_1 = RMSNorm(config.n_embed, eps=eps)
        self.attn = CausalSelfAttention(config)

        # Feed-forward sub-layer and the norm that feeds it
        self.rms_2 = RMSNorm(config.n_embed, eps=eps)
        self.mlp = MLP(config)

        # Constant factor applied to every sub-layer output before the residual add
        self.res_scale = 1.0 / math.sqrt(2)

    def forward(self, x):
        """Apply both sub-layers to ``x`` and return a tensor of the same shape."""
        attn_out = self.attn(self.rms_1(x))
        x = x + self.res_scale * attn_out

        mlp_out = self.mlp(self.rms_2(x))
        return x + self.res_scale * mlp_out


# ============================================================================
# SmolLM Model Definition with Weight Sharing
# ============================================================================
class SMOLL(nn.Module):
    """Decoder-only language model built from ``Block`` layers.

    Token and learned position embeddings are summed, passed through
    ``config.n_layer`` blocks and a final LayerNorm, then projected to
    vocabulary logits. The output projection shares its weight tensor with
    the token embedding.
    """

    def __init__(self, config):
        super().__init__()
        self.config = config
        self.transformer = nn.ModuleDict({
            'wte': nn.Embedding(config.vocab_size, config.n_embed),   # token embedding
            'wpe': nn.Embedding(config.block_size, config.n_embed),   # learned position embedding
            'h': nn.ModuleList([Block(config) for _ in range(config.n_layer)]),
            'ln_f': nn.LayerNorm(config.n_embed, eps=config.rms_norm_eps)})
        self.lm_head = nn.Linear(config.n_embed, config.vocab_size, bias=False)
        # Weight tying: lm_head and wte are the same Parameter object
        self.lm_head.weight = self.transformer['wte'].weight

    def forward(self, idx, targets=None):
        """Compute logits (and optionally the loss) for a batch of token ids.

        Args:
            idx: integer tensor of shape (B, T) with T <= config.block_size.
            targets: optional integer tensor of shape (B, T); when given, the
                mean cross-entropy against the logits is returned as well.

        Returns:
            ``(logits, loss)`` where logits has shape (B, T, vocab_size) and
            loss is None when ``targets`` is None.
        """
        B, T = idx.size()
        assert T <= self.config.block_size, "Sequence length exceeds block size"

        pos = torch.arange(0, T, dtype=torch.long, device=idx.device)
        # (B, T, C) token embeddings + (T, C) position embeddings (broadcast over B)
        x = self.transformer['wte'](idx) + self.transformer['wpe'](pos)

        for block in self.transformer['h']:
            x = block(x)

        logits = self.lm_head(self.transformer['ln_f'](x))

        loss = None
        if targets is not None:
            # Flatten batch and time so every position is one classification example
            loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1))
        return logits, loss

    @classmethod
    def from_pretrained(cls, model_type, model_name, token=None):
        """Build a default-config SMOLL and copy over matching Hugging Face weights.

        Tensors are matched purely by name (``strict=False``), so any tensor
        whose name differs between the two models is skipped. Because that
        would otherwise go unnoticed, a warning reports how many tensors were
        left at their random initialisation and how many were ignored.

        Args:
            model_type: must be ``'SmolLM2-135M'``.
            model_name: hub id or path handed to ``AutoModelForCausalLM``.
            token: optional access token forwarded to the hub loader.

        Returns:
            The constructed ``SMOLL`` instance.
        """
        import warnings

        assert model_type == 'SmolLM2-135M', "Only 'SmolLM2-135M' is supported."
        config = SMOLConfig()
        model = cls(config)

        hf_model = AutoModelForCausalLM.from_pretrained(model_name, use_auth_token=token)
        load_result = model.load_state_dict(hf_model.state_dict(), strict=False)

        # Surface what strict=False would otherwise hide.
        if load_result.missing_keys or load_result.unexpected_keys:
            warnings.warn(
                f"SMOLL.from_pretrained: {len(load_result.missing_keys)} model tensors were not "
                f"found in '{model_name}' and keep their random initialisation; "
                f"{len(load_result.unexpected_keys)} checkpoint tensors had no matching name "
                f"and were ignored.",
                stacklevel=2,
            )

        return model


# ============================================================================
# PyTorch Lightning Module (SMOLL)
# ============================================================================
class SMOLLLightningModule(pl.LightningModule):
    """Lightning wrapper that trains a ``SMOLL`` model and samples text from it.

    Args:
        model_type: forwarded to ``SMOLL.from_pretrained``.
        model_name: hub id or path used for both the model weights and the tokenizer.
        token: optional access token forwarded to the hub loaders.
    """

    def __init__(self, model_type="SmolLM2-135M", model_name=None, token=None):
        super().__init__()
        self.model = SMOLL.from_pretrained(model_type, model_name, token)
        self.tokenizer = AutoTokenizer.from_pretrained(model_name, use_auth_token=token)

    def forward(self, x, targets=None):
        """Delegate to the wrapped model; returns ``(logits, loss)``."""
        return self.model(x, targets)

    def training_step(self, batch, batch_idx):
        """Compute and log the loss for one ``(inputs, targets)`` batch.

        Prints a progress line every 200 global steps and a generated sample
        every 500 global steps.
        """
        x, y = batch
        _, loss = self.model(x, targets=y)

        if self.global_step % 200 == 0:
            print(">>> processing step: ", self.global_step)

        # Display generation samples every 500 steps
        if self.global_step % 500 == 0:
            sample = self.generate("Once upon a time", max_length=100, temperature=1.0)
            print(f"\n\n >>> Step {self.global_step}: \n >>> Generated Sample:\n{sample}\n")

        self.log("train_loss", loss, on_step=True, prog_bar=True)
        return loss

    def generate(self, prompt, max_length=100, temperature=1.0):
        """Autoregressively extend ``prompt`` by ``max_length`` tokens.

        Args:
            prompt: text to continue.
            max_length: number of new tokens to append.
            temperature: divisor applied to the logits before sampling;
                ``0`` selects the most likely token at every step instead of
                sampling.

        Returns:
            The decoded prompt plus continuation, with special tokens removed.
        """
        # generate() is also called from training_step, so remember the current
        # mode and put it back afterwards instead of leaving the model in eval.
        was_training = self.model.training
        self.model.eval()

        # The model rejects inputs longer than block_size, so only the most
        # recent block_size tokens are fed back in.
        block_size = self.model.config.block_size

        # Encode the prompt and move to the same device as the model
        input_ids = self.tokenizer.encode(prompt, return_tensors="pt").to(self.device)

        try:
            with torch.no_grad():
                for _ in range(max_length):
                    logits, _ = self.model(input_ids[:, -block_size:])
                    next_token_logits = logits[:, -1, :]

                    if temperature == 0:
                        # Greedy decoding; avoids dividing the logits by zero
                        next_token = next_token_logits.argmax(dim=-1, keepdim=True)
                    else:
                        # Temperature scaling controls randomness of the sample
                        probs = F.softmax(next_token_logits / temperature, dim=-1)
                        next_token = torch.multinomial(probs, num_samples=1)

                    # Append the token to the running sequence
                    input_ids = torch.cat([input_ids, next_token], dim=1)
        finally:
            self.model.train(was_training)

        return self.tokenizer.decode(input_ids[0], skip_special_tokens=True)

    def configure_optimizers(self):
        """Return AdamW plus a per-step LR schedule.

        The schedule multiplies the base learning rate by a factor that rises
        linearly from 0 to 1 over the first 2000 steps, stays at 1 until step
        1,600,000, then falls linearly to 0 over the next 400,000 steps.
        """
        optimizer = AdamW(self.model.parameters(), lr=3e-3, betas=(0.9, 0.95), eps=1e-8, weight_decay=0.01)

        def lr_lambda(step):
            warmup_steps = 2000
            decay_start = 1600000
            decay_steps = 400000
            if step < warmup_steps:
                return step / warmup_steps
            elif step < decay_start:
                return 1.0
            return max(0.0, 1 - (step - decay_start) / decay_steps)

        return {
            "optimizer": optimizer,
            "lr_scheduler": {
                "scheduler": LambdaLR(optimizer, lr_lambda),
                "interval": "step"  # advance the schedule every optimizer step
            }
        }


# ============================================================================
# Dataset and DataLoader
# ============================================================================

# Custom Dataset Class
class TextDataset(Dataset):
    """Sliding-window next-token dataset over a flat token sequence.

    Item ``i`` is a pair of ``block_size``-long int64 tensors: the tokens
    starting at ``i`` and the same window shifted one position to the right.
    """

    def __init__(self, tokens, block_size):
        self.tokens = tokens
        self.block_size = block_size

    def __len__(self):
        # The last valid window must leave one token after it for the targets
        n_tokens = len(self.tokens)
        return n_tokens - self.block_size

    def __getitem__(self, idx):
        stop = idx + self.block_size
        inputs = torch.tensor(self.tokens[idx:stop], dtype=torch.long)
        targets = torch.tensor(self.tokens[idx + 1:stop + 1], dtype=torch.long)
        return inputs, targets

# ============================================================================
# Training Configuration
# ============================================================================
T = 64          # tokens per training sample (window length handed to TextDataset)
batch_size = 8  # samples per optimisation step

file_path = "/kaggle/input/shakespeare-texts/input.txt"
# Read the raw Shakespeare text
with open(file_path, 'r', encoding='utf-8') as f:
    text = f.read()

tokenizer = AutoTokenizer.from_pretrained(model_name, use_auth_token=token)  # TECHNIQUE: Initialize tokenizer

# TECHNIQUE: only the first 1000 characters are kept for this demonstration;
# remove the slice to train on the whole file.
text = text[:1000]

# Tokenize once; a window needs T inputs plus one extra token for the shifted targets
tokens = tokenizer.encode(text, add_special_tokens=False)
if len(tokens) < T + 1:
    raise ValueError("Not enough tokens in input.txt for one training sample.")

# Create Dataset and DataLoader
dataset = TextDataset(tokens, T)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

print(f"Dataset size: {len(dataset)}, Batch size: {batch_size}")

lit_model = SMOLLLightningModule(model_name=model_name, token=token)
trainer = Trainer(max_steps=2500, log_every_n_steps=50, accelerator="auto", devices=1,
                  enable_progress_bar=True)  # progress bar gives live updates

trainer.fit(lit_model, dataloader)
# Persist the trained weights; later cells reload this file by name
trainer.save_checkpoint("smollm2_135m.ckpt")


In [None]:
# !pip install transformers accelerate bitsandbytes

In [None]:
# # # # # # # # # #
# # QUANTIZATION  #
# # # # # # # # # #
# import torch
# from transformers import AutoModelForCausalLM, AutoTokenizer

# checkpoint_path = "smollm2_135m.ckpt"

# # Apply 8-bit quantization
# model = lit_model.to('cuda')  # Ensure the model is on GPU if available
# model = model.half()      # Use FP16 for better performance
# quantized_model = model.quantize(bits=8)

# # Save the quantized model
# quantized_model.save_pretrained("smollM2-135M-quantized")
# tokenizer.save_pretrained("smollM2-135M-quantized")


In [None]:
!pip install torchinfo

In [None]:
# # # # # # # #
# PARAMETERS #
# # # # # # ##

if __name__ == "__main__":
    # Build an untrained SMOLL from the default SMOLConfig hyper-parameters
    config = SMOLConfig()
    model = SMOLL(config)

    # Print model architecture
    print(model)

    # Report the number of parameters as well; parameters() yields each
    # parameter once, so the tensor shared by wte and lm_head is counted once.
    n_params = sum(p.numel() for p in model.parameters())
    print(f"Total parameters: {n_params:,}")


In [None]:
# # # # # # # #
# EXTRA STEPS #
# # # # # # # #

# Settings for the short follow-up run
extra_trainer_kwargs = dict(
    max_steps=50,
    log_every_n_steps=10,
    accelerator="auto",
    devices=1,
)

# Restore the module from the checkpoint written by the main training cell
lit_model_extra = SMOLLLightningModule.load_from_checkpoint(
    "smollm2_135m.ckpt", model_name=model_name, token=token
)

# A fresh Trainer is used, fed with the same dataloader as before
trainer_extra = Trainer(**extra_trainer_kwargs)
trainer_extra.fit(lit_model_extra, train_dataloaders=dataloader)
print("Additional 50 training steps completed.")

In [None]:
# ============================================================================
# Final Inference: Text Generation
# ============================================================================
# Sample a 100-token continuation of a fixed prompt from the further-trained model
story_prompt = "Once upon a time"
inference_prompt1 = lit_model_extra.generate(story_prompt, max_length=100, temperature=1.0)
print("Final generated text:", inference_prompt1)

In [None]:
# Second sample, this time from a prompt unrelated to the training text
india_prompt = "Hey, do you know India is a"
inference_prompt2 = lit_model_extra.generate(india_prompt, max_length=100, temperature=1.0)
print("Final generated text:", inference_prompt2)

In [None]:
import torch
from transformers import AutoTokenizer
import pytorch_lightning as pl
from torch.quantization import quantize_dynamic
from pytorch_lightning import Trainer

# Step 1: Load the trained Lightning checkpoint
# map_location="cpu" keeps the weights on the CPU regardless of the device the
# checkpoint was trained on.
# NOTE(review): the PyTorch docs describe dynamic quantization as a CPU feature - confirm for the installed version.
checkpoint_path = "smollm2_135m.ckpt"
smoll_lightning_model = SMOLLLightningModule.load_from_checkpoint(
    checkpoint_path, map_location="cpu", model_name=model_name, token=token
)
print("✅ Loaded SMOLLLightningModule from checkpoint")

# Step 2: Apply dynamic quantization to the model (focus on Linear layers for efficiency)
quantized_model = quantize_dynamic(
    smoll_lightning_model.model,  # Target model
    {torch.nn.Linear},            # Quantize Linear layers
    dtype=torch.qint8             # Use 8-bit quantization
)

# Step 3: Replace the model with the quantized version
smoll_lightning_model.model = quantized_model
print("✅ Model quantized successfully")

# Step 4: Save the quantized state under a "state_dict" key
# NOTE(review): the file holds only the quantized modules' state_dict; confirm
# it can really be reloaded with load_from_checkpoint before relying on that.
quantized_ckpt_path = "quantized_smollLMv2.ckpt"
torch.save({"state_dict": smoll_lightning_model.state_dict()}, quantized_ckpt_path)
print(f"✅ Quantized checkpoint saved: {quantized_ckpt_path}")

# Step 5: Save model weights and tokenizer side by side in one folder

hf_save_path = "quantized-smollM2"
os.makedirs(hf_save_path, exist_ok=True)

# Save model weights
torch.save(smoll_lightning_model.model.state_dict(), os.path.join(hf_save_path, "pytorch_model.bin"))
print(f"✅ Quantized model weights saved: {hf_save_path}/pytorch_model.bin")

# Save tokenizer using Hugging Face API
smoll_lightning_model.tokenizer.save_pretrained(hf_save_path)
print(f"✅ Tokenizer saved for Hugging Face: {hf_save_path}")


# Step 6: Save model configuration (including SMOLConfig details)
import json
from dataclasses import asdict

# Read the hyper-parameters from the model's own config and the training
# variables instead of repeating them as literals that could drift apart.
config_hf_model = {
    "model_type": "smollm2-135m",
    "torch_dtype": "int8",  # Quantized model
    "architectures": ["SMOLL"],

    # Every SMOLConfig field (block_size, vocab_size, n_layer, n_head, n_embed,
    # rope_theta, rms_norm_eps, intermediate_size)
    **asdict(smoll_lightning_model.model.config),

    # Additional training parameters
    "batch_size": batch_size,
    "max_position_embeddings": T,  # Sequence length used for training
}

# Save the config.json file
with open(os.path.join(hf_save_path, "config.json"), "w") as f:
    json.dump(config_hf_model, f, indent=4)

print(f"✅ Hugging Face config saved: {hf_save_path}/config.json")



In [None]:

folder_path = "/kaggle/working/quantized-smollM2"
output_zip_path = "/kaggle/working/quantized-smollM2.zip"

# Create a zip archive of the folder. make_archive appends ".zip" itself, so it
# is given the target path without its extension; splitext removes only the
# final suffix, whereas str.replace would strip every ".zip" found in the path.
shutil.make_archive(os.path.splitext(output_zip_path)[0], 'zip', folder_path)

In [None]:

folder_path = "/kaggle/working/smollm_model"
output_zip_path = "/kaggle/working/smollm_model.zip"

# Create a zip archive of the folder. make_archive appends ".zip" itself, so it
# is given the target path without its extension; splitext removes only the
# final suffix, whereas str.replace would strip every ".zip" found in the path.
shutil.make_archive(os.path.splitext(output_zip_path)[0], 'zip', folder_path)