<a href="https://colab.research.google.com/github/peterbabulik/QSFPGA/blob/main/QSFPGA_QuILT_NAS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import torch
import torch.nn as nn
from torch.nn import functional as F
import math
import numpy as np
import time
import random
import sys
import os
import subprocess

# ==============================================================================
# PART 0: Environment & Data Setup (Automated)
# ==============================================================================

device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"--- QSFPGA SYSTEM STARTUP ---")
print(f"Compute Device: {device}")

# 1. Download Data if missing
data_dir = "mathematics_dataset-v1.0"
if not os.path.exists(data_dir):
    print("Downloading Mathematics Dataset...")
    subprocess.run(["curl", "-s", "-O", "https://storage.googleapis.com/mathematics-dataset/mathematics_dataset-v1.0.tar.gz"])
    print("Extracting...")
    subprocess.run(["tar", "-xzf", "mathematics_dataset-v1.0.tar.gz"])
else:
    print("Dataset already present.")

# 2. Load Specific Module (Arithmetic)
TRAIN_FILE_PATH = f"{data_dir}/train-easy/arithmetic__add_or_sub.txt"

def load_data(filepath, holdout=200):
    if not os.path.exists(filepath):
        print(f"Error: File {filepath} not found. Check download."); sys.exit()

    with open(filepath, 'r', encoding='utf-8') as f:
        lines = f.readlines()

    # Clean trailing newlines
    if len(lines) % 2 != 0: lines = lines[:-1]

    pairs = []
    for i in range(0, len(lines), 2):
        q = lines[i].strip()
        a = lines[i+1].strip()
        pairs.append((q, a))

    # Split
    test_pairs = pairs[-holdout:]
    train_pairs = pairs[:-holdout]

    # Flatten train for tokenizer
    text_data = ""
    for q, a in train_pairs:
        text_data += f"{q}\n{a}\n"

    return text_data, test_pairs

print("Loading Data...")
train_text, test_qa_pairs = load_data(TRAIN_FILE_PATH)
print(f"Training Samples: {len(train_text.splitlines())//2}")
print(f"Test Samples:     {len(test_qa_pairs)}")

# 3. Tokenizer
chars = sorted(list(set(train_text)))
vocab_size = len(chars)
stoi = { ch:i for i,ch in enumerate(chars) }
itos = { i:ch for i,ch in enumerate(chars) }
encode = lambda s: [stoi[c] for c in s]
decode = lambda l: ''.join([itos[i] for i in l])

# Prepare Tensor Data
data_tensor = torch.tensor(encode(train_text), dtype=torch.long)
n_split = int(0.9*len(data_tensor))
train_data = data_tensor[:n_split]
val_data = data_tensor[n_split:]

def get_batch(split, block_size, batch_size):
    d = train_data if split == 'train' else val_data
    ix = torch.randint(len(d) - block_size, (batch_size,))
    x = torch.stack([d[i:i+block_size] for i in ix])
    y = torch.stack([d[i+1:i+block_size+1] for i in ix])
    return x.to(device), y.to(device)

# ==============================================================================
# PART 1: The picoTransformer Model
# ==============================================================================

class CausalSelfAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        assert config['n_embd'] % config['n_head'] == 0
        self.c_attn = nn.Linear(config['n_embd'], 3 * config['n_embd'])
        self.c_proj = nn.Linear(config['n_embd'], config['n_embd'])
        self.attn_dropout = nn.Dropout(config['dropout'])
        self.resid_dropout = nn.Dropout(config['dropout'])
        self.n_head = config['n_head']
        self.n_embd = config['n_embd']
        self.register_buffer("bias", torch.tril(torch.ones(config['block_size'], config['block_size']))
                                     .view(1, 1, config['block_size'], config['block_size']))

    def forward(self, x):
        B, T, C = x.size()
        q, k, v  = self.c_attn(x).split(self.n_embd, dim=2)
        k = k.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
        q = q.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
        v = v.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)

        att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1)))
        att = att.masked_fill(self.bias[:,:,:T,:T] == 0, float('-inf'))
        att = F.softmax(att, dim=-1)
        att = self.attn_dropout(att)
        y = att @ v
        y = y.transpose(1, 2).contiguous().view(B, T, C)
        y = self.resid_dropout(self.c_proj(y))
        return y

