In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [2]:
import os, math, random, time, json
import numpy as np
import torch
import torch.nn as nn
from torch.nn import functional as F
from tqdm import tqdm

device = 'cuda' if torch.cuda.is_available() else 'cpu'
print("Device:", device)

SEED = 1337
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)


Device: cuda


<torch._C.Generator at 0x789ba1ffcb30>

In [3]:
# -------------------------
# Math dataset
# -------------------------
def safe_eval_int(expr: str):
    allowed = set("0123456789+-*/() ")
    if any(c not in allowed for c in expr):
        return None
    try:
        val = eval(expr, {"__builtins__": None}, {})
        if isinstance(val, (int, float)) and abs(val - round(val)) < 1e-9:
            return int(round(val))
        return None
    except Exception:
        return None

def gen_math_expr(level: int) -> str:
    # progressively harder
    if level == 0:
        a, b = random.randint(0, 9), random.randint(0, 9)
        return f"{a}+{b}"
    if level == 1:
        a, b = random.randint(0, 99), random.randint(0, 99)
        op = random.choice(["+", "-"])
        return f"{a}{op}{b}"
    if level == 2:
        a, b, c = random.randint(0, 9), random.randint(0, 9), random.randint(0, 9)
        op = random.choice(["+", "-"])
        return f"({a}{op}{b})*{c}"
    # level 3: exact division
    m = random.randint(1, 12)
    k = random.randint(0, 80)
    left = k * m
    if random.random() < 0.5:
        t = random.randint(0, 20)
        return f"({left}+{t}-{t})/{m}"
    return f"{left}/{m}"

def make_math_dataset(n, levels=(0,1,2,3)):
    out = []
    while len(out) < n:
        expr = gen_math_expr(random.choice(levels))
        ans = safe_eval_int(expr)
        if ans is None: 
            continue
        out.append(f"{expr}={ans}")
    return out


# -------------------------
# Boolean dataset (NO Python eval)
# Grammar supports: True False AND OR XOR NOT and parentheses
# -------------------------
def tok_bool(s):
    s = s.replace("(", " ( ").replace(")", " ) ")
    return s.split()

def parse_bool(tokens):
    # expr := term ((OR|XOR) term)*
    # term := factor (AND factor)*
    # factor := NOT factor | atom
    # atom := True | False | '(' expr ')'

    def parse_atom(i):
        if tokens[i] == "True":  return True, i+1
        if tokens[i] == "False": return False, i+1
        if tokens[i] == "(":
            v, j = parse_expr(i+1)
            if tokens[j] != ")":
                raise ValueError("Missing )")
            return v, j+1
        raise ValueError("Bad atom")

    def parse_factor(i):
        if tokens[i] == "NOT":
            v, j = parse_factor(i+1)
            return (not v), j
        return parse_atom(i)

    def parse_term(i):
        v, j = parse_factor(i)
        while j < len(tokens) and tokens[j] == "AND":
            rhs, j2 = parse_factor(j+1)
            v = v and rhs
            j = j2
        return v, j

    def parse_expr(i):
        v, j = parse_term(i)
        while j < len(tokens) and tokens[j] in ("OR", "XOR"):
            op = tokens[j]
            rhs, j2 = parse_term(j+1)
            v = (v or rhs) if op == "OR" else (v != rhs)
            j = j2
        return v, j

    v, j = parse_expr(0)
    if j != len(tokens):
        raise ValueError("Extra tokens")
    return v

def gen_bool_expr(depth):
    OPS = ["AND", "OR", "XOR"]
    LITS = ["True", "False"]
    if depth <= 0:
        atom = random.choice(LITS)
        return f"NOT {atom}" if random.random() < 0.25 else atom

    r = random.random()
    if r < 0.25:
        return f"NOT ({gen_bool_expr(depth-1)})"

    left = gen_bool_expr(depth-1)
    right = gen_bool_expr(depth-1)
    op = random.choice(OPS)
    if random.random() < 0.75:
        return f"({left} {op} {right})"
    return f"{left} {op} {right}"

def make_bool_dataset(n, max_depth):
    out = []
    while len(out) < n:
        d = random.randint(0, max_depth)
        expr = gen_bool_expr(d)
        val = parse_bool(tok_bool(expr))
        out.append(f"{expr} = {'True' if val else 'False'}")
    return out


