<a href="https://colab.research.google.com/github/jonathanrbelanger-lang/Exorobourii.com/blob/main/JANUS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Project Janus: HTL/HTN Prototype

In [None]:
# --- CELL 1: SETUP, COMPILATION, AND PHYSICS FOUNDATION ---

import torch
import numpy as np
from scipy.linalg import eigh
import os
import json
from google.colab import drive
import warnings

# Suppress warnings related to complex number handling or minor library issues
warnings.filterwarnings("ignore", category=RuntimeWarning)

# --- A. Environment Setup & Compilation Check ---

# ELI12: First, we install necessary Python tools and the llama.cpp dependency.
print("Installing Python dependencies...")
!pip install -q -U transformers accelerate sentencepiece llama-cpp-python
# NOTE: llama-cpp-python is installed here to support later GGUF loading/inference


# ELI12: Next, we download the entire 'llama.cpp' project from GitHub.
print("Cloning llama.cpp repository...")
!git clone https://github.com/ggerganov/llama.cpp.git


# ELI12: Compile the C++ tools using CMake.
print("Configuring and compiling C++ tools...")
!cd llama.cpp && cmake -B build
!cd llama.cpp && cmake --build build --config Release


# ELI12: Install Python helper libraries required by llama.cpp scripts.
print("Installing llama.cpp Python requirements...")
!cd llama.cpp && pip install -r requirements.txt


# ELI12: CRITICAL ROBUSTNESS CHECK: Verify the key executable exists.
QUANTIZE_PATH_CHECK = 'llama.cpp/build/bin/llama-quantize'

if os.path.exists(QUANTIZE_PATH_CHECK):
    print(f"✅ Compilation successful. Quantize tool found at: {QUANTIZE_PATH_CHECK}")
else:
    raise RuntimeError("❌ [FATAL SETUP ERROR] llama-quantize executable not found. Compilation failed.")


# ELI12: Finally, we import the Python libraries we'll use in this notebook.
drive.mount('/content/drive')
print("✅ Google Drive mounted.")


# --- B. Physics Utility Module Definition ---

# Constants derived from Phase 1 validation
E_STATE = -60.0
E_TRANS = -2.00
D_LOCAL = 16 # Local Hilbert Space Dimension

