# Modal Execution Runtime for ML Experiments

This notebook provides a complete execution runtime for running ML experiments on Modal serverless infrastructure.

## Features:
- Spin up Modal serverless instances
- Execute Python experiment scripts with JSON configuration and evaluation criteria
- Return job IDs whose status can be polled
- Collect and compare benchmarks across multiple models

## 1. Setup and Dependencies

In [1]:
import modal
import json
import uuid
import time
import asyncio
import os
from pathlib import Path
from typing import Dict, List, Optional, Any
from datetime import datetime
import pandas as pd
import matplotlib.pyplot as plt
import base64
from dataclasses import dataclass, asdict
import tempfile
import pickle

## 2. Modal Configuration and Authentication

In [2]:
# Modal authentication is a one-time setup step:
#   1. install the CLI:  pip install modal
#   2. create a token:   modal token new
# Alternatively, export MODAL_TOKEN_ID and MODAL_TOKEN_SECRET.

# The Modal app that the remote functions below are registered on.
app = modal.App("ml-experiment-runner")

# Packages installed into the container image. Versions are unpinned, so an
# image rebuild may pick up newer releases.
_IMAGE_PACKAGES = (
    "torch",
    "torchvision",
    "scikit-learn",
    "matplotlib",
    "numpy",
    "pandas",
    "tqdm",
)
image = modal.Image.debian_slim().pip_install(*_IMAGE_PACKAGES)

# Persistent volume for per-job artifacts; created on first use if missing.
volume = modal.Volume.from_name("experiment-artifacts", create_if_missing=True)

## 3. Experiment Data Models

In [3]:
@dataclass
class ExperimentConfig:
    """Configuration for an ML experiment.

    ``metrics`` and ``baselines`` fall back to defaults when left as ``None``.
    ``training_params`` is always merged over the default training parameters
    (epochs, batch_size, learning_rate). A partial dict, ``{}``, or ``None``
    therefore still yields every key that consumers such as the sample
    experiment script read. Caller-supplied values take precedence, and the
    caller's dict is copied rather than mutated.
    """
    experiment_id: str
    model_config: Dict[str, Any]
    dataset: str = "cifar10"
    metrics: Optional[List[str]] = None
    baselines: Optional[List[str]] = None
    training_params: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        if self.metrics is None:
            self.metrics = ["accuracy", "f1_score", "loss"]
        if self.baselines is None:
            self.baselines = ["cnn_basic", "resnet18"]
        # Built fresh per instance so no two configs share a mutable default.
        default_training_params = {
            "epochs": 3,
            "batch_size": 128,
            "learning_rate": 0.001
        }
        # Missing keys (including the all-missing {} case) fall back to the
        # defaults instead of leaving scripts to hit a KeyError.
        self.training_params = {
            **default_training_params,
            **(self.training_params or {}),
        }

@dataclass
class JobStatus:
    """Mutable status record for one submitted job, stored in ``JobManager.jobs``."""
    job_id: str  # generated by JobManager.create_job as "job_" + 8 hex chars
    experiment_id: str  # experiment ID the job was created for
    status: str  # 'pending', 'running', 'completed', 'failed'
    started_at: datetime  # naive local time from datetime.now() at job creation
    completed_at: Optional[datetime] = None  # set by the runner when the job completes or fails
    error: Optional[str] = None  # failure message; only set for 'failed' jobs
    results: Optional[Dict[str, Any]] = None  # raw `results` dict returned by the remote run
    artifacts_path: Optional[str] = None  # per-job directory inside the Modal volume mount

@dataclass
class BenchmarkResult:
    """Per-model metrics parsed from a job's ``results`` dict by ExperimentRunner.

    Missing metrics are filled with 0.0 / [] by the parser, so a zero here may
    mean "not reported" rather than a real measurement.
    """
    model_name: str  # key of this model's entry in the script's `results` dict
    accuracy: float
    f1_score: float
    loss: float  # final loss value reported by the script
    training_time: float  # displayed as seconds by compare_benchmarks
    inference_time: float  # per-sample time; compare_benchmarks multiplies by 1000 for ms
    loss_history: List[float]  # one entry per epoch in the sample script
    accuracy_history: List[float]  # one entry per epoch in the sample script

## 4. Job Management System