# -------------------------
# Build datasets (adjust sizes if needed)
# -------------------------
math_train = make_math_dataset(60000, levels=(0,1,1,2,2,3))
math_test  = make_math_dataset(8000,  levels=(1,2,2,3,3))

bool_train = make_bool_dataset(60000, max_depth=3)
bool_test  = make_bool_dataset(8000,  max_depth=5)

print("Math:", len(math_train), len(math_test), "Bool:", len(bool_train), len(bool_test))
print("Sample math:", math_train[0])
print("Sample bool:", bool_train[0])

# optional: write datasets to files (useful for zip submission)
os.makedirs("datasets", exist_ok=True)
open("datasets/math_train.txt","w",encoding="utf-8").write("\n".join(math_train)+"\n")
open("datasets/math_test.txt","w",encoding="utf-8").write("\n".join(math_test)+"\n")
open("datasets/bool_train.txt","w",encoding="utf-8").write("\n".join(bool_train)+"\n")
open("datasets/bool_test.txt","w",encoding="utf-8").write("\n".join(bool_test)+"\n")
print("Wrote datasets/ ملفات")


Math: 60000 8000 Bool: 60000 8000
Sample math: (8+5)*9=117
Sample bool: (NOT False AND True) AND NOT (False) = True
Wrote datasets/ ملفات


In [4]:
def build_gpt_language_model(vocab_chars, *,
                             block_size=128,
                             n_embd=256,
                             n_head=4,
                             n_layer=4,
                             dropout=0.1):
    """
    Builds the GPTLanguageModel implementation from gpt.py,
    but with notebook-controlled hyperparameters & vocab.
    """

    chars = sorted(list(set(vocab_chars)))
    vocab_size = len(chars)
    stoi = {ch:i for i,ch in enumerate(chars)}
    itos = {i:ch for ch,i in stoi.items()}
    encode = lambda s: [stoi[c] for c in s]
    decode = lambda ids: ''.join([itos[i] for i in ids])

    # ---- Below is structurally the same as your gpt.py classes ----
    class Head(nn.Module):
        def __init__(self, head_size):
            super().__init__()
            self.key = nn.Linear(n_embd, head_size, bias=False)
            self.query = nn.Linear(n_embd, head_size, bias=False)
            self.value = nn.Linear(n_embd, head_size, bias=False)
            self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))
            self.dropout = nn.Dropout(dropout)

        def forward(self, x):
            B,T,C = x.shape
            k = self.key(x)
            q = self.query(x)
            wei = q @ k.transpose(-2,-1) * (k.shape[-1] ** -0.5)
            wei = wei.masked_fill(self.tril[:T,:T] == 0, float('-inf'))
            wei = F.softmax(wei, dim=-1)
            wei = self.dropout(wei)
            v = self.value(x)
            out = wei @ v
            return out

    class MultiHeadAttention(nn.Module):
        def __init__(self, num_heads, head_size):
            super().__init__()
            self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
            self.proj = nn.Linear(head_size * num_heads, n_embd)
            self.dropout = nn.Dropout(dropout)

        def forward(self, x):
            out = torch.cat([h(x) for h in self.heads], dim=-1)
            out = self.dropout(self.proj(out))
            return out

    class FeedFoward(nn.Module):
        def __init__(self):
            super().__init__()
            self.net = nn.Sequential(
                nn.Linear(n_embd, 4*n_embd),
                nn.ReLU(),
                nn.Linear(4*n_embd, n_embd),
                nn.Dropout(dropout),
            )
        def forward(self, x):
            return self.net(x)

    class Block(nn.Module):
        def __init__(self):
            super().__init__()
            head_size = n_embd // n_head
            self.sa = MultiHeadAttention(n_head, head_size)
            self.ffwd = FeedFoward()
            self.ln1 = nn.LayerNorm(n_embd)
            self.ln2 = nn.LayerNorm(n_embd)
        def forward(self, x):
            x = x + self.sa(self.ln1(x))
            x = x + self.ffwd(self.ln2(x))
            return x

    class GPTLanguageModel(nn.Module):
        def __init__(self):
            super().__init__()
            self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
            self.position_embedding_table = nn.Embedding(block_size, n_embd)
            self.blocks = nn.Sequential(*[Block() for _ in range(n_layer)])
            self.ln_f = nn.LayerNorm(n_embd)
            self.lm_head = nn.Linear(n_embd, vocab_size)
            self.apply(self._init_weights)

        def _init_weights(self, module):
            if isinstance(module, nn.Linear):
                torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
                if module.bias is not None:
                    torch.nn.init.zeros_(module.bias)
            elif isinstance(module, nn.Embedding):
                torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

        def forward(self, idx, targets=None):
            B, T = idx.shape
            tok_emb = self.token_embedding_table(idx)
            pos_emb = self.position_embedding_table(torch.arange(T, device=idx.device))
            x = tok_emb + pos_emb
            x = self.blocks(x)
            x = self.ln_f(x)
            logits = self.lm_head(x)

            loss = None
            if targets is not None:
                B, T, C = logits.shape
                logits_2d = logits.view(B*T, C)
                targets_2d = targets.view(B*T)
                loss = F.cross_entropy(logits_2d, targets_2d)
            return logits, loss

        def generate(self, idx, max_new_tokens):
            for _ in range(max_new_tokens):
                idx_cond = idx[:, -block_size:]
                logits, _ = self(idx_cond)
                logits = logits[:, -1, :]
                probs = F.softmax(logits, dim=-1)
                idx_next = torch.multinomial(probs, num_samples=1)
                idx = torch.cat((idx, idx_next), dim=1)
            return idx

    return GPTLanguageModel, encode, decode, stoi, itos, vocab_size, chars