class Block(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.ln_1 = nn.LayerNorm(config['n_embd'])
        self.sa = CausalSelfAttention(config)
        self.ln_2 = nn.LayerNorm(config['n_embd'])
        self.mlp = nn.Sequential(
            nn.Linear(config['n_embd'], 4 * config['n_embd']),
            nn.GELU(),
            nn.Linear(4 * config['n_embd'], config['n_embd']),
            nn.Dropout(config['dropout']),
        )

    def forward(self, x):
        x = x + self.sa(self.ln_1(x))
        x = x + self.mlp(self.ln_2(x))
        return x

class PicoTransformer(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.config = config
        self.transformer = nn.ModuleDict(dict(
            wte = nn.Embedding(config['vocab_size'], config['n_embd']),
            wpe = nn.Embedding(config['block_size'], config['n_embd']),
            drop = nn.Dropout(config['dropout']),
            h = nn.ModuleList([Block(config) for _ in range(config['n_layer'])]),
            ln_f = nn.LayerNorm(config['n_embd']),
        ))
        self.lm_head = nn.Linear(config['n_embd'], config['vocab_size'], bias=False)

        # Weight tying
        self.transformer.wte.weight = self.lm_head.weight
        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
        elif isinstance(module, nn.LayerNorm):
            torch.nn.init.zeros_(module.bias)
            torch.nn.init.ones_(module.weight)

    def forward(self, idx, targets=None):
        B, T = idx.size()
        pos = torch.arange(0, T, dtype=torch.long, device=device)

        tok_emb = self.transformer.wte(idx)
        pos_emb = self.transformer.wpe(pos)
        x = self.transformer.drop(tok_emb + pos_emb)
        for block in self.transformer.h:
            x = block(x)
        x = self.transformer.ln_f(x)
        logits = self.lm_head(x)

        loss = None
        if targets is not None:
            loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1), ignore_index=-1)
        return logits, loss

    @torch.no_grad()
    def generate(self, idx, max_new_tokens, end_token_id=None):
        for _ in range(max_new_tokens):
            idx_cond = idx if idx.size(1) <= self.config['block_size'] else idx[:, -self.config['block_size']:]
            logits, _ = self(idx_cond)
            logits = logits[:, -1, :]
            probs = F.softmax(logits, dim=-1)
            idx_next = torch.multinomial(probs, num_samples=1)
            idx = torch.cat((idx, idx_next), dim=1)
            if end_token_id and idx_next.item() == end_token_id:
                break
        return idx

# ==============================================================================
# PART 2: Automated Architecture Search (Neural Architecture Search)
# ==============================================================================

def automated_judge(question, model_answer, correct_answer):
    """
    Replaces the Human.
    Checks if the Model Answer numerically matches the Correct Answer.
    """
    # 1. Clean strings
    ma_clean = model_answer.strip().replace('\n', '')
    ca_clean = correct_answer.strip().replace('\n', '')

    # 2. Exact string match
    if ma_clean == ca_clean:
        return 10.0

    # 3. Numerical Fallback (in case of extra spaces)
    try:
        if abs(float(ma_clean) - float(ca_clean)) < 1e-4:
            return 10.0
    except:
        pass

    # 4. Penalty for wrong answer
    return 1.0

def evaluate_fitness_autonomous(config, iterations=500):
    """Trains a model briefly and auto-scores it."""

    # Init Model
    model = PicoTransformer(config).to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr=config['lr'])

    # Quick Train
    model.train()
    for _ in range(iterations):
        xb, yb = get_batch('train', config['block_size'], config['batch_size'])
        _, loss = model(xb, yb)
        optimizer.zero_grad(set_to_none=True)
        loss.backward()
        optimizer.step()

    # Evaluation (Test on 5 holdout problems)
    model.eval()
    total_score = 0
    num_tests = 5
    end_token = stoi.get('\n', None) # Stop at newline

    # Context window logic
    for _ in range(num_tests):
        q, a = random.choice(test_qa_pairs)
        # Format: "Question\n" -> Model expects to generate Answer
        prompt = f"{q}\n"
        context = torch.tensor(encode(prompt), dtype=torch.long, device=device).unsqueeze(0)

        try:
            out_ids = model.generate(context, max_new_tokens=15, end_token_id=end_token)[0].tolist()
            generated = decode(out_ids[len(encode(prompt)):])
            score = automated_judge(q, generated, a)
        except Exception as e:
            score = 0.0 # Crash penalty

        total_score += score

    avg_score = total_score / num_tests
    # Inverse Cost: High Score (10) -> Low Cost (-10)
    return -avg_score