In [4]:
class JobManager:
    """In-process registry of experiment jobs and their benchmark results.

    All state lives in plain dicts on this instance; nothing is persisted.
    """

    def __init__(self):
        # job_id -> JobStatus record
        self.jobs: Dict[str, JobStatus] = {}
        # job_id -> BenchmarkResult objects, in insertion order
        self.results: Dict[str, List[BenchmarkResult]] = {}

    def create_job(self, experiment_id: str) -> str:
        """Register a new 'pending' job for *experiment_id* and return its ID."""
        new_id = "job_" + uuid.uuid4().hex[:8]
        record = JobStatus(
            job_id=new_id,
            experiment_id=experiment_id,
            status='pending',
            started_at=datetime.now()
        )
        self.jobs[new_id] = record
        return new_id

    def update_job_status(self, job_id: str, status: str, **kwargs):
        """Set a job's status plus any extra attributes; unknown IDs are ignored."""
        record = self.jobs.get(job_id)
        if record is None:
            return
        record.status = status
        for attr_name, attr_value in kwargs.items():
            setattr(record, attr_name, attr_value)

    def get_job_status(self, job_id: str) -> Optional[JobStatus]:
        """Return the job's status record, or None if the ID is unknown."""
        return self.jobs.get(job_id)

    def add_benchmark_result(self, job_id: str, result: BenchmarkResult):
        """Append *result* to the benchmark list kept for *job_id*."""
        self.results.setdefault(job_id, []).append(result)

    def get_job_results(self, job_id: str) -> List[BenchmarkResult]:
        """Return the job's benchmark results (empty list if none recorded)."""
        return self.results.get(job_id, [])

# Shared registry used by the runner below
job_manager = JobManager()

## 5. Modal Serverless Execution Functions

In [5]:
@app.function(
    image=image,
    volumes={'/artifacts': volume},
    gpu="any",  # request one GPU of any type; Modal does not fall back to CPU
    timeout=1800,  # 30 minutes timeout
    memory=8192,  # 8GB RAM
)
def run_experiment(
    experiment_script: str,
    config: Dict[str, Any],
    job_id: str
) -> Dict[str, Any]:
    """Execute an experiment script inside a Modal container.

    The script runs via ``exec`` in a fresh namespace pre-populated with
    ``config``, ``artifacts_dir`` (a str path), an empty ``results`` dict and
    common torch/sklearn names. Whatever the script leaves in ``results`` is
    written to ``<artifacts_dir>/results.json`` and returned.

    SECURITY: ``exec`` runs arbitrary code with the container's privileges,
    including write access to the mounted artifacts volume. Only submit
    scripts from trusted sources.

    Args:
        experiment_script: Python source to execute.
        config: JSON-serializable experiment configuration; saved as config.json.
        job_id: Job identifier; names the per-job directory under /artifacts.

    Returns:
        ``{'status': 'success', 'results', 'artifacts_path'}`` on success.
        ``{'status': 'error', 'error', 'traceback', 'artifacts_path'}`` when
        the script, or saving its results, raises.
    """
    import json
    import traceback

    import torch
    import torchvision
    import torchvision.transforms as transforms
    from sklearn.metrics import accuracy_score, f1_score
    from torch import nn, optim

    # One directory per job on the shared volume
    artifacts_dir = Path(f"/artifacts/{job_id}")
    artifacts_dir.mkdir(parents=True, exist_ok=True)

    with open(artifacts_dir / "config.json", "w") as f:
        json.dump(config, f, indent=2)

    # Globals visible to the script; it reports metrics by filling `results`.
    namespace = {
        'config': config,
        'artifacts_dir': str(artifacts_dir),
        'results': {},
        'torch': torch,
        'torchvision': torchvision,
        'transforms': transforms,
        'nn': nn,
        'optim': optim,
        'f1_score': f1_score,
        'accuracy_score': accuracy_score,
    }

    try:
        exec(experiment_script, namespace)

        results = namespace.get('results', {})

        with open(artifacts_dir / "results.json", "w") as f:
            json.dump(results, f, indent=2)

        # Persist the artifacts written above to the volume
        volume.commit()

        return {
            'status': 'success',
            'results': results,
            'artifacts_path': str(artifacts_dir)
        }

    except Exception as e:
        return {
            'status': 'error',
            'error': str(e),
            # str(e) alone rarely identifies the failing line of the script
            'traceback': traceback.format_exc(),
            'artifacts_path': str(artifacts_dir)
        }

