In [1]:
# First cell: Install dependencies (run once — fast on RunPod H100)
!pip install torch matplotlib numpy

# Second cell: The sim code (fixed — no backward error)
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.amp
from torch.utils.checkpoint import checkpoint
import numpy as np
from contextlib import nullcontext
import math

torch.cuda.empty_cache()

device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"Using device: {device}")

# Seed the torch RNG so the synthetic data, the mask and the weight init are
# repeatable from run to run (GPU kernels may still be non-deterministic).
seed = 0
torch.manual_seed(seed)

# CONFIG – optimized for H100 speed (fast epochs)
triality = 3
dim = 384            # model width; must be divisible by `triality`
latent_dim = 8       # dimensionality of the E8 root vectors
seq_len = 1024
batch_size = 32
epochs = 20000
lr = 5e-5
# The mixed-precision path below uses CUDA autocast and a CUDA GradScaler, so
# only enable it when a GPU is actually present. Set to False to force fp32.
use_amp = device == 'cuda'
use_checkpoint = True

# Synthetic CRISPR gRNA proxy
alphabet = torch.tensor([0, 1, 2, 3], device=device)

# Every sequence shares the same sine-shaped backbone, so compute it once
# instead of once per batch row.
base = torch.sin(torch.linspace(0, 10*math.pi, seq_len, device=device)) * 1.5 + 2
base = (base % 4).long()

# Per-position jitter in {-1, 0, 1}, drawn for the whole batch in one call,
# then clamped back into the 4-symbol alphabet.
jitter = torch.randint(-1, 2, (batch_size, seq_len), device=device)
gRNAs = torch.clamp(base.unsqueeze(0) + jitter, 0, 3)

# The embedding is a fixed random lookup (it is never handed to an optimizer),
# so build the inputs without recording an autograd graph.
embed = nn.Embedding(4, dim).to(device)
with torch.no_grad():
    clean_data = embed(gRNAs)

# Zero out a per-sample fraction (40% .. 70% across the batch) of the
# embedding entries to simulate missing data.
missing_rate = torch.linspace(0.4, 0.7, batch_size, device=device).view(batch_size, 1, 1)
mask = torch.rand_like(clean_data) < missing_rate
real_data = clean_data.clone()
real_data[mask] = 0
real_data = real_data.detach() # Ensure real_data is a leaf tensor for the training loop

# The model is trained to recover the original token ids.
target = gRNAs

# E8 roots – precompute
def get_e8_roots():
    """Build the 240 roots of the E8 root system, scaled to unit length.

    The roots come in two families:
      * 112 vectors with two non-zero coordinates, each +1 or -1;
      * 128 vectors whose coordinates are all +/-0.5 with an even number of
        minus signs.

    Each sign pattern is enumerated exactly once. Appending the negation of
    every vector as well would emit each root twice (both families are
    already closed under negation), so a later truncation to 240 entries
    would keep only about half of the distinct roots.

    Returns:
        torch.Tensor: float32 CPU tensor of shape (240, 8); every row is a
        distinct root divided by its L2 norm.
    """
    roots = []

    # Family 1: pick two coordinates and all four sign combinations.
    for i in range(8):
        for j in range(i + 1, 8):
            for sign_i, sign_j in ((1, 1), (1, -1), (-1, 1), (-1, -1)):
                v = torch.zeros(8)
                v[i] = sign_i
                v[j] = sign_j
                roots.append(v)

    # Family 2: all-half-integer vectors. An even number of +0.5 entries in
    # 8 dimensions implies an even number of -0.5 entries too.
    for bits in range(1 << 8):
        if bin(bits).count('1') % 2 == 0:
            v = torch.tensor(
                [0.5 if (bits & (1 << k)) else -0.5 for k in range(8)],
                dtype=torch.float32,
            )
            roots.append(v)

    roots = torch.stack(roots)
    assert roots.shape == (240, 8), f"expected 240 E8 roots, got {roots.shape[0]}"
    return roots / roots.norm(dim=-1, keepdim=True)

# Build the root table once and move it to the training device; the cycle
# block below registers this tensor as a buffer.
e8_roots = get_e8_roots().to(device)