# Define Search Space (4 Qubits = 16 States)
n_qubits_nas = 4
bitstrings = [np.binary_repr(i, width=n_qubits_nas) for i in range(2**n_qubits_nas)]

def decode_arch(b):
    # Mapping bits to hyperparameters
    # Bits 0-1: Size
    size_map = {
        '00': {'n_layer': 2, 'n_embd': 64, 'n_head': 4},    # Tiny
        '01': {'n_layer': 4, 'n_embd': 64, 'n_head': 4},    # Small
        '10': {'n_layer': 4, 'n_embd': 128, 'n_head': 8},   # Medium
        '11': {'n_layer': 6, 'n_embd': 128, 'n_head': 8}    # Large
    }
    cfg = size_map[b[0:2]].copy()

    # Bit 2: Block Size (Context)
    cfg['block_size'] = 64 if b[2] == '0' else 128

    # Bit 3: Learning Rate
    cfg['lr'] = 1e-3 if b[3] == '0' else 5e-4

    # Fixed params
    cfg['vocab_size'] = vocab_size
    cfg['batch_size'] = 32
    cfg['dropout'] = 0.1
    cfg['name'] = f"Arch-{b}"
    return cfg

configs = [decode_arch(b) for b in bitstrings]

print("\n--- PHASE 1: COLLECTING FITNESS LANDSCAPE (AUTONOMOUS) ---")
costs = []
for i, cfg in enumerate(configs):
    print(f"[{i+1}/{len(configs)}] Assessing {cfg['name']}...", end="")
    cost = evaluate_fitness_autonomous(cfg, iterations=200) # Fast check
    costs.append(cost)
    print(f" Cost: {cost:.2f}")

# ==============================================================================
# PART 3: QuILT (Quantum Optimization) - CORRECTED
# ==============================================================================

class VQESelector(nn.Module):
    def __init__(self, n_qubits, depth=2):
        super().__init__()
        self.n_qubits = n_qubits
        self.depth = depth
        # Learnable Rotation Angles
        self.theta = nn.Parameter(torch.rand(depth, n_qubits, 2) * 2 * math.pi)

    def forward(self):
        # Digital Twin of the FPGA Hardware Logic
        psi = torch.zeros(2**self.n_qubits, dtype=torch.cfloat, device=device)
        psi[0] = 1.0

        # Identity for tensor expansion
        I = torch.eye(2, device=device, dtype=torch.cfloat)

        for d in range(self.depth):
            for q in range(self.n_qubits):
                # --- APPLY RY ---
                ang_y = self.theta[d, q, 0]
                cy = torch.cos(ang_y / 2)
                sy = torch.sin(ang_y / 2)

                # FIX: Use torch.stack instead of torch.tensor to keep gradients alive
                row0 = torch.stack([cy, -sy])
                row1 = torch.stack([sy, cy])
                mat_y = torch.stack([row0, row1]).to(torch.cfloat)

                # Expand to full system
                lst = [I] * self.n_qubits
                lst[q] = mat_y
                op = lst[0]
                for k in range(1, self.n_qubits):
                    op = torch.kron(op, lst[k])
                psi = op @ psi

                # --- APPLY RZ ---
                ang_z = self.theta[d, q, 1]
                val = ang_z / 2

                # FIX: Construct complex exponentials carefully
                # e^(-ix) = cos(x) - i*sin(x)
                c_val = torch.cos(val)
                s_val = torch.sin(val)

                # Diagonal elements
                elem0 = torch.complex(c_val, -s_val)
                elem1 = torch.complex(c_val, s_val)
                mat_z = torch.diag(torch.stack([elem0, elem1]))

                lst = [I] * self.n_qubits
                lst[q] = mat_z
                op = lst[0]
                for k in range(1, self.n_qubits):
                    op = torch.kron(op, lst[k])
                psi = op @ psi

        return psi

print("\n--- PHASE 2: QuILT OPTIMIZATION ---")

# NOTE: If all costs are identical (e.g., -1.0 because training was too short on CPU),
# we add tiny noise to valid_costs to demonstrate the VQE's ability to seek a minimum.
# Otherwise VQE has no gradient to follow on a perfectly flat landscape.
costs_tensor = torch.tensor(costs, dtype=torch.cfloat, device=device)
if torch.std(costs_tensor.real) == 0:
    print("Notice: Fitness landscape is flat (models need more training time). Injecting microspectrum noise for demo.")
    noise = torch.randn_like(costs_tensor.real) * 0.01
    hamiltonian = torch.diag(costs_tensor + noise)