@app.function(
    image=image,
    volumes={'/artifacts': volume},
)
def fetch_artifacts(job_id: str) -> Dict[str, Any]:
    """Collect a job's artifacts from the shared volume.

    Returns a dict that may contain:
      * ``'results'`` – parsed results.json
      * ``'config'``  – parsed config.json
      * one base64-encoded string per top-level ``*.png``, keyed by file stem.
        A plot whose stem would collide with a reserved key ('results',
        'config', 'error') is keyed by its full file name instead, so it cannot
        overwrite the parsed JSON.
    If the job directory does not exist, returns ``{'error': ...}``.
    """
    # A warm container may have mounted the volume before the job committed
    # its artifacts; reload to see the latest committed state.
    volume.reload()

    artifacts_dir = Path(f"/artifacts/{job_id}")

    if not artifacts_dir.exists():
        return {'error': f'No artifacts found for job {job_id}'}

    artifacts: Dict[str, Any] = {}

    results_file = artifacts_dir / "results.json"
    if results_file.exists():
        with open(results_file, "r") as f:
            artifacts['results'] = json.load(f)

    config_file = artifacts_dir / "config.json"
    if config_file.exists():
        with open(config_file, "r") as f:
            artifacts['config'] = json.load(f)

    reserved_keys = {'results', 'config', 'error'}
    # Sorted for a deterministic key order in the returned dict
    for plot_file in sorted(artifacts_dir.glob("*.png")):
        key = plot_file.name if plot_file.stem in reserved_keys else plot_file.stem
        artifacts[key] = base64.b64encode(plot_file.read_bytes()).decode('utf-8')

    return artifacts

## 6. Experiment Execution Interface

In [6]:
class ExperimentRunner:
    """High-level interface for submitting, tracking and comparing experiments.

    Job bookkeeping is delegated to the module-level ``job_manager``. Each run
    executes remotely via ``run_experiment`` inside an ephemeral ``app.run()``.
    """

    def __init__(self):
        self.job_manager = job_manager
        # The event loop holds only weak references to tasks, so background
        # runs (wait=False) are kept here until they finish.
        self._background_tasks: set = set()
        # Created lazily so it is bound to the loop that first uses it.
        self._run_lock: Optional[asyncio.Lock] = None

    async def submit_experiment(
        self,
        experiment_script: str,
        config: ExperimentConfig,
        *,
        wait: bool = True,
    ) -> str:
        """Submit an experiment for execution and return its job ID.

        Args:
            experiment_script: Python source executed remotely by ``run_experiment``.
            config: Experiment configuration, serialized with ``asdict``.
            wait: When True (the default), this coroutine returns only after
                the remote run has finished and the job is 'completed' or
                'failed'. When False, the run is scheduled as a background
                task on the current event loop and the job ID is returned
                immediately while the job is still 'running', ready for
                ``poll_job``.

        Returns:
            The job ID created by the job manager.
        """
        job_id = self.job_manager.create_job(config.experiment_id)
        self.job_manager.update_job_status(job_id, 'running')

        execution = self._execute_job(job_id, experiment_script, config)
        if wait:
            await execution
        else:
            task = asyncio.create_task(execution)
            self._background_tasks.add(task)
            task.add_done_callback(self._background_tasks.discard)

        return job_id

    async def _execute_job(
        self,
        job_id: str,
        experiment_script: str,
        config: ExperimentConfig
    ) -> None:
        """Run one job on Modal and record its outcome in the job manager."""
        try:
            if self._run_lock is None:
                self._run_lock = asyncio.Lock()
            # NOTE(review): app.run() sections are serialized on the assumption
            # that one App should not be run ephemerally by several tasks at once.
            async with self._run_lock:
                with app.run():
                    result = await run_experiment.remote.aio(
                        experiment_script,
                        asdict(config),
                        job_id
                    )

            if result.get('status') == 'success':
                self.job_manager.update_job_status(
                    job_id,
                    'completed',
                    completed_at=datetime.now(),
                    results=result['results'],
                    artifacts_path=result['artifacts_path']
                )
                self._parse_benchmark_results(job_id, result['results'])
            else:
                self.job_manager.update_job_status(
                    job_id,
                    'failed',
                    completed_at=datetime.now(),
                    error=result.get('error', 'Unknown error')
                )

        except Exception as e:
            # Boundary handler: any failure (Modal, network, result parsing) is
            # recorded on the job rather than propagated to the caller.
            self.job_manager.update_job_status(
                job_id,
                'failed',
                completed_at=datetime.now(),
                error=str(e)
            )

    def _parse_benchmark_results(self, job_id: str, results: Dict[str, Any]):
        """Store one BenchmarkResult per dict-valued entry of *results*.

        Non-dict entries are ignored. Missing metrics default to 0.0 and
        missing histories default to [].
        """
        for model_name, model_results in (results or {}).items():
            if not isinstance(model_results, dict):
                continue
            benchmark = BenchmarkResult(
                model_name=model_name,
                accuracy=model_results.get('accuracy', 0.0),
                f1_score=model_results.get('f1_score', 0.0),
                loss=model_results.get('loss', 0.0),
                training_time=model_results.get('training_time', 0.0),
                inference_time=model_results.get('inference_time', 0.0),
                loss_history=model_results.get('loss_history', []),
                accuracy_history=model_results.get('accuracy_history', [])
            )
            self.job_manager.add_benchmark_result(job_id, benchmark)

    def poll_job(self, job_id: str) -> Optional[JobStatus]:
        """Return the job's current status record, or None for an unknown ID."""
        return self.job_manager.get_job_status(job_id)

    def get_job_results(self, job_id: str) -> List[BenchmarkResult]:
        """Return the parsed benchmark results recorded for *job_id*."""
        return self.job_manager.get_job_results(job_id)

    async def get_artifacts(self, job_id: str) -> Dict[str, Any]:
        """Fetch a job's artifacts from the Modal volume via ``fetch_artifacts``."""
        with app.run():
            return await fetch_artifacts.remote.aio(job_id)

    def compare_benchmarks(self, job_id: str) -> pd.DataFrame:
        """Tabulate one job's per-model benchmarks, best accuracy first.

        Returns an empty DataFrame when the job has no parsed results.
        Inference time is multiplied by 1000 for display in milliseconds
        (assumes scripts report it in seconds, as the sample script does).
        """
        results = self.job_manager.get_job_results(job_id)

        if not results:
            return pd.DataFrame()

        rows = [
            {
                'Model': r.model_name,
                'Accuracy': r.accuracy,
                'F1 Score': r.f1_score,
                'Final Loss': r.loss,
                'Training Time (s)': r.training_time,
                'Inference Time (ms)': r.inference_time * 1000
            }
            for r in results
        ]

        return (
            pd.DataFrame(rows)
            .sort_values('Accuracy', ascending=False)
            .reset_index(drop=True)
        )

    def plot_training_curves(self, job_id: str):
        """Plot per-epoch loss and accuracy histories for every model in a job.

        Epochs are numbered from 1. A legend is drawn only on axes that
        received at least one labelled curve, which avoids matplotlib's
        "No artists with labels" warning when histories are missing.
        """
        results = self.job_manager.get_job_results(job_id)

        if not results:
            print("No results to plot")
            return

        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

        for r in results:
            if r.loss_history:
                epochs = range(1, len(r.loss_history) + 1)
                ax1.plot(epochs, r.loss_history, label=r.model_name)
            if r.accuracy_history:
                epochs = range(1, len(r.accuracy_history) + 1)
                ax2.plot(epochs, r.accuracy_history, label=r.model_name)

        for ax, ylabel, title in (
            (ax1, 'Loss', 'Training Loss'),
            (ax2, 'Accuracy', 'Training Accuracy'),
        ):
            ax.set_xlabel('Epoch')
            ax.set_ylabel(ylabel)
            ax.set_title(title)
            handles, _ = ax.get_legend_handles_labels()
            if handles:
                ax.legend()
            ax.grid(True)

        plt.tight_layout()
        plt.show()