In [5]:
def make_text(lines):
    return "\n".join(lines) + "\n"

def train_gpt_on_lines(train_lines, test_lines, *,
                       block_size=128, n_embd=256, n_head=4, n_layer=4, dropout=0.1,
                       batch_size=64, max_iters=4000, eval_interval=400, learning_rate=3e-4,
                       max_new_tokens=48):
    # Build vocab from BOTH splits to avoid OOV
    full_text = make_text(train_lines) + make_text(test_lines)
    GPTClass, encode, decode, stoi, itos, vocab_size, chars = build_gpt_language_model(
        full_text,
        block_size=block_size, n_embd=n_embd, n_head=n_head, n_layer=n_layer, dropout=dropout
    )

    data = torch.tensor(encode(full_text), dtype=torch.long)
    n = int(0.9 * len(data))
    train_data = data[:n]
    val_data = data[n:]

    def get_batch(split):
        src = train_data if split == 'train' else val_data
        ix = torch.randint(len(src) - block_size - 1, (batch_size,))
        x = torch.stack([src[i:i+block_size] for i in ix])
        y = torch.stack([src[i+1:i+block_size+1] for i in ix])
        return x.to(device), y.to(device)

    @torch.no_grad()
    def estimate_loss(eval_iters=100):
        out = {}
        model.eval()
        for split in ['train','val']:
            losses = []
            for _ in range(eval_iters):
                X, Y = get_batch(split)
                _, loss = model(X, Y)
                losses.append(loss.item())
            out[split] = float(np.mean(losses))
        model.train()
        return out

    @torch.no_grad()
    def gen_one(prompt, max_new=max_new_tokens):
        model.eval()
        idx = torch.tensor([encode(prompt)], dtype=torch.long, device=device)
        out = model.generate(idx, max_new)
        txt = decode(out[0].tolist())
        # take only first line (up to newline) so comparisons are stable
        return txt.split("\n", 1)[0].strip()

    @torch.no_grad()
    def eval_exact_match(test_lines, kind, n_samples=300):
        # kind: "math" or "bool"
        model.eval()
        n_samples = min(n_samples, len(test_lines))
        idxs = np.random.choice(len(test_lines), size=n_samples, replace=False)
        exact_ok = 0
        ans_ok = 0

        for i in idxs:
            gold = test_lines[i].strip()

            if kind == "math":
                expr, ans = gold.split("=", 1)
                prompt = f"{expr}="
                pred_line = gen_one(prompt, max_new=32)
                # exact line
                exact_ok += int(pred_line == gold)
                # answer-only
                if "=" in pred_line:
                    pred_ans = pred_line.split("=",1)[1].strip()
                    ans_ok += int(pred_ans == ans.strip())
            else:
                left, right = gold.split("=", 1)
                prompt = f"{left.strip()} ="
                pred_line = gen_one(prompt, max_new=48)
                exact_ok += int(pred_line == gold)
                if "=" in pred_line:
                    pred_rhs = pred_line.split("=",1)[1].strip()
                    ans_ok += int(pred_rhs == right.strip())

        return {
            "exact_match": exact_ok / n_samples,
            "answer_match": ans_ok / n_samples
        }

    model = GPTClass().to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

    history = []
    t0 = time.time()

    for it in tqdm(range(1, max_iters+1), desc="training"):
        if it % eval_interval == 0 or it == 1:
            losses = estimate_loss(eval_iters=80)
            history.append({"iter": it, **losses})
            print(f"iter {it}: train {losses['train']:.4f}  val {losses['val']:.4f}")

        xb, yb = get_batch('train')
        _, loss = model(xb, yb)
        optimizer.zero_grad(set_to_none=True)
        loss.backward()
        optimizer.step()

    print("Training seconds:", round(time.time()-t0,2))
    return model, encode, decode, history, eval_exact_match, gen_one