else:
    hamiltonian = torch.diag(costs_tensor)

vqe = VQESelector(n_qubits_nas, depth=3).to(device)
opt_vqe = torch.optim.Adam(vqe.parameters(), lr=0.1)

print("Optimizing Quantum State...")
for epoch in range(101):
    opt_vqe.zero_grad()
    state = vqe()
    # Minimize Expected Cost <psi|H|psi>
    energy = torch.real(torch.vdot(state, hamiltonian @ state))
    energy.backward()
    opt_vqe.step()

    if epoch % 20 == 0:
        print(f"VQE Epoch {epoch}: System Energy = {energy.item():.4f}")

# ==============================================================================
# PART 4: RESULT
# ==============================================================================

final_state = vqe().detach()
probs = (final_state.abs()**2).cpu().numpy()
best_idx = np.argmax(probs)
winner_config = configs[best_idx]
winner_bitstring = bitstrings[best_idx]

print("\n" + "="*50)
print(f"  OPTIMAL ARCHITECTURE FOUND: {winner_config['name']}")
print("="*50)
print(f"Quantum Probability: {probs[best_idx]:.4f}")
print(f"Configuration Specs:")
print(f"   Layers:     {winner_config['n_layer']}")
print(f"   Embed Dim:  {winner_config['n_embd']}")
print(f"   Heads:      {winner_config['n_head']}")
print(f"   Block Size: {winner_config['block_size']}")
print(f"   LR:         {winner_config['lr']}")
print("-" * 50)

# ==============================================================================
# PART 4: RESULT
# ==============================================================================

final_state = vqe().detach()
probs = (final_state.abs()**2).cpu().numpy()
best_idx = np.argmax(probs)
winner_config = configs[best_idx]
winner_bitstring = bitstrings[best_idx]

print("\n" + "="*50)
print(f"  OPTIMAL ARCHITECTURE FOUND: {winner_config['name']}")
print("="*50)
print(f"Quantum Probability: {probs[best_idx]:.4f}")
print(f"Configuration Specs:")
print(f"   Layers:     {winner_config['n_layer']}")
print(f"   Embed Dim:  {winner_config['n_embd']}")
print(f"   Heads:      {winner_config['n_head']}")
print(f"   Block Size: {winner_config['block_size']}")
print(f"   LR:         {winner_config['lr']}")
print("-" * 50)
print("Ready for Full Scale Training on chosen architecture.")

--- QSFPGA SYSTEM STARTUP ---
Compute Device: cpu
Downloading Mathematics Dataset...
Extracting...
Loading Data...
Training Samples: 666466
Test Samples:     200

--- PHASE 1: COLLECTING FITNESS LANDSCAPE (AUTONOMOUS) ---
[1/16] Assessing Arch-0000... Cost: -1.00
[2/16] Assessing Arch-0001... Cost: -1.00
[3/16] Assessing Arch-0010... Cost: -1.00
[4/16] Assessing Arch-0011... Cost: -1.00
[5/16] Assessing Arch-0100... Cost: -1.00
[6/16] Assessing Arch-0101... Cost: -1.00
[7/16] Assessing Arch-0110... Cost: -1.00
[8/16] Assessing Arch-0111... Cost: -1.00
[9/16] Assessing Arch-1000... Cost: -1.00
[10/16] Assessing Arch-1001... Cost: -1.00
[11/16] Assessing Arch-1010... Cost: -1.00
[12/16] Assessing Arch-1011... Cost: -1.00
[13/16] Assessing Arch-1100... Cost: -1.00
[14/16] Assessing Arch-1101... Cost: -1.00
[15/16] Assessing Arch-1110... Cost: -1.00
[16/16] Assessing Arch-1111... Cost: -1.00