# Create global runner instance
runner = ExperimentRunner()

## 7. Sample Test Inputs

In [7]:
# Write the sample inputs used by the examples below to scripts/sample/execution.
sample_dir = Path("scripts", "sample", "execution")
sample_dir.mkdir(exist_ok=True, parents=True)

# Sample experiment configuration (keys mirror the ExperimentConfig fields)
sample_config = {
    "experiment_id": "exp_sample_001",
    "model_config": {
        "type": "cnn_custom",
        "activation": "gelu",
        "layers": ["conv", "batchnorm", "maxpool"]
    },
    "dataset": "cifar10",
    "metrics": ["accuracy", "f1_score", "loss"],
    "baselines": ["cnn_basic", "resnet18"],
    "training_params": {
        "epochs": 2,
        "batch_size": 128,
        "learning_rate": 0.001
    }
}

# Persist it as pretty-printed JSON next to the other sample inputs.
sample_config_path = sample_dir / "sample_config.json"
sample_config_path.write_text(json.dumps(sample_config, indent=2))

print(f"Sample config saved to {sample_config_path}")

Sample config saved to scripts/sample/execution/sample_config.json


In [8]:
# Sample experiment script. `run_experiment` executes it with `exec`, with
# `config` (the ExperimentConfig as a dict) and an empty `results` dict already
# in its namespace. The script reports per-model metrics by filling `results`.
sample_experiment_script = '''
import time

import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import f1_score
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

# Re-seeded before each model so the custom and baseline runs start from the
# same initial weights and the same shuffle order; only the activation differs.
SEED = 42


# Simple CNN for 3x32x32 inputs (CIFAR-10): two conv+pool stages -> 64x8x8
class SimpleCNN(nn.Module):
    def __init__(self, activation='relu'):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 32, 3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(64 * 8 * 8, 128)
        self.fc2 = nn.Linear(128, 10)
        self.activation = nn.GELU() if activation == 'gelu' else nn.ReLU()

    def forward(self, x):
        x = self.pool(self.activation(self.conv1(x)))
        x = self.pool(self.activation(self.conv2(x)))
        x = torch.flatten(x, 1)
        x = self.activation(self.fc1(x))
        return self.fc2(x)


# Returns (accuracy, predictions, labels) over the whole loader
def evaluate(model, loader, device):
    model.eval()
    correct = 0
    total = 0
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            predicted = model(inputs).argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    return correct / total, all_preds, all_labels


# Seconds per sample for one forward pass over a single batch. CUDA kernels run
# asynchronously, so synchronize before reading the clock; the host-to-device
# copy is excluded from the measurement.
def measure_inference_time(model, loader, device):
    model.eval()
    inputs, _ = next(iter(loader))
    inputs = inputs.to(device)
    with torch.no_grad():
        if device.type == 'cuda':
            torch.cuda.synchronize()
        start = time.perf_counter()
        model(inputs)
        if device.type == 'cuda':
            torch.cuda.synchronize()
        elapsed = time.perf_counter() - start
    return elapsed / inputs.size(0)


# Training function
def train_model(model, train_loader, test_loader, epochs, device):
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=config['training_params']['learning_rate'])

    loss_history = []
    accuracy_history = []

    # Wall-clock time for all epochs, including the per-epoch test evaluation
    start_time = time.perf_counter()

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            loss = criterion(model(inputs), labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        accuracy, all_preds, all_labels = evaluate(model, test_loader, device)
        epoch_loss = running_loss / len(train_loader)

        loss_history.append(epoch_loss)
        accuracy_history.append(accuracy)

        print(f'Epoch {epoch+1}: Loss={epoch_loss:.4f}, Accuracy={accuracy:.4f}')

    training_time = time.perf_counter() - start_time

    # Weighted F1 of the final epoch's test-set predictions
    f1 = f1_score(all_labels, all_preds, average='weighted')

    return {
        'accuracy': accuracy,
        'f1_score': f1,
        'loss': epoch_loss,
        'training_time': training_time,
        'inference_time': measure_inference_time(model, test_loader, device),
        'loss_history': loss_history,
        'accuracy_history': accuracy_history
    }


# Main execution
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Load CIFAR-10 dataset
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

train_dataset = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
test_dataset = datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)

batch_size = config['training_params']['batch_size']
epochs = config['training_params']['epochs']
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Train custom model
print("Training custom model...")
torch.manual_seed(SEED)
custom_model = SimpleCNN(activation=config['model_config'].get('activation', 'relu')).to(device)
custom_results = train_model(custom_model, train_loader, test_loader, epochs, device)

# Train baseline CNN
print("Training baseline CNN...")
torch.manual_seed(SEED)
baseline_model = SimpleCNN(activation='relu').to(device)
baseline_results = train_model(baseline_model, train_loader, test_loader, epochs, device)

# Store results
results['custom_model'] = custom_results
results['baseline_cnn'] = baseline_results

print("Experiment completed!")
'''