# Triality Cycle Block (detached pump scalar)
class CRISPRCycleBlock(nn.Module):
    """Gate the input with sin/cos factors derived from projected E8 roots.

    A bias-free linear layer maps each 8-d root vector to ``dim // triality``
    features; that projection is tiled ``triality`` times along the feature
    axis and used as a per-position phase. Three gated copies of the input
    (the 2nd and 3rd rolled by one position along axis 1) are averaged.
    """

    def __init__(self):
        super().__init__()
        self.proj = nn.Linear(latent_dim, dim // triality, bias=False)
        # Buffer: follows the module across .to()/.cuda() and is not trained.
        self.register_buffer('roots', e8_roots)

    def forward(self, x, step):
        """Apply the three-way gated mix.

        Args:
            x: input tensor; axis 1 is treated as the sequence axis and the
                last axis must broadcast against the tiled projection
                (presumably ``(batch, seq, dim)`` — confirm against caller).
            step: number (e.g. the epoch index) that drives the sinusoidal
                pump offset.

        Returns:
            Tensor with the broadcast shape of ``x`` and the phase embedding.
        """
        seq = x.shape[1]
        # Derive devices from the data/buffer instead of a module-level global
        # so the block keeps working if the model is moved to another device.
        idx = torch.arange(seq, device=self.roots.device) % self.roots.shape[0]
        pos_emb = self.roots[idx]
        low_dim = self.proj(pos_emb)
        emb = low_dim.repeat(1, triality)

        # Plain Python float: no autograd is involved, so no no_grad guard is
        # needed. Kept as a float32 tensor so type promotion under autocast is
        # the same as before.
        pump_scalar = 0.8 * math.sin(step * 0.006 * 2 * math.pi)
        pump = torch.full((1, seq, 1), pump_scalar, device=x.device)

        emb_broadcast = emb.unsqueeze(0)
        x_rot1 = x * (emb_broadcast.cos() + pump)
        x_rot2 = torch.roll(x_rot1, shifts=1, dims=1) * emb_broadcast.sin()
        x_rot3 = torch.roll(x_rot2, shifts=1, dims=1) * emb_broadcast.cos()
        fused = (x_rot1 + x_rot2 + x_rot3) / triality
        return fused

# Dummy cycle for ablation
class DummyCycle(nn.Module):
    """Identity stand-in for the cycle block, used when triality is disabled."""

    def forward(self, x, step=None):
        # `step` is accepted only so the call signature matches the real
        # cycle block; the input is handed back untouched.
        return x

# Model with ablation support
class E8CRISPRFusion(nn.Module):
    """Stack of self-attention layers behind an optional triality cycle block.

    Args:
        depth: number of ``nn.MultiheadAttention`` layers.
        use_triality: when True the input first passes through
            ``CRISPRCycleBlock`` and attention uses ``triality`` heads;
            when False the cycle is an identity and attention uses 8 heads.
    """

    def __init__(self, depth=32, use_triality=True):
        super().__init__()
        self.use_triality = use_triality

        # Submodules are created in the order cycle -> layers -> norm -> head.
        if use_triality:
            self.cycle = CRISPRCycleBlock()
            num_heads = triality
        else:
            self.cycle = DummyCycle()
            num_heads = 8

        attn_stack = []
        for _ in range(depth):
            attn_stack.append(nn.MultiheadAttention(dim, num_heads, batch_first=True))
        self.layers = nn.ModuleList(attn_stack)

        # A single LayerNorm instance is shared by every residual branch.
        self.norm = nn.LayerNorm(dim)
        self.head = nn.Linear(dim, 4)

    def forward(self, x, step):
        """Return 4-way logits for every position of ``x``.

        ``step`` is forwarded to the cycle block. Each layer's self-attention
        output is normalised and added back to the running hidden state.
        """
        hidden = self.cycle(x, step)
        for attn_layer in self.layers:
            if not use_checkpoint:
                attn_out, _ = attn_layer(hidden, hidden, hidden)
            else:
                # Recompute the layer in backward instead of storing its
                # activations.
                attn_out, _ = checkpoint(attn_layer, hidden, hidden, hidden, use_reentrant=False)
            hidden = hidden + self.norm(attn_out)
        return self.head(hidden)

# Models
# NOTE: the two variants differ in the cycle block AND in the number of
# attention heads (`triality` vs 8).
model = E8CRISPRFusion(use_triality=True).to(device)
model_ablation = E8CRISPRFusion(use_triality=False).to(device)

# A GradScaler built with enabled=False is a pass-through (scale() returns its
# input, step() just calls optimizer.step()), so the scaler objects always
# have the full scaler API, whether or not AMP is on.
opt = torch.optim.AdamW(model.parameters(), lr=lr)
scaler = torch.amp.GradScaler('cuda', enabled=use_amp)

opt_ablation = torch.optim.AdamW(model_ablation.parameters(), lr=lr)
scaler_ablation = torch.amp.GradScaler('cuda', enabled=use_amp)

loss_fn = nn.CrossEntropyLoss()

# Per-epoch loss values for each variant.
loss_hist = []
loss_abl_hist = []

def _train_step(net, optimizer, grad_scaler, step):
    """Run one optimisation step of `net` on the fixed batch.

    Args:
        net: model called as ``net(real_data, step)``.
        optimizer: optimizer that owns ``net``'s parameters.
        grad_scaler: gradient scaler paired with ``optimizer``; only touched
            when ``use_amp`` is true.
        step: current epoch index, forwarded to the model.

    Returns:
        (logits, loss) tensors from the forward pass of this step.
    """
    optimizer.zero_grad(set_to_none=True)
    with torch.amp.autocast(device_type='cuda', dtype=torch.float16, enabled=use_amp):
        step_logits = net(real_data, step)
        step_loss = loss_fn(step_logits.view(-1, 4), target.view(-1))

    # Each step builds a fresh graph from the detached `real_data`, so the
    # graph does not need to be retained after backward.
    if use_amp:
        grad_scaler.scale(step_loss).backward()
        grad_scaler.unscale_(optimizer)
    else:
        step_loss.backward()

    # A max-norm of 1e6 is effectively "no clipping"; kept as in the original.
    torch.nn.utils.clip_grad_norm_(net.parameters(), 1e6)

    if use_amp:
        grad_scaler.step(optimizer)
        grad_scaler.update()
    else:
        optimizer.step()
    return step_logits, step_loss


for epoch in range(epochs):
    # --- Training for model (with triality) ---
    logits, loss = _train_step(model, opt, scaler, epoch)

    # --- Training for model_ablation (without triality) ---
    logits_abl, loss_abl = _train_step(model_ablation, opt_ablation, scaler_ablation, epoch)

    # .item() forces a host/device sync, so read each loss only once.
    loss_value = loss.item()
    loss_abl_value = loss_abl.item()
    loss_hist.append(loss_value)
    loss_abl_hist.append(loss_abl_value)

    if epoch % 500 == 0:
        print(f"Epoch {epoch} | Triality Loss {loss_value:.6f} | Ablation Loss {loss_abl_value:.6f}")

# Final Sigma Test
# Difference of the mean losses over all recorded epochs, expressed in units
# of the standard deviation of both loss histories pooled together.
hist_triality = np.asarray(loss_hist)
hist_ablation = np.asarray(loss_abl_hist)

triality_mean = hist_triality.mean()
abl_mean = hist_ablation.mean()
std = np.concatenate([hist_triality, hist_ablation]).std()

if std > 0:
    sigma = (abl_mean - triality_mean) / std
else:
    # Zero spread: report 0 rather than dividing by zero.
    sigma = 0

print(f"Final Sigma (Triality vs Ablation): {sigma:.2f} (higher = triality advantage)")

print("Sim complete — epochs + sigma test done")

Using device: cuda
Epoch 0 | Triality Loss 1.739608 | Ablation Loss 3.377460
Epoch 500 | Triality Loss 0.000136 | Ablation Loss 0.000211
Epoch 1000 | Triality Loss 0.000044 | Ablation Loss 0.000051
Epoch 1500 | Triality Loss 0.000007 | Ablation Loss 0.000025
Epoch 2000 | Triality Loss 0.000025 | Ablation Loss 0.000015
Epoch 2500 | Triality Loss 0.000102 | Ablation Loss 0.000009
Epoch 3000 | Triality Loss 0.000002 | Ablation Loss 0.000006
Epoch 3500 | Triality Loss 0.000001 | Ablation Loss 0.000005
Epoch 4000 | Triality Loss 0.000001 | Ablation Loss 0.000004
Epoch 4500 | Triality Loss 0.000001 | Ablation Loss 0.000002
Epoch 5000 | Triality Loss 0.000000 | Ablation Loss 0.000002
Epoch 5500 | Triality Loss 0.000000 | Ablation Loss 0.000001
Epoch 6000 | Triality Loss 0.000000 | Ablation Loss 0.000001
Epoch 6500 | Triality Loss 0.000000 | Ablation Loss 0.000001
Epoch 7000 | Triality Loss 0.000000 | Ablation Loss 0.000001
Epoch 7500 | Triality Loss 0.000000 | Ablation Loss 0.000001
Epoch 800