In [6]:
# Reasonable starter hyperparams (tune for your architecture experiments in the report)
math_model, math_encode, math_decode, math_hist, math_eval_fn, math_gen_one = train_gpt_on_lines(
    math_train, math_test,
    block_size=64, n_embd=256, n_head=4, n_layer=4, dropout=0.1,
    batch_size=128, max_iters=3000, eval_interval=300, learning_rate=3e-4,
    max_new_tokens=32
)

# Save weights with required filename
torch.save(math_model.state_dict(), "model_weights_part1.pth")
print("Saved: model_weights_part1.pth")

# Evaluate
math_metrics = math_eval_fn(math_test, kind="math", n_samples=400)
print("Math metrics:", math_metrics)

# Prompt-output appendix examples (strengths & weaknesses)
math_prompts = [
    "3+2=",
    "47+38=",
    "(3+2)*4=",
    "12-5=",
    "20/4=",
    "(9-3)*7=",
    "(10+5)/5=",
]
with open("prompt_outputs_math.txt", "w", encoding="utf-8") as f:
    for p in math_prompts:
        out = math_gen_one(p, max_new=32)
        f.write(f"PROMPT: {p}\nOUTPUT: {out}\n---\n")

with open("metrics_math.json","w") as f:
    json.dump({"history": math_hist, "final_metrics": math_metrics}, f, indent=2)

print("Wrote: prompt_outputs_math.txt, metrics_math.json")


training:   0%|          | 0/3000 [00:00<?, ?it/s]

iter 1: train 2.9639  val 2.9617


training:  10%|█         | 302/3000 [00:26<19:21,  2.32it/s]

iter 300: train 1.4306  val 1.4056


training:  20%|██        | 602/3000 [00:48<17:54,  2.23it/s]

iter 600: train 1.2951  val 1.2609


training:  30%|███       | 902/3000 [01:12<16:17,  2.15it/s]

iter 900: train 1.1969  val 1.1586


training:  40%|████      | 1202/3000 [01:36<14:32,  2.06it/s]

iter 1200: train 1.1037  val 1.0828


training:  50%|█████     | 1502/3000 [02:01<11:53,  2.10it/s]

iter 1500: train 1.0722  val 1.0422


training:  60%|██████    | 1802/3000 [02:25<09:21,  2.13it/s]

iter 1800: train 1.0573  val 1.0255


training:  70%|███████   | 2102/3000 [02:49<07:06,  2.10it/s]

iter 2100: train 1.0466  val 1.0214


training:  80%|████████  | 2402/3000 [03:13<04:45,  2.09it/s]

iter 2400: train 1.0432  val 1.0168


training:  90%|█████████ | 2702/3000 [03:37<02:21,  2.11it/s]

iter 2700: train 1.0339  val 1.0068


training: 100%|██████████| 3000/3000 [04:01<00:00, 12.43it/s]