# Save sample experiment script
with open(sample_dir / "sample_experiment.py", "w") as f:
    f.write(sample_experiment_script)

print(f"Sample experiment script saved to {sample_dir / 'sample_experiment.py'}")

Sample experiment script saved to scripts/sample/execution/sample_experiment.py


In [9]:
# Evaluation criteria consumed by the validation step in the test flow below
evaluation_criteria = {
    "success_criteria": {
        "min_accuracy": 0.6,
        "max_training_time": 300,  # 5 minutes
        "required_metrics": ["accuracy", "f1_score", "loss"]
    },
    "comparison_metrics": [
        {
            "name": "accuracy_improvement",
            "baseline": "baseline_cnn",
            "target": "custom_model",
            "threshold": 0.02  # 2% improvement
        },
        {
            "name": "training_efficiency",
            "metric": "training_time",
            "max_value": 180  # 3 minutes
        }
    ],
    "output_artifacts": [
        "loss_curves.png",
        "accuracy_curves.png",
        "benchmark_comparison.csv",
        "results_summary.json"
    ]
}

# Persist the criteria as pretty-printed JSON alongside the other sample inputs.
evaluation_criteria_path = sample_dir / "evaluation_criteria.json"
evaluation_criteria_path.write_text(json.dumps(evaluation_criteria, indent=2))

print(f"Evaluation criteria saved to {evaluation_criteria_path}")

Evaluation criteria saved to scripts/sample/execution/evaluation_criteria.json


## 8. Execution Examples