--- PHASE 2: QuILT OPTIMIZATION ---
Notice: Fitness landscape is flat (models need more training t

In [3]:
import torch
import torch.nn as nn
from torch.nn import functional as F
import math
import time
import random

# ==============================================================================
# CONFIGURATION: THE WINNER (Arch-1001)
# ==============================================================================
# Derived from your QuILT Output
config = {
    'name': 'Arch-1001 (QuILT Winner)',
    'n_layer': 4,
    'n_embd': 128,
    'n_head': 8,
    'block_size': 64,
    'vocab_size': vocab_size, # Inherited from previous cell
    'batch_size': 64,         # Increased for stability
    'dropout': 0.1,
    'lr': 0.0005              # The specific LR found by QuILT
}

print(f"--- STARTING FULL SCALE TRAINING ---")
print(f"Model: {config['name']}")
print(f"Hyperparams: {config}")

# ==============================================================================
# MODEL SETUP (Re-instantiating the precise architecture)
# ==============================================================================

model = PicoTransformer(config).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=config['lr'])

# Calculate Parameters
n_params = sum(p.numel() for p in model.parameters())
print(f"Parameter Count: {n_params/1e6:.2f}M")

# ==============================================================================
# TRAINING LOOP (Extended)
# ==============================================================================

max_iters = 3000  # Enough to learn basic arithmetic patterns
eval_interval = 500

start_time = time.time()
losses = []

model.train()
for iter_num in range(max_iters):
    # 1. Get Batch
    xb, yb = get_batch('train', config['block_size'], config['batch_size'])

    # 2. Forward / Backward
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

    # 3. Logging
    if iter_num % eval_interval == 0:
        elapsed = time.time() - start_time
        print(f"Iter {iter_num}: Loss {loss.item():.4f} (Time: {elapsed:.1f}s)")
        losses.append(loss.item())

print(f"--- TRAINING COMPLETE in {time.time()-start_time:.1f}s ---")

# ==============================================================================
# FINAL EXAM: TESTING MATHEMATICAL COMPETENCE
# ==============================================================================

print("\n--- FINAL EXAM: DEEPMIND MATHEMATICS TEST ---")
model.eval()

score = 0
num_questions = 10
end_token = stoi.get('\n', None)

for i in range(num_questions):
    # Pick random test question
    q, correct_a = random.choice(test_qa_pairs)

    # Prompting
    prompt = f"{q}\n"
    context = torch.tensor(encode(prompt), dtype=torch.long, device=device).unsqueeze(0)

    # Generation
    try:
        # Generate until newline or max tokens
        out_ids = model.generate(context, max_new_tokens=15, end_token_id=end_token)[0].tolist()
        # Decode only the new part
        full_text = decode(out_ids)
        model_answer = full_text[len(prompt):].strip()

        # Grading
        is_correct = (model_answer == correct_answer)
        mark = "✅" if is_correct else "❌"
        if is_correct: score += 1

        print(f"Q: {q}")
        print(f"   Target: {correct_answer}")
        print(f"   Model:  {model_answer} {mark}")
        print("-" * 30)

    except Exception as e:
        print(f"Error generating for: {q}")

print(f"\nFINAL SCORE: {score}/{num_questions} ({(score/num_questions)*100}%)")

if score > 0:
    print("SUCCESS: The Quantum-Selected Architecture has learned to calculate!")
    print("This validates the entire QSFPGA -> QuILT -> Neural Network pipeline.")
else:
    print("RESULT: Model needs more training time (increase max_iters to 10k+ for high accuracy).")

--- STARTING FULL SCALE TRAINING ---
Model: Arch-1001 (QuILT Winner)
Hyperparams: {'name': 'Arch-1001 (QuILT Winner)', 'n_layer': 4, 'n_embd': 128, 'n_head': 8, 'block_size': 64, 'vocab_size': 43, 'batch_size': 64, 'dropout': 0.1, 'lr': 0.0005}
Parameter Count: 0.81M
Iter 0: Loss 3.7768 (Time: 1.7s)
Iter 500: Loss 1.2496 (Time: 417.2s)
Iter 1000: Loss 1.1801 (Time: 830.8s)
Iter 1500: Loss 1.1165 (Time: 1242.1s)
Iter 2000: Loss 1.0565 (Time: 1656.7s)
Iter 2500: Loss 1.0602 (Time: 2074.0s)
--- TRAINING COMPLETE in 2485.9s ---

--- FINAL EXAM: DEEPMIND MATHEMATICS TEST ---
Error generating for: Add 10295.5 and 1.
Error generating for: What is 3 less than -506?
Error generating for: Total of 5.67 and -9.
Error generating for: -2404 - -1.2
Error generating for: Put together -0.5 and 192.
Error generating for: Work out 54426 - 0.8.
Error generating for: Work out -1.2 - 6.
Error generating for: What is 0 + -171?
Error generating for: What is the difference between -0.4 and 260.4?
Error genera