iter 3000: train 1.0337  val 1.0066
Training seconds: 241.39





Saved: model_weights_part1.pth
Math metrics: {'exact_match': 0.8775, 'answer_match': 0.8775}
Wrote: prompt_outputs_math.txt, metrics_math.json


In [7]:
bool_model, bool_encode, bool_decode, bool_hist, bool_eval_fn, bool_gen_one = train_gpt_on_lines(
    bool_train, bool_test,
    block_size=96, n_embd=256, n_head=4, n_layer=4, dropout=0.1,
    batch_size=128, max_iters=3000, eval_interval=300, learning_rate=3e-4,
    max_new_tokens=48
)

torch.save(bool_model.state_dict(), "model_weights_part2.pth")
print("Saved: model_weights_part2.pth")

bool_metrics = bool_eval_fn(bool_test, kind="bool", n_samples=400)
print("Bool metrics:", bool_metrics)

bool_prompts = [
    "True AND False =",
    "NOT True =",
    "(True OR False) AND True =",
    "True XOR True =",
    "NOT (False OR False) =",
    "(True XOR False) AND (False OR True) =",
]
with open("prompt_outputs_bool.txt", "w", encoding="utf-8") as f:
    for p in bool_prompts:
        out = bool_gen_one(p, max_new=48)
        f.write(f"PROMPT: {p}\nOUTPUT: {out}\n---\n")

with open("metrics_bool.json","w") as f:
    json.dump({"history": bool_hist, "final_metrics": bool_metrics}, f, indent=2)

print("Wrote: prompt_outputs_bool.txt, metrics_bool.json")


training:   0%|          | 1/3000 [00:06<5:08:54,  6.18s/it]

iter 1: train 3.0104  val 3.0191


training:  10%|█         | 301/3000 [00:45<1:04:06,  1.43s/it]

iter 300: train 0.3218  val 0.3599


training:  20%|██        | 601/3000 [01:23<55:43,  1.39s/it]  

iter 600: train 0.3001  val 0.3382


training:  30%|███       | 901/3000 [02:02<49:53,  1.43s/it]  

iter 900: train 0.2942  val 0.3312


training:  40%|████      | 1201/3000 [02:40<42:03,  1.40s/it]

iter 1200: train 0.2907  val 0.3308


training:  50%|█████     | 1501/3000 [03:18<35:25,  1.42s/it]

iter 1500: train 0.2911  val 0.3288


training:  60%|██████    | 1801/3000 [03:57<28:21,  1.42s/it]

iter 1800: train 0.2885  val 0.3260


training:  70%|███████   | 2101/3000 [04:35<21:09,  1.41s/it]

iter 2100: train 0.2867  val 0.3296


training:  80%|████████  | 2401/3000 [05:13<14:01,  1.41s/it]

iter 2400: train 0.2869  val 0.3247


training:  90%|█████████ | 2701/3000 [05:52<07:01,  1.41s/it]

iter 2700: train 0.2863  val 0.3205


training: 100%|██████████| 3000/3000 [06:30<00:00,  7.69it/s]

iter 3000: train 0.2844  val 0.3267
Training seconds: 390.28
Saved: model_weights_part2.pth





Bool metrics: {'exact_match': 0.59, 'answer_match': 0.59}
Wrote: prompt_outputs_bool.txt, metrics_bool.json


In [8]:
# Demonstrate required loading behavior (Part 1 as example)
# Note: GPTLanguageModel() requires the same vocab/hyperparams used at training time.
# In your submission zip, you'll include this notebook/script that rebuilds the same model config + vocab.

print("Sanity loading test (math):")
# rebuild math class exactly (same dataset text/vocab and hyperparams)
full_text_math = make_text(math_train) + make_text(math_test)
GPTMathClass, *_ = build_gpt_language_model(
    full_text_math, block_size=64, n_embd=256, n_head=4, n_layer=4, dropout=0.1
)

m = GPTMathClass().to(device)
m.load_state_dict(torch.load("model_weights_part1.pth", map_location=device))
m.eval()
print("Loaded model_weights_part1.pth OK")


Sanity loading test (math):
Loaded model_weights_part1.pth OK