In [10]:
# Example: Submit an experiment
async def run_sample_experiment(
    timeout: Optional[float] = None,
    poll_interval: float = 5.0,
):
    """Run the sample experiment end to end and report its results.

    Loads the sample config and script from scripts/sample/execution, submits
    them through the global ``runner``, polls until the job finishes, then
    prints the benchmark table, plots training curves and fetches artifacts.

    Args:
        timeout: Maximum seconds to keep polling after submission. ``None``
            (the default) polls until the job is 'completed' or 'failed'.
        poll_interval: Seconds to sleep between polls.

    Returns:
        The job ID of the submitted experiment.
    """
    with open("scripts/sample/execution/sample_config.json", "r") as f:
        config_dict = json.load(f)

    config = ExperimentConfig(**config_dict)

    with open("scripts/sample/execution/sample_experiment.py", "r") as f:
        script = f.read()

    print("Submitting experiment...")
    job_id = await runner.submit_experiment(script, config)
    print(f"Job submitted with ID: {job_id}")

    # Poll for completion, bounded by `timeout` when one is given
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        status = runner.poll_job(job_id)
        print(f"Job status: {status.status}")

        if status.status in ('completed', 'failed'):
            break

        if deadline is not None and time.monotonic() >= deadline:
            print(f"\nTimed out after {timeout} seconds; job {job_id} is still {status.status}")
            return job_id

        await asyncio.sleep(poll_interval)

    if status.status == 'completed':
        print("\nExperiment completed successfully!")

        print("\nBenchmark Comparison:")
        comparison = runner.compare_benchmarks(job_id)
        print(comparison)

        print("\nPlotting training curves...")
        runner.plot_training_curves(job_id)

        print("\nFetching artifacts...")
        artifacts = await runner.get_artifacts(job_id)
        print(f"Artifacts retrieved: {list(artifacts.keys())}")
    else:
        print(f"\nExperiment failed: {status.error}")

    return job_id

# Run the sample experiment (uncomment to execute)
# job_id = await run_sample_experiment()

In [11]:
# Helper function to poll job status
def check_job_status(job_id: str):
    """Print a readable summary of one job's current status.

    Prints a 'No job found' message instead when *job_id* is unknown.
    """
    job = runner.poll_job(job_id)
    if not job:
        print(f"No job found with ID: {job_id}")
        return

    print(f"Job ID: {job.job_id}")
    print(f"Status: {job.status}")
    print(f"Started: {job.started_at}")

    finished_at = job.completed_at
    if finished_at:
        print(f"Completed: {finished_at}")
        elapsed = (finished_at - job.started_at).total_seconds()
        print(f"Duration: {elapsed:.2f} seconds")

    if job.error:
        print(f"Error: {job.error}")

    if job.results:
        print(f"Results available: {list(job.results)}")

# Example usage (provide actual job_id)
# check_job_status("job_12345678")

In [12]:
# Function to compare multiple experiments
def compare_experiments(job_ids: List[str]):
    """Compare per-model benchmark results across several jobs.

    Unknown jobs and jobs that are not 'completed' are skipped. The function
    prints per-model mean metrics and the best row by accuracy, and draws bar
    charts of the per-model means.

    Returns:
        A DataFrame with one row per (job, model), or None if no completed
        job had results.
    """
    metric_columns = ['Accuracy', 'F1 Score', 'Training Time']
    all_results = []

    for job_id in job_ids:
        status = runner.poll_job(job_id)
        if not status or status.status != 'completed':
            continue
        # Parsed BenchmarkResult objects are stored on the runner's JobManager.
        for r in runner.job_manager.get_job_results(job_id):
            all_results.append({
                'Job ID': job_id,
                'Experiment': status.experiment_id,
                'Model': r.model_name,
                'Accuracy': r.accuracy,
                'F1 Score': r.f1_score,
                'Training Time': r.training_time
            })

    if not all_results:
        print("No completed experiments found")
        return None

    df = pd.DataFrame(all_results)
    # Computed once and reused for both the summary and the plots
    model_means = df.groupby('Model')[metric_columns].mean()

    print("\nSummary Statistics:")
    print(model_means)

    best_model = df.loc[df['Accuracy'].idxmax()]
    print("\nBest Model:")
    print(best_model)

    fig, axes = plt.subplots(1, 3, figsize=(15, 5))
    plot_specs = (
        ('Accuracy', 'Average Accuracy by Model', 'Accuracy'),
        ('F1 Score', 'Average F1 Score by Model', 'F1 Score'),
        ('Training Time', 'Average Training Time by Model', 'Time (seconds)'),
    )
    for ax, (column, title, ylabel) in zip(axes, plot_specs):
        model_means[column].plot(kind='bar', ax=ax)
        ax.set_title(title)
        ax.set_ylabel(ylabel)

    plt.tight_layout()
    plt.show()

    return df