def index_to_bits(k):
    """Converts index k (0-15) to (b3, b2, b1, b0)."""
    b0 = k % 2
    b1 = (k // 2) % 2
    b2 = (k // 4) % 2
    b3 = (k // 8) % 2
    return np.array([b3, b2, b1, b0])

def generate_OP_matrix():
    """Generates the 16x16 matrix for O_P = sum(sigma_z_i)."""
    P = np.zeros((D_LOCAL, D_LOCAL), dtype=np.float64)
    for k in range(D_LOCAL):
        bits = index_to_bits(k)
        diag_value = np.sum((-1)**bits)
        P[k, k] = diag_value
    return P

def generate_OF_matrix():
    """Generates the 16x16 matrix for O'_F = sigma^x_0 * sigma^z_1 * sigma^z_2 * sigma^z_3."""
    F = np.zeros((D_LOCAL, D_LOCAL), dtype=np.float64)
    for k in range(D_LOCAL):
        bits = index_to_bits(k)
        b3, b2, b1, b0 = bits

        if b0 == 0:
            k_prime = k + 1
            sign = (-1)**(b1 + b2 + b3)
            F[k, k_prime] = sign
            F[k_prime, k] = sign
    return F

# --- CALCULATE AND STORE THE GROUND STATE VECTOR |psi_0> ---
P_matrix = generate_OP_matrix()
F_matrix = generate_OF_matrix()
H_site_np = E_STATE * P_matrix + E_TRANS * F_matrix

# Diagonalize to find the ground state (E_values sorted ascendingly)
E_values, E_vectors = eigh(H_site_np)
PSI_0_NP = E_vectors[:, 0] # The ground state vector (NumPy)

# Convert to PyTorch Tensor (Complex type is essential for general quantum state representation)
PSI_0_TORCH = torch.tensor(PSI_0_NP, dtype=torch.cfloat)

# --- Utility Functions for ML Layer ---
def normalize_input_to_state(v_patch):
    """Normalizes a 16-element input vector to represent a quantum state vector."""
    norm = torch.linalg.norm(v_patch)
    if norm == 0:
        return torch.ones_like(v_patch) / np.sqrt(16)
    return v_patch / norm

def calculate_projection_s(psi_in, psi_0):
    """Calculates the physics-derived feature s for the HTL: s = |<psi_0 | psi_in>|^2"""
    # Ensure inputs are complex for correct inner product
    overlap = torch.vdot(psi_0.to(psi_in.dtype), psi_in)
    return torch.abs(overlap)**2

print("\n✅ Physics Foundation Established. $|\psi_0\rangle$ loaded as PyTorch Tensor.")
print(f"Ground State Energy E0: {E_values[0]:.4f}")

In [None]:
# ============================================================================
# CELL 2: MNIST DATAMODULE WITH NOISE INJECTION (REVISED)
# ============================================================================
# This cell defines the data loading and augmentation pipeline.
# No changes were required by the audit.
# ============================================================================

# ----------------------------------------------------------------------------
# 1. IMPORT LIBRARIES
# ----------------------------------------------------------------------------
import torch
from torch.utils.data import DataLoader, random_split
from torchvision.datasets import MNIST
from torchvision import transforms
import pytorch_lightning as pl
import matplotlib.pyplot as plt
import numpy as np

# ----------------------------------------------------------------------------
# 2. CUSTOM TRANSFORM FOR PIXEL DROPOUT
# ----------------------------------------------------------------------------
class PixelDropout(torch.nn.Module):
    """
    Custom transform to apply pixel dropout noise.
    Sets a random fraction of pixels to 0.
    """
    def __init__(self, epsilon: float):
        """
        Args:
            epsilon (float): The fraction of pixels to drop (e.g., 0.2 for 20%).
                             Must be between 0.0 and 1.0.
        """
        super().__init__()
        if not 0.0 <= epsilon <= 1.0:
            raise ValueError(f"Epsilon must be between 0 and 1, but got {epsilon}")
        self.epsilon = epsilon

    def forward(self, img: torch.Tensor) -> torch.Tensor:
        """
        Applies the dropout to the input image tensor.
        """
        if self.epsilon == 0.0:
            return img

        # Create a random mask of the same shape as the image
        # bernoulli_(1 - epsilon) creates a mask where `1-epsilon` fraction of
        # elements are 1 (kept) and `epsilon` are 0 (dropped).
        mask = torch.empty_like(img).bernoulli_(1.0 - self.epsilon)

        return img * mask

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}(epsilon={self.epsilon})"

# ----------------------------------------------------------------------------
# 3. PYTORCH LIGHTNING DATAMODULE
# ----------------------------------------------------------------------------
class MNISTDataModule(pl.LightningDataModule):
    """
    PyTorch Lightning DataModule for MNIST.
    Handles downloading, splitting, transformations, and data loading.
    Includes configurable pixel dropout noise.
    """
    def __init__(self,
                 data_dir: str = "./data",
                 batch_size: int = 256,
                 num_workers: int = 4,
                 noise_level: float = 0.0):
        """
        Args:
            data_dir (str): Directory to save the MNIST data.
            batch_size (int): Batch size for the data loaders.
            num_workers (int): Number of subprocesses for data loading.
            noise_level (float): Epsilon for PixelDropout in the training set.
        """
        super().__init__()
        self.data_dir = data_dir
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.noise_level = noise_level
        self.save_hyperparameters() # Saves args to self.hparams

        # Define transformations
        # 1. Resize to 32x32 to get 1024 dimensions
        # 2. Convert to tensor
        # 3. Normalize to [0, 1] range
        # 4. Apply pixel dropout (for training set)
        self.transform_train = transforms.Compose([
            transforms.Resize((32, 32)),
            transforms.ToTensor(),
            PixelDropout(self.noise_level)
        ])
        self.transform_val_test = transforms.Compose([
            transforms.Resize((32, 32)),
            transforms.ToTensor()
        ])

    def prepare_data(self):
        """Download data if not present."""
        MNIST(self.data_dir, train=True, download=True)
        MNIST(self.data_dir, train=False, download=True)
        print("✅ MNIST data prepared.")

    def setup(self, stage: str = None):
        """
        Assign train/val/test datasets for use in dataloaders.
        This is called on every GPU in DDP.
        """
        if stage == "fit" or stage is None:
            mnist_full = MNIST(self.data_dir, train=True, transform=self.transform_train)
            self.mnist_train, self.mnist_val = random_split(mnist_full, [55000, 5000])
            # Override validation transform to be clean (no noise)
            self.mnist_val.dataset.transform = self.transform_val_test

        if stage == "test" or stage is None:
            self.mnist_test = MNIST(self.data_dir, train=False, transform=self.transform_val_test)

        print("✅ MNIST datasets assigned for train, validation, and test.")

    def train_dataloader(self):
        return DataLoader(
            self.mnist_train,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
            pin_memory=True, # A100 optimization
            shuffle=True,
            persistent_workers=True if self.num_workers > 0 else False
        )

    def val_dataloader(self):
        return DataLoader(
            self.mnist_val,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
            pin_memory=True, # A100 optimization
            persistent_workers=True if self.num_workers > 0 else False
        )

    def test_dataloader(self):
        return DataLoader(
            self.mnist_test,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
            pin_memory=True, # A100 optimization
            persistent_workers=True if self.num_workers > 0 else False
        )

# ----------------------------------------------------------------------------
# 4. VERIFICATION AND DEBUG DUMP
# ----------------------------------------------------------------------------
def verify_datamodule(datamodule: MNISTDataModule, debug_dir: Path):
    """
    Visualizes a sample batch to verify transformations and noise.
    Saves the visualization as a debug artifact.
    """
    print("\n" + "=" * 80)
    print("Verifying DataModule and creating debug dump...")
    print("=" * 80)

    # Prepare and set up the datamodule
    datamodule.prepare_data()
    datamodule.setup("fit")

    # Get one batch from the training loader
    train_loader = datamodule.train_dataloader()
    images, labels = next(iter(train_loader))

    print(f"✅ Loaded one batch of data.")
    print(f"  - Image batch shape: {images.shape}")
    print(f"  - Label batch shape: {labels.shape}")

    # Check image dimensions
    img_h, img_w = images.shape[2], images.shape[3]
    if img_h != 32 or img_w != 32:
        print(f"❌ ERROR: Images are not 32x32. Found {img_h}x{img_w}.", file=sys.stderr)
        return

    # Create a clean version of the same images for comparison
    clean_transform = transforms.Compose([
        transforms.Resize((32, 32)),
        transforms.ToTensor()
    ])
    clean_dataset = MNIST(datamodule.data_dir, train=True, transform=clean_transform)
    clean_loader = DataLoader(clean_dataset, batch_size=datamodule.batch_size)
    clean_images, _ = next(iter(clean_loader))

    # --- Visualization ---
    fig, axes = plt.subplots(4, 4, figsize=(10, 10))
    fig.suptitle(f"DataModule Verification\nTrain Noise (ε={datamodule.noise_level}) vs. Clean", fontsize=16)

    for i in range(8):
        # Plot clean image
        ax_clean = axes[i // 2, (i % 2) * 2]
        ax_clean.imshow(clean_images[i].squeeze(), cmap="gray")
        ax_clean.set_title(f"Clean (Label: {labels[i]})")
        ax_clean.axis("off")

        # Plot noisy image from the actual train loader
        ax_noisy = axes[i // 2, (i % 2) * 2 + 1]
        ax_noisy.imshow(images[i].squeeze(), cmap="gray")
        ax_noisy.set_title(f"Noisy (ε={datamodule.noise_level})")
        ax_noisy.axis("off")

    plt.tight_layout(rect=[0, 0.03, 1, 0.95])

    # Save the plot to the debug directory
    dump_path = debug_dir / "datamodule_verification.png"
    plt.savefig(dump_path)
    print(f"\n✅ Verification plot saved to: {dump_path}")
    plt.show()
    print("=" * 80)
    print("DataModule verification complete.")
    print("=" * 80)


# --- Main Execution ---
# Instantiate the DataModule with a moderate noise level for verification
# We will create new instances later for actual training runs.
data_module = MNISTDataModule(
    batch_size=256,
    num_workers=os.cpu_count() // 2, # Use a safe number of workers
    noise_level=0.3 # 30% pixel dropout for this test
)

# Run the verification and generate the debug dump
verify_datamodule(data_module, PROJECT_CONFIG["debug_dir"])

In [None]:
# ============================================================================
# CELL 3: BASE LIGHTNINGMODULE FOR MODEL ARCHITECTURE (REVISED)
# ============================================================================
# This cell defines the shared architecture and training logic for both
# the HTL and Dense models to ensure a fair comparison.
# ============================================================================

# ----------------------------------------------------------------------------
# 1. IMPORT LIBRARIES
# ----------------------------------------------------------------------------
import torch
import torch.nn as nn
import torch.nn.functional as F
import pytorch_lightning as pl
from torch.optim import AdamW
from torch.optim.lr_scheduler import CosineAnnealingLR
from torchinfo import summary
from typing import Dict, Any

# Import our custom physics module
import physics_utils

# ----------------------------------------------------------------------------
# 2. DEFINE THE BASE LIGHTNINGMODULE
# ----------------------------------------------------------------------------
class BaseLitModel(pl.LightningModule):
    """
    A base LightningModule that defines the shared architecture and logic.
    This class is designed to be inherited by specific model implementations
    (e.g., HTLModel, DenseModel).
    """
    def __init__(self,
                 learning_rate: float = 1e-3,
                 weight_decay: float = 1e-4,
                 total_steps: int = 10000):
        """
        Args:
            learning_rate (float): The peak learning rate for the optimizer.
            weight_decay (float): The weight decay for the AdamW optimizer.
            total_steps (int): Total training steps, used for the LR scheduler.
        """
        super().__init__()
        # Save hyperparameters for logging and checkpointing
        self.save_hyperparameters("learning_rate", "weight_decay")
        self.total_steps = total_steps

        # --- Architecture Definition ---
        # This architecture is designed to meet the ~132K parameter budget.

        # 1. Feature Extractor (placeholder to be defined in child classes)
        # This is the ONLY part that will differ between the HTL and Dense models.
        self.feature_extractor = None # Must be set by child class

        # 2. Classifier Head (IDENTICAL for both models)
        # This ensures a fair comparison of the feature extractors.
        # The input dimension (128) is chosen to balance the parameter counts.
        self.classifier_head = nn.Sequential(
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, 10) # 10 classes for MNIST
        )

        # --- Loss Function ---
        self.criterion = nn.CrossEntropyLoss()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Defines the forward pass of the model.

        Args:
            x (torch.Tensor): Input tensor of shape (N, 1, 32, 32)

        Returns:
            torch.Tensor: Output logits of shape (N, 10)
        """
        # Flatten the input image: (N, 1, 32, 32) -> (N, 1024)
        x = x.view(x.size(0), -1)

        # Pass through the specific feature extractor (HTL or Dense)
        features = self.feature_extractor(x)

        # Pass features through the common classifier head
        logits = self.classifier_head(features)

        return logits

    def _shared_step(self, batch: tuple, batch_idx: int) -> Dict[str, Any]:
        """
        Common logic for training, validation, and test steps.
        """
        x, y = batch
        logits = self(x)
        loss = self.criterion(logits, y)

        preds = torch.argmax(logits, dim=1)
        acc = (preds == y).float().mean()

        return {"loss": loss, "acc": acc}

    def training_step(self, batch: tuple, batch_idx: int) -> torch.Tensor:
        result = self._shared_step(batch, batch_idx)
        self.log('train_loss', result['loss'], on_step=True, on_epoch=True, prog_bar=True, logger=True)
        self.log('train_acc', result['acc'], on_step=True, on_epoch=True, prog_bar=True, logger=True)
        return result['loss']

    def validation_step(self, batch: tuple, batch_idx: int):
        result = self._shared_step(batch, batch_idx)
        self.log('val_loss', result['loss'], on_epoch=True, prog_bar=True, logger=True)
        self.log('val_acc', result['acc'], on_epoch=True, prog_bar=True, logger=True)

    def test_step(self, batch: tuple, batch_idx: int):
        result = self._shared_step(batch, batch_idx)
        self.log('test_loss', result['loss'], on_epoch=True, logger=True)
        self.log('test_acc', result['acc'], on_epoch=True, logger=True)

    def configure_optimizers(self) -> Dict[str, Any]:
        """
        Configures the optimizer and learning rate scheduler.
        """
        optimizer = AdamW(self.parameters(), lr=self.hparams.learning_rate, weight_decay=self.hparams.weight_decay)

        scheduler = {
            "scheduler": CosineAnnealingLR(optimizer, T_max=self.total_steps, eta_min=1e-6),
            "interval": "step",
            "frequency": 1,
        }

        return {"optimizer": optimizer, "lr_scheduler": scheduler}

    def model_summary(self, batch_size: int = 64):
        """
        Prints a detailed summary of the model architecture and parameters.
        """
        # The input size is (batch_size, channels, height, width)
        input_size = (batch_size, 1, 32, 32)
        print("\n" + "=" * 80)
        print(f"MODEL SUMMARY: {self.__class__.__name__}")
        print("=" * 80)
        summary(self, input_size=input_size, col_names=["input_size", "output_size", "num_params", "mult_adds"])
        print("=" * 80)

# ----------------------------------------------------------------------------
# 3. DEFINE THE SPECIFIC MODEL IMPLEMENTATIONS
# ----------------------------------------------------------------------------

# --- HTL Model ---
class HTLModel(BaseLitModel):
    """
    The full model using the Hex-Tensor Layer (HTL) as the feature extractor.
    """
    def __init__(self, psi_0: np.ndarray, **kwargs):
        super().__init__(**kwargs)

        # The feature extractor is our custom HTL.
        # Input: 1024, Output: 128
        self.feature_extractor = physics_utils.HexTensorLayer(psi_0, output_dim=128)

# --- Dense Baseline Model ---
class DenseModel(BaseLitModel):
    """
    The baseline model using a standard Dense layer as the feature extractor.
    Parameter count is matched to the HTL model for a fair comparison.
    """
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        # The feature extractor is a standard Dense layer.
        # Input: 1024, Output: 128
        self.feature_extractor = physics_utils.StandardDenseLayer(
            input_dim=PROJECT_CONSTANTS["HILBERT_DIM"],
            output_dim=128
        )

In [None]:
# ============================================================================
# CELL 4: PIPELINE VERIFICATION & SANITY CHECK (REVISED)
# ============================================================================
# This cell prepares all components for the main experiment, verifies their
# correctness, and runs a brief "smoke test" to ensure the pipeline is functional.
# ============================================================================

# ----------------------------------------------------------------------------
# 1. IMPORT LIBRARIES
# ----------------------------------------------------------------------------
import torch
import pytorch_lightning as pl
from pytorch_lightning.callbacks import ModelCheckpoint, LearningRateMonitor, EarlyStopping
from pytorch_lightning.loggers import TensorBoardLogger
from pytorch_lightning.profilers import PyTorchProfiler
from typing import Type
import numpy as np

# Import our project modules
import physics_utils
# The model classes are available from the execution of the revised Cell 3.

# ----------------------------------------------------------------------------
# 2. EXPERIMENT RUNNER FUNCTION
# ----------------------------------------------------------------------------
def run_experiment(
    model_class: Type[BaseLitModel],
    config: dict,
    project_dirs: dict,
    psi_0: np.ndarray = None,
    smoke_test_steps: int = None
):
    """
    Executes a full training and testing pipeline for a given model and config.

    Args:
        model_class (Type[BaseLitModel]): The model class to train (HTLModel or DenseModel).
        config (dict): A dictionary of training hyperparameters.
        project_dirs (dict): A dictionary of project directories.
        psi_0 (np.ndarray, optional): The ground state vector, required for HTLModel.
        smoke_test_steps (int, optional): If provided, limits training to this many steps.
    """
    model_name = model_class.__name__
    print("\n" + "=" * 80)
    print(f"CONFIGURING EXPERIMENT: {model_name} with Noise ε = {config['noise_level']}")
    print("=" * 80)

    # --- 1. Setup DataModule ---
    datamodule = MNISTDataModule(
        batch_size=config["batch_size"],
        num_workers=config["num_workers"],
        noise_level=config["noise_level"]
    )
    datamodule.setup('fit')

    # --- 2. Calculate Total Steps for LR Scheduler ---
    steps_per_epoch = len(datamodule.train_dataloader())
    if smoke_test_steps:
        # For a short smoke test, don't base scheduler on full run
        total_steps = smoke_test_steps
    else:
        total_steps = steps_per_epoch * config["max_epochs"]
    print(f"Training details: {steps_per_epoch} steps/epoch, {total_steps} total steps planned.")

    # --- 3. Initialize Model ---
    model_args = {
        "learning_rate": config["learning_rate"],
        "weight_decay": config["weight_decay"],
        "total_steps": total_steps
    }
    if model_class == HTLModel:
        if psi_0 is None:
            raise ValueError("psi_0 must be provided for HTLModel")
        model = HTLModel(psi_0=psi_0, **model_args)
    else:
        model = DenseModel(**model_args)

    # REVISION: torch.compile is now applied here, to the model instance.
    try:
        print(f"Compiling {model_name} with torch.compile()...")
        model = torch.compile(model, mode="max-autotune")
        print(f"✅ {model_name} compiled successfully.")
    except Exception as e:
        print(f"⚠️ WARNING: torch.compile() failed: {e}. Using uncompiled model.")

    # --- 4. Configure Callbacks & Logger ---
    logger = TensorBoardLogger(project_dirs["log_dir"], name=f"{model_name}_noise_{config['noise_level']:.1f}")
    profiler = PyTorchProfiler(
        dirpath=project_dirs["debug_dir"], filename=f"{model_name}_profile",
        schedule=torch.profiler.schedule(wait=1, warmup=1, active=5, repeat=1),
        on_trace_ready=torch.profiler.tensorboard_trace_handler(logger.log_dir)
    )
    checkpoint_callback = ModelCheckpoint(
        dirpath=project_dirs["checkpoint_dir"], monitor="val_acc", mode="max", save_top_k=1, verbose=False
    )
    lr_monitor = LearningRateMonitor(logging_interval='step')
    early_stopping = EarlyStopping(monitor="val_loss", patience=config["early_stopping_patience"], mode="min", verbose=False)

    # --- 5. Configure Trainer ---
    trainer = pl.Trainer(
        accelerator="gpu", devices=1, max_epochs=config["max_epochs"],
        logger=logger, profiler=profiler,
        callbacks=[checkpoint_callback, lr_monitor, early_stopping],
        precision="16-mixed", benchmark=True, log_every_n_steps=20,
        limit_train_batches=smoke_test_steps if smoke_test_steps else 1.0
    )

    # --- 6. Run Training & Testing ---
    print(f"\n--- Starting run for {model_name} ---")
    with torch.cuda.nvtx.range("trainer.fit"):
        trainer.fit(model, datamodule=datamodule)

    with torch.cuda.nvtx.range("trainer.test"):
        test_results = trainer.test(ckpt_path="best", datamodule=datamodule, verbose=False)
    print(f"--- Run finished for {model_name} ---")

    print(f"  - Best Checkpoint: {checkpoint_callback.best_model_path}")
    print(f"  - Final Test Accuracy:   {test_results[0]['test_acc']:.4f}")

    return test_results[0]

# ----------------------------------------------------------------------------
# 4. MAIN EXECUTION BLOCK: SETUP, VERIFICATION, AND SMOKE TEST
# ----------------------------------------------------------------------------
if __name__ == '__main__':
    # --- 1. Initialize Physics Foundation (Single Source of Truth) ---
    psi_0, E_0 = physics_utils.initialize_physics_foundation(
        e_state=PROJECT_CONSTANTS["E_STATE"],
        e_trans=PROJECT_CONSTANTS["E_TRANS"]
    )
    psi_0_path = PROJECT_CONFIG["debug_dir"] / "psi_0_ground_state.npy"
    np.save(psi_0_path, psi_0)
    print(f"\n✅ Ground state vector created and saved to: {psi_0_path}")

    # --- 2. Verify Model Architectures and Parameter Counts ---
    # REVISION: This logic is moved from Cell 3's __main__ block.
    print("\n" + "=" * 80)
    print("VERIFYING MODEL PARAMETER COUNTS")
    print("=" * 80)

    # Instantiate models for verification
    htl_model_verify = HTLModel(psi_0=psi_0, total_steps=1)
    dense_model_verify = DenseModel(total_steps=1)

    htl_params = sum(p.numel() for p in htl_model_verify.parameters() if p.requires_grad)
    dense_params = sum(p.numel() for p in dense_model_verify.parameters() if p.requires_grad)

    print(f"Target Parameter Count: ~132,000")
    print(f"  - HTL Model Total Parameters:   {htl_params:,}")
    print(f"  - Dense Model Total Parameters: {dense_params:,}")

    param_diff_percent = abs(htl_params - dense_params) / dense_params * 100
    if param_diff_percent < 1.0:
        print("✅ SUCCESS: Parameter counts are matched within a 1% tolerance.")
    else:
        print(f"❌ FAILURE: Parameter counts do not match ({param_diff_percent:.2f}% diff).", file=sys.stderr)

    htl_model_verify.model_summary(batch_size=64)

    # --- 3. Run a Brief Smoke Test ---
    print("\n" + "=" * 80)
    print("PERFORMING A QUICK SMOKE TEST ON THE HTL MODEL")
    print("=" * 80)

    SMOKE_TEST_CONFIG = {
        "model_type": "HTL", "noise_level": 0.3, "learning_rate": 1e-3,
        "weight_decay": 1e-4, "batch_size": 512, "max_epochs": 1,
        "num_workers": os.cpu_count() // 2, "early_stopping_patience": 3,
    }

    run_experiment(
        model_class=HTLModel,
        config=SMOKE_TEST_CONFIG,
        project_dirs=PROJECT_CONFIG,
        psi_0=psi_0,
        smoke_test_steps=50  # Limit to 50 training batches
    )

    print("\n" + "*" * 80)
    print("PIPELINE VERIFIED: System is ready for the full experimental suite.")
    print("The `psi_0` object is now available for the next cell.")
    print("*" * 80)

In [None]:
# ============================================================================
# CELL 5: PHASE 1B - FULL EXPERIMENTAL SUITE (REVISED)
# ============================================================================
# This cell executes the complete, statistically robust Phase 1b experimental
# protocol. It replaces the original Cell 5.
# WARNING: This cell will run for several hours.
# ============================================================================

# ----------------------------------------------------------------------------
# 1. IMPORT LIBRARIES
# ----------------------------------------------------------------------------
import pandas as pd
import time
from datetime import timedelta
import pytorch_lightning as pl
from pytorch_lightning.loggers import TensorBoardLogger
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint

# ----------------------------------------------------------------------------
# 2. PREREQUISITE CHECK
# ----------------------------------------------------------------------------
if 'psi_0' not in locals() or psi_0 is None:
    raise NameError("The `psi_0` ground state vector is not defined. Please run the verification cell (Cell 4) before this one.")
else:
    print("✅ Prerequisite check passed: `psi_0` ground state is available.")

# ----------------------------------------------------------------------------
# 3. DEFINE PHASE 1B EXPERIMENTAL PROTOCOL
# ----------------------------------------------------------------------------
# This grid defines the three arms of our experiment.
MODEL_CONFIGS = [
    {
        'name': 'HTL',
        'model_class': HTLModel,
        'train_noise': 0.3,  # HTL is trained on noisy data
    },
    {
        'name': 'Dense',
        'model_class': DenseModel,
        'train_noise': 0.0,   # Standard baseline is trained on clean data
    },
    {
        'name': 'DenseAug',
        'model_class': DenseModel,  # Same architecture as Dense...
        'train_noise': 0.3,  # ...but trained on noisy data (the critical control)
    }
]

TEST_NOISE_LEVELS = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5]
RANDOM_SEEDS = [42, 123, 456]  # Use 3 seeds for statistical validity

# Base configuration for all runs
BASE_TRAIN_CONFIG = {
    "learning_rate": 1e-3, "weight_decay": 1e-4, "batch_size": 512,
    "max_epochs": 20, "num_workers": os.cpu_count() // 2,
    "early_stopping_patience": 3,
}

# ----------------------------------------------------------------------------
# 4. AUTOMATED EXPERIMENT EXECUTION LOOP
# ----------------------------------------------------------------------------
all_results = []
total_runs = len(RANDOM_SEEDS) * len(MODEL_CONFIGS)
current_run = 0
total_start_time = time.time()

print("=" * 80)
print("STARTING PROJECT JANUS: PHASE 1B EXPERIMENTAL SUITE")
print(f"Total models to train: {total_runs} ({len(MODEL_CONFIGS)} configs x {len(RANDOM_SEEDS)} seeds)")
print("=" * 80)

for seed in RANDOM_SEEDS:
    for model_config in MODEL_CONFIGS:
        current_run += 1
        run_start_time = time.time()

        # --- Set Seed for this Run ---
        pl.seed_everything(seed, workers=True)

        model_name = model_config['name']
        train_noise = model_config['train_noise']

        print("\n" + "*" * 80)
        print(f"RUN {current_run}/{total_runs}: Training model '{model_name}' with seed={seed}, train_noise={train_noise}")
        print("*" * 80)

        # --- 1. Initialize DataModule for TRAINING ---
        train_datamodule = MNISTDataModule(
            batch_size=BASE_TRAIN_CONFIG["batch_size"],
            num_workers=BASE_TRAIN_CONFIG["num_workers"],
            noise_level=train_noise
        )
        train_datamodule.setup('fit')

        # --- 2. Initialize Model ---
        steps_per_epoch = len(train_datamodule.train_dataloader())
        total_steps = steps_per_epoch * BASE_TRAIN_CONFIG["max_epochs"]

        model_args = {
            "learning_rate": BASE_TRAIN_CONFIG["learning_rate"],
            "weight_decay": BASE_TRAIN_CONFIG["weight_decay"],
            "total_steps": total_steps
        }
        ModelClass = model_config['model_class']
        if ModelClass == HTLModel:
            model = HTLModel(psi_0=psi_0, **model_args)
        else:
            model = DenseModel(**model_args)

        # Apply torch.compile
        try:
            model = torch.compile(model, mode="max-autotune")
        except Exception:
            pass # Failsafe for non-compatible versions

        # --- 3. Configure Trainer and Train the Model ---
        logger = TensorBoardLogger(PROJECT_CONFIG["log_dir"], name=f"{model_name}_seed{seed}_train_noise{train_noise}")
        checkpoint_callback = ModelCheckpoint(monitor="val_acc", mode="max")

        trainer = pl.Trainer(
            accelerator="gpu", devices=1, max_epochs=BASE_TRAIN_CONFIG["max_epochs"],
            logger=logger, precision="16-mixed", benchmark=True, deterministic=True,
            callbacks=[EarlyStopping(monitor="val_loss", patience=BASE_TRAIN_CONFIG["early_stopping_patience"]), checkpoint_callback],
            enable_progress_bar=True, log_every_n_steps=50
        )

        with torch.cuda.nvtx.range(f"fit_{model_name}_seed{seed}"):
            trainer.fit(model, datamodule=train_datamodule)

        # --- 4. Test the SINGLE Trained Model on MULTIPLE Noise Levels ---
        print(f"\n--- Testing '{model_name}' (seed={seed}) across noise spectrum ---")
        best_model_path = checkpoint_callback.best_model_path

        for test_noise in TEST_NOISE_LEVELS:
            # Create a new datamodule for each test noise level
            test_datamodule = MNISTDataModule(
                batch_size=BASE_TRAIN_CONFIG["batch_size"],
                num_workers=BASE_TRAIN_CONFIG["num_workers"],
                noise_level=0.0 # Test data itself is clean before flattening
            )

            # Manually create a test loader with noise transform for evaluation
            test_dataset = MNIST(test_datamodule.data_dir, train=False, transform=transforms.Compose([
                transforms.Resize((32, 32)),
                transforms.ToTensor(),
                PixelDropout(test_noise)
            ]))
            test_loader = DataLoader(test_dataset, batch_size=BASE_TRAIN_CONFIG["batch_size"], num_workers=BASE_TRAIN_CONFIG["num_workers"])

            with torch.cuda.nvtx.range(f"test_{model_name}_seed{seed}_test_noise{test_noise}"):
                test_results = trainer.test(model, dataloaders=test_loader, ckpt_path=best_model_path, verbose=False)

            test_acc = test_results[0]['test_acc']

            # Store the comprehensive result
            all_results.append({
                'model': model_name,
                'seed': seed,
                'train_noise': train_noise,
                'test_noise': test_noise,
                'test_accuracy': test_acc
            })
            print(f"  - Test Accuracy at noise ε={test_noise:.1f}: {test_acc:.4f}")

        run_elapsed_time = time.time() - run_start_time
        print(f"--- Run {current_run} finished in {timedelta(seconds=run_elapsed_time)} ---")

total_elapsed_time = time.time() - total_start_time
print("\n" + "=" * 80)
print("PHASE 1B EXPERIMENTAL SUITE COMPLETE")
print(f"Total execution time: {timedelta(seconds=total_elapsed_time)}")
print("=" * 80)

# ----------------------------------------------------------------------------
# 5. SAVE RAW RESULTS
# ----------------------------------------------------------------------------
results_df = pd.DataFrame(all_results)
results_csv_path = PROJECT_CONFIG["run_dir"] / "phase1b_full_results.csv"
results_df.to_csv(results_csv_path, index=False)

print("\n--- Raw Results Summary ---")
print(results_df.to_string())
print(f"\n✅ Full raw results saved to: {results_csv_path}")
print("\nProceed to the next cells for visualization and statistical analysis.")

In [None]:
# ============================================================================
# CELL 6: PHASE 1B - RESULTS VISUALIZATION
# ============================================================================
# This cell aggregates the raw experimental data and generates the final,
# publication-quality comparative plot with statistical error bars.
# ============================================================================

# ----------------------------------------------------------------------------
# 1. IMPORT LIBRARIES
# ----------------------------------------------------------------------------
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np

# ----------------------------------------------------------------------------
# 2. PREREQUISITE CHECK
# ----------------------------------------------------------------------------
# Ensure the raw results from Cell 5 are available.
if 'results_df' not in locals() or not isinstance(results_df, pd.DataFrame):
    raise NameError("The `results_df` DataFrame is not defined. Please run the experimental suite (Cell 5) before this one.")
else:
    print("✅ Prerequisite check passed: `results_df` is available for analysis.")

# ----------------------------------------------------------------------------
# 3. DATA AGGREGATION ACROSS SEEDS
# ----------------------------------------------------------------------------
print("\n--- Aggregating results across random seeds ---")

# Group by model type and test noise level, then calculate mean and std deviation.
summary_df = results_df.groupby(['model', 'test_noise']).agg(
    test_accuracy_mean=('test_accuracy', 'mean'),
    test_accuracy_std=('test_accuracy', 'std')
).reset_index()

# Save the aggregated summary for easy access
summary_csv_path = PROJECT_CONFIG["run_dir"] / "phase1b_summary_results.csv"
summary_df.to_csv(summary_csv_path, index=False)

print("\n--- Aggregated Results Summary (Mean ± Std) ---")
print(summary_df.to_string())
print(f"\n✅ Aggregated summary saved to: {summary_csv_path}")

# ----------------------------------------------------------------------------
# 4. GENERATE PUBLICATION-QUALITY VISUALIZATION
# ----------------------------------------------------------------------------
print("\n--- Generating comparative visualization ---")

# Set publication-quality style
sns.set_style("whitegrid")
plt.rcParams['figure.figsize'] = (12, 8)
plt.rcParams['font.size'] = 12

# Create plot
fig, ax = plt.subplots()

# Define distinct styles for each model for maximum clarity
styles = {
    'HTL':      {'color': '#0072B2', 'marker': 'o', 'label': 'HTL (Trained Noisy)'},
    'Dense':    {'color': '#D55E00', 'marker': 's', 'label': 'Dense (Trained Clean)'},
    'DenseAug': {'color': '#009E73', 'marker': '^', 'label': 'DenseAug (Trained Noisy)'}
}

# Plot each model's performance curve with error bars
for model_name, style in styles.items():
    model_data = summary_df[summary_df['model'] == model_name]

    if not model_data.empty:
        ax.errorbar(
            x=model_data['test_noise'],
            y=model_data['test_accuracy_mean'],
            yerr=model_data['test_accuracy_std'],
            label=style['label'],
            color=style['color'],
            marker=style['marker'],
            markersize=8,
            linewidth=2.5,
            capsize=5, # Width of the error bar caps
            capthick=2,
            alpha=0.9
        )

# --- Formatting and Annotations ---
ax.set_title('Project Janus: Model Robustness vs. Input Noise', fontsize=18, weight='bold', pad=20)
ax.set_xlabel('Test Noise Level (ε: Pixel Dropout Probability)', fontsize=14, weight='bold')
ax.set_ylabel('Test Accuracy (Mean ± Std over 3 Seeds)', fontsize=14, weight='bold')

ax.set_xlim(-0.02, 0.52)
ax.set_ylim(0.0, 1.05)
ax.grid(True, which='both', linestyle='--', linewidth=0.5)
ax.legend(loc='lower left', fontsize=12, frameon=True, shadow=True)

# Format y-axis as percentage
ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda y, _: f'{y:.0%}'))

# Add a shaded region to highlight the high-noise regime
ax.axvspan(0.3, 0.5, alpha=0.1, color='red')
ax.text(0.4, 0.1, 'High Corruption\nRegime', ha='center', fontsize=12,
        color='darkred', style='italic', alpha=0.8)

plt.tight_layout()

# --- Save and Display ---
plot_path = PROJECT_CONFIG["run_dir"] / "phase1b_robustness_comparison.png"
plt.savefig(plot_path, dpi=300, bbox_inches='tight')
plt.show()

print(f"\n✅ Publication-quality visualization saved to: {plot_path}")
print("\nProceed to the final cell for statistical analysis.")

In [None]:
# ============================================================================
# CELL 7: PHASE 1B - STATISTICAL ANALYSIS
# ============================================================================
# This cell provides the final, quantitative validation of our results.
# It calculates a single robustness metric (Area Under Curve) and performs
# statistical tests to determine the significance of the findings.
# ============================================================================

# ----------------------------------------------------------------------------
# 1. IMPORT LIBRARIES
# ----------------------------------------------------------------------------
import pandas as pd
import numpy as np
from scipy import stats

# ----------------------------------------------------------------------------
# 2. PREREQUISITE CHECK
# ----------------------------------------------------------------------------
# Ensure the raw results from Cell 5 are available.
if 'results_df' not in locals() or not isinstance(results_df, pd.DataFrame):
    raise NameError("The `results_df` DataFrame is not defined. Please run the experimental suite (Cell 5) before this one.")
else:
    print("✅ Prerequisite check passed: `results_df` is available for analysis.")

# ----------------------------------------------------------------------------
# 3. CALCULATE ROBUSTNESS METRIC: AREA UNDER CURVE (AUC)
# ----------------------------------------------------------------------------
print("\n--- Calculating Robustness Metric: Area Under the Accuracy Curve (AUC) ---")
print("This metric integrates accuracy over all tested noise levels. Higher is better.")

auc_results = []

# Iterate over every unique model and seed combination
for model_name in results_df['model'].unique():
    for seed in results_df['seed'].unique():
        # Filter the DataFrame for the specific run
        subset = results_df[
            (results_df['model'] == model_name) &
            (results_df['seed'] == seed)
        ].sort_values('test_noise') # IMPORTANT: Sort by x-axis value

        if not subset.empty:
            # Calculate the area under the curve using the trapezoidal rule
            auc = np.trapz(y=subset['test_accuracy'], x=subset['test_noise'])

            auc_results.append({
                'model': model_name,
                'seed': seed,
                'auc': auc
            })

auc_df = pd.DataFrame(auc_results)

# Display the aggregated AUC results
print("\n--- AUC Summary (Mean ± Std) ---")
print(auc_df.groupby('model')['auc'].agg(['mean', 'std']).to_string())

# ----------------------------------------------------------------------------
# 4. PERFORM PAIRWISE STATISTICAL HYPOTHESIS TESTS
# ----------------------------------------------------------------------------
print("\n" + "=" * 80)
print("STATISTICAL HYPOTHESIS TESTING")
print("=" * 80)
print("Performing one-sided Mann-Whitney U tests to check if the first model's")
print("AUC is significantly GREATER than the second's (α = 0.05).")

# Extract the AUC scores for each model group
htl_auc = auc_df[auc_df['model'] == 'HTL']['auc']
dense_auc = auc_df[auc_df['model'] == 'Dense']['auc']
denseaug_auc = auc_df[auc_df['model'] == 'DenseAug']['auc']

# --- Comparison 1: DenseAug vs. Dense (Sanity Check) ---
print("\n--- Test 1: Is noise augmentation effective? (DenseAug vs. Dense) ---")
stat, p_value = stats.mannwhitneyu(denseaug_auc, dense_auc, alternative='greater')
print(f"  - U-statistic: {stat:.3f}, p-value: {p_value:.4f}")
if p_value < 0.05:
    print("  - ✅ Result is SIGNIFICANT. Noise augmentation provides a clear robustness benefit.")
else:
    print("  - ✗ Result is NOT SIGNIFICANT. Noise augmentation did not provide a clear benefit.")

# --- Comparison 2: HTL vs. Dense ---
print("\n--- Test 2: Is the HTL architecture better than a naive baseline? (HTL vs. Dense) ---")
stat, p_value = stats.mannwhitneyu(htl_auc, dense_auc, alternative='greater')
print(f"  - U-statistic: {stat:.3f}, p-value: {p_value:.4f}")
if p_value < 0.05:
    print("  - ✅ Result is SIGNIFICANT. The HTL is demonstrably more robust than the standard Dense model.")
else:
    print("  - ✗ Result is NOT SIGNIFICANT. The HTL did not show a clear advantage over the standard Dense model.")

# --- Comparison 3: HTL vs. DenseAug (The Critical Test) ---
print("\n--- Test 3 (CRITICAL): Is the HTL architecture's benefit unique? (HTL vs. DenseAug) ---")
print("This tests if the physics-informed structure provides an advantage BEYOND what data augmentation alone can achieve.")
stat, p_value = stats.mannwhitneyu(htl_auc, denseaug_auc, alternative='greater')
print(f"  - U-statistic: {stat:.3f}, p-value: {p_value:.4f}")
if p_value < 0.05:
    print("  - ✅ Result is SIGNIFICANT. The HTL's architectural advantage is real and not just an effect of training on noisy data.")
    print("  - This is a Tier 1 (Strong) result for Project Janus.")
else:
    print("  - ✗ Result is NOT SIGNIFICANT. The HTL's performance is statistically indistinguishable from a standard model trained with noise augmentation.")
    print("  - This is a Tier 2 (Moderate) or Tier 3 result.")

# ----------------------------------------------------------------------------
# 5. PROJECT CONCLUSION
# ----------------------------------------------------------------------------
print("\n" + "*" * 80)
print("PROJECT JANUS: PHASE 1B COMPLETE")
print("All experiments, visualizations, and statistical analyses have been executed.")
print(f"All artifacts for this run are located in: {PROJECT_CONFIG['run_dir']}")
print("*" * 80)