# Example usage with multiple job IDs
# compare_experiments(["job_id1", "job_id2", "job_id3"])

## 9. API Interface for Integration

In [13]:
class ExperimentAPI:
    """Adapter exposing the global ExperimentRunner through dict-returning methods.

    Intended for integration with the FastAPI server.
    """

    def __init__(self):
        self.runner = runner

    async def execute_experiment(
        self,
        plan_id: str,
        code_files: List[Dict[str, str]],
        spec: Dict[str, Any]
    ) -> Dict[str, str]:
        """Build a config from *spec*, combine *code_files*, and submit the job.

        Args:
            plan_id: Used as the experiment ID.
            code_files: Dicts with 'path' and 'content' keys; only ``.py``
                files are included in the executed script.
            spec: Request spec; the 'model', 'dataset', 'metrics',
                'baselines' and 'train' keys are read.

        Returns:
            ``{'job_id', 'status'}``, where status is the job's recorded
            status after submission returns ('running' if unknown).
        """
        config = ExperimentConfig(
            experiment_id=plan_id,
            model_config=spec.get('model', {}),
            dataset=spec.get('dataset', 'cifar10'),
            # Missing metrics stay None so ExperimentConfig supplies its
            # defaults, which use the 'f1_score' name scripts report.
            metrics=spec.get('metrics'),
            baselines=spec.get('baselines', ['cnn_basic']),
            # Normalize a missing or empty 'train' spec to None so
            # ExperimentConfig applies its default training parameters.
            training_params=spec.get('train') or None
        )

        script = self._combine_code_files(code_files)

        job_id = await self.runner.submit_experiment(script, config)

        # submit_experiment may return only after the run has finished, so
        # report the recorded status instead of assuming 'running'.
        job = self.runner.poll_job(job_id)

        return {
            'job_id': job_id,
            'status': job.status if job else 'running'
        }

    def get_job_status(self, job_id: str) -> Dict[str, Any]:
        """Return a status dict for the API response.

        The dict always includes job_id, status and started_at (ISO 8601).
        It adds completed_at, error, metrics and artifacts_path when set.
        An unknown ID yields ``{'error': 'Job not found'}``.
        """
        status = self.runner.poll_job(job_id)

        if not status:
            return {'error': 'Job not found'}

        response = {
            'job_id': status.job_id,
            'status': status.status,
            'started_at': status.started_at.isoformat(),
        }

        if status.completed_at:
            response['completed_at'] = status.completed_at.isoformat()

        if status.error:
            response['error'] = status.error

        # `is not None` so an empty-but-present results dict is still reported
        if status.results is not None:
            response['metrics'] = status.results
            response['artifacts_path'] = status.artifacts_path

        return response

    async def get_report_data(self, job_id: str) -> Dict[str, Any]:
        """Return metrics, the benchmark table and artifacts for a completed job.

        Returns ``{'error': 'Job not completed'}`` for unknown or unfinished jobs.
        """
        status = self.runner.poll_job(job_id)

        if not status or status.status != 'completed':
            return {'error': 'Job not completed'}

        comparison = self.runner.compare_benchmarks(job_id)

        artifacts = await self.runner.get_artifacts(job_id)

        return {
            'job_id': job_id,
            'metrics': status.results,
            'comparison': comparison.to_dict('records') if not comparison.empty else [],
            'artifacts': artifacts
        }

    def _combine_code_files(self, code_files: List[Dict[str, str]]) -> str:
        """Concatenate the ``.py`` entries of *code_files* into one script.

        Files keep their given order. Each file is preceded by a
        ``# File: <path>`` marker, and non-``.py`` entries are skipped.

        SECURITY: the combined script is later run with ``exec`` on Modal;
        callers must only pass code they trust.
        """
        script_parts = []

        for file_info in code_files:
            # `or ''` also guards against explicit None values
            path = file_info.get('path') or ''
            content = file_info.get('content') or ''

            if path.endswith('.py'):
                script_parts.extend((f"# File: {path}", content, "\n"))

        return "\n".join(script_parts)

# Create global API instance
api = ExperimentAPI()

print("Experiment API initialized and ready for integration")

Experiment API initialized and ready for integration


## 10. Testing and Validation

In [14]:
# Test the complete flow
def _check_success_criteria(model_metrics: Dict[str, Any], success_criteria: Dict[str, Any]) -> bool:
    """Print a ✓/✗ line per success criterion and return True only if all pass.

    Supported keys (as written to evaluation_criteria.json) are
    ``min_accuracy``, ``max_training_time`` and ``required_metrics``. Other
    keys are ignored.
    """
    success = True

    if 'min_accuracy' in success_criteria:
        threshold = success_criteria['min_accuracy']
        accuracy = model_metrics.get('accuracy', 0)
        if accuracy < threshold:
            success = False
            print(f"✗ Accuracy below threshold: {accuracy} < {threshold}")
        else:
            print(f"✓ Accuracy meets threshold: {accuracy} >= {threshold}")

    if 'max_training_time' in success_criteria:
        limit = success_criteria['max_training_time']
        training_time = model_metrics.get('training_time')
        if training_time is None:
            success = False
            print("✗ Training time not reported")
        elif training_time > limit:
            success = False
            print(f"✗ Training time exceeds limit: {training_time} > {limit}")
        else:
            print(f"✓ Training time within limit: {training_time} <= {limit}")

    if 'required_metrics' in success_criteria:
        missing = [m for m in success_criteria['required_metrics'] if m not in model_metrics]
        if missing:
            success = False
            print(f"✗ Missing required metrics: {missing}")
        else:
            print("✓ All required metrics reported")

    return success


async def test_complete_flow():
    """End-to-end smoke test: run a mock experiment on Modal and validate it.

    Returns:
        True only when the job completes and its ``test_model`` metrics meet
        every success criterion in evaluation_criteria.json. Returns False if
        the job fails, does not finish within the polling budget, or misses a
        criterion.
    """
    print("Starting complete flow test...\n")

    # 1. Load test inputs
    with open("scripts/sample/execution/sample_config.json", "r") as f:
        config_dict = json.load(f)

    with open("scripts/sample/execution/evaluation_criteria.json", "r") as f:
        criteria = json.load(f)

    print("✓ Test inputs loaded")

    # 2. Create minimal test script
    test_script = '''
import numpy as np
import time

# Simulate model training
print("Running test experiment...")
time.sleep(2)

# Generate mock results
results['test_model'] = {
    'accuracy': 0.85 + np.random.random() * 0.1,
    'f1_score': 0.83 + np.random.random() * 0.1,
    'loss': 0.3 + np.random.random() * 0.2,
    'training_time': 10.5,
    'inference_time': 0.001,
    'loss_history': [0.8, 0.6, 0.4, 0.3],
    'accuracy_history': [0.6, 0.7, 0.8, 0.85]
}

print("Test experiment completed!")
'''

    # 3. Submit experiment
    config = ExperimentConfig(**config_dict)
    job_id = await runner.submit_experiment(test_script, config)
    print(f"✓ Experiment submitted: {job_id}")

    # 4. Poll for completion, bounded by max_polls
    max_polls = 20
    status = runner.poll_job(job_id)
    for _ in range(max_polls):
        if status.status in ('completed', 'failed'):
            break
        await asyncio.sleep(2)
        status = runner.poll_job(job_id)

    if status.status == 'failed':
        print(f"✗ Experiment failed: {status.error}")
        return False
    if status.status != 'completed':
        # Polling budget exhausted; the job has neither completed nor failed
        print(f"✗ Experiment still '{status.status}' after {max_polls} polls")
        return False

    print("✓ Experiment completed successfully")

    # 5. Validate results against criteria
    test_model = (status.results or {}).get('test_model', {})
    success = _check_success_criteria(test_model, criteria['success_criteria'])
    if success:
        print("✓ All success criteria met")

    # 6. Get benchmark comparison
    comparison = runner.compare_benchmarks(job_id)
    if not comparison.empty:
        print("✓ Benchmark comparison generated")
        print(comparison)

    return success

# Run test (uncomment to execute)
# success = await test_complete_flow()
# print(f"\nTest {'PASSED' if success else 'FAILED'}")

## Summary

This notebook provides a complete Modal execution runtime with the following capabilities:

1. **Modal Serverless Integration**: Spin up serverless instances for ML experiments
2. **Experiment Management**: Submit Python scripts together with JSON configuration and evaluation criteria
3. **Job Tracking**: Generate job IDs and poll for status updates
4. **Benchmark Collection**: Collect and compare metrics across multiple models
5. **Test Inputs**: Sample scripts and configurations in `scripts/sample/execution`

### Key Components:
- `ExperimentRunner`: Main interface for submitting and managing experiments
- `JobManager`: Tracks job statuses and results
- `ExperimentAPI`: Integration interface for FastAPI server
- Modal functions for remote execution
- Sample test inputs and validation criteria

### Usage:
1. Configure Modal authentication
2. Load experiment script and configuration
3. Submit experiment using `runner.submit_experiment()`
4. Poll job status with `runner.poll_job(job_id)`
5. Compare benchmarks with `runner.compare_benchmarks(job_id)`
6. Fetch artifacts with `runner.get_artifacts(job_id)`