# In [None]:
# Neural Architecture Search (NAS) - Code Examples
## Chapter 4 Companion Notebook

## This notebook contains all the code examples from Chapter 4 of the AutoML book, organized for easy experimentation and learning.

### Table of Contents
#1. [Setup and Dependencies](#setup)
#2. [Search Space Design](#search-spaces)
#3. [Search Strategies](#search-strategies)
#4. [Performance Estimation](#performance-estimation)
#5. [Efficient NAS Techniques](#efficient-nas)
#6. [Practical Tools and Applications](#practical-tools)



## 1. Setup and Dependencies {#setup}


# Core dependencies
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import random
from scipy.optimize import curve_fit
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import cross_val_score
import matplotlib.pyplot as plt

# AutoML frameworks (install as needed)
# pip install autokeras
# pip install optuna
# pip install ray[tune]
# pip install nni

# Optional imports (uncomment as needed)
# import autokeras as ak
# import tensorflow as tf
# import optuna
# from ray import tune
# from ray.tune.search.optuna import OptunaSearch  # named ray.tune.suggest.optuna before Ray 2.0
# from ray.tune.schedulers import ASHAScheduler



## 2. Search Space Design {#search-spaces}

### 2.1 Cell-Based Search Space with AutoKeras

# Note: Requires autokeras installation
# pip install autokeras

import autokeras as ak

def create_nas_model():
    """
    Build an AutoKeras ``AutoModel`` for image classification from a stack of
    convolutional blocks.

    The same ``ak.ConvBlock`` instance is applied once to the image input and
    then re-applied inside the depth loop before a ``ClassificationHead`` is
    attached.

    Returns:
        An ``ak.AutoModel`` wired from the image input to the classification
        head.
    """
    input_node = ak.ImageInput()

    # Search over different cell types
    # NOTE(review): `cell_type` is assigned but never read below, so this
    # choice does not influence the returned model.
    cell_type = ak.Choice(['conv_cell', 'dense_cell', 'mobile_cell'])

    # Each cell can have different configurations
    # NOTE(review): presumably `ak.Choice` is meant as a tunable hyperparameter;
    # confirm that AutoKeras exposes it and that ConvBlock accepts it for
    # these arguments.
    conv_cell = ak.ConvBlock(
        num_blocks=ak.Choice([2, 3, 4]),
        num_layers=ak.Choice([1, 2]),
        filters=ak.Choice([32, 64, 128, 256]),
        kernel_size=ak.Choice([3, 5]),
        activation=ak.Choice(['relu', 'swish'])
    )

    # Stack multiple cells with variable depth
    output = conv_cell(input_node)
    # NOTE(review): range() needs an integer, so this loop only works if
    # ak.Choice(...) evaluates to an int at graph-construction time - verify.
    for i in range(ak.Choice([3, 5, 7])):  # Variable depth
        output = conv_cell(output)

    output = ak.ClassificationHead()(output)
    return ak.AutoModel(inputs=input_node, outputs=output)

# Example usage:
# model = create_nas_model()
# print("Search space created successfully!")


### 2.2 Custom Search Space Definition

class SearchSpaceConfig:
    """
    Configuration class for defining custom search spaces.

    An architecture is a list of per-layer dicts with the keys ``'type'``,
    ``'filters'``, ``'kernel_size'`` and ``'activation'``; every key is drawn
    independently from the option lists defined in ``__init__``.
    """
    def __init__(self):
        self.layer_types = ['conv', 'depthwise_conv', 'dilated_conv', 'skip_connect']
        self.filter_sizes = [16, 32, 64, 128, 256]
        self.kernel_sizes = [3, 5, 7]
        self.activation_functions = ['relu', 'swish', 'gelu']
        # (min_depth, max_depth); both ends are inclusive.
        self.depth_range = (5, 20)
        # Not used by sample_architecture or get_search_space_size.
        self.width_multipliers = [0.5, 0.75, 1.0, 1.25]

    def _sample_layer(self):
        """Draw one layer configuration uniformly from the option lists."""
        return {
            'type': random.choice(self.layer_types),
            'filters': random.choice(self.filter_sizes),
            'kernel_size': random.choice(self.kernel_sizes),
            'activation': random.choice(self.activation_functions)
        }

    def sample_architecture(self, depth=None):
        """
        Sample a random architecture from the search space.

        Args:
            depth: Number of layers. When None, a depth is drawn uniformly
                from ``depth_range`` (both ends inclusive).

        Returns:
            List of ``depth`` layer-configuration dicts.
        """
        if depth is None:
            depth = random.randint(*self.depth_range)

        return [self._sample_layer() for _ in range(depth)]

    def get_search_space_size(self):
        """
        Calculate the theoretical size of the search space.

        Returns:
            Number of distinct architectures summed over every depth that
            ``sample_architecture`` can draw (an exact Python int).
        """
        choices_per_layer = (len(self.layer_types) *
                             len(self.filter_sizes) *
                             len(self.kernel_sizes) *
                             len(self.activation_functions))

        min_depth, max_depth = self.depth_range
        # random.randint in sample_architecture includes max_depth, so the
        # count must include it too (hence the + 1).
        return sum(choices_per_layer ** depth
                   for depth in range(min_depth, max_depth + 1))

# Example usage: draw one 10-layer architecture and report the space size.
search_config = SearchSpaceConfig()
sample_arch = search_config.sample_architecture(depth=10)
first_layers = sample_arch[:3]  # only the first 3 layers are printed
print(f"Sample architecture: {first_layers}...")
space_size = search_config.get_search_space_size()
print(f"Search space size: {space_size:.2e}")




## 3. Search Strategies {#search-strategies}

### 3.1 Evolutionary Neural Architecture Search

def evolutionary_nas(population_size=50, generations=100, search_config=None):
    """
    Simplified evolutionary NAS implementation.

    Each generation scores every architecture, keeps the top half as
    survivors, and refills the population with one mutated child per
    survivor. The population size stays at ``population_size`` for every
    generation, including odd sizes and a size of 1.

    Args:
        population_size: Number of architectures in each generation (>= 1)
        generations: Number of evolutionary cycles
        search_config: SearchSpaceConfig instance (a default one is created
            when None)

    Returns:
        Tuple ``(best_architecture, fitness_history)`` where
        ``fitness_history`` holds the best fitness seen in each generation.

    Raises:
        ValueError: If ``population_size`` is smaller than 1.
    """
    if population_size < 1:
        raise ValueError("population_size must be at least 1")
    if search_config is None:
        search_config = SearchSpaceConfig()

    # Initialize random population
    population = [search_config.sample_architecture()
                  for _ in range(population_size)]

    # Always keep at least one survivor so a population of 1 cannot die out.
    num_survivors = max(1, population_size // 2)
    fitness_history = []

    for generation in range(generations):
        # Evaluate all architectures (placeholder - would train real models)
        fitness_scores = [evaluate_architecture_placeholder(arch) for arch in population]
        best_fitness = max(fitness_scores)
        fitness_history.append(best_fitness)

        # Select top performers (top 50%)
        ranked = sorted(zip(population, fitness_scores),
                        key=lambda pair: pair[1], reverse=True)
        survivors = [arch for arch, _ in ranked[:num_survivors]]

        # Next generation: each survivor followed by one mutated child.
        new_population = []
        for parent in survivors:
            new_population.append(parent)
            if len(new_population) < population_size:
                new_population.append(mutate_architecture(parent, search_config))

        # Odd population sizes leave a free slot; fill it with extra children
        # of the best survivors so the population does not shrink.
        extra = 0
        while len(new_population) < population_size:
            parent = survivors[extra % len(survivors)]
            new_population.append(mutate_architecture(parent, search_config))
            extra += 1

        population = new_population

        if generation % 10 == 0:
            print(f"Generation {generation}: Best fitness = {best_fitness:.4f}")

    # The last generation's offspring have not been scored yet, so the final
    # population is evaluated once more before picking the winner.
    final_scores = [evaluate_architecture_placeholder(arch) for arch in population]
    best_idx = int(np.argmax(final_scores))

    return population[best_idx], fitness_history

def mutate_architecture(architecture, search_config, mutation_rate=0.3):
    """
    Return a mutated copy of an architecture.

    Every layer is independently either replaced by a freshly sampled layer
    or copied unchanged; the input list and its dicts are never modified.

    Args:
        architecture: List of layer configurations
        search_config: SearchSpaceConfig instance supplying the option lists
        mutation_rate: Probability of replacing each layer

    Returns:
        New list of layer configurations with the same length as the input
    """
    def resample_layer():
        # Draw order (type, filters, kernel size, activation) is fixed.
        return {
            'type': random.choice(search_config.layer_types),
            'filters': random.choice(search_config.filter_sizes),
            'kernel_size': random.choice(search_config.kernel_sizes),
            'activation': random.choice(search_config.activation_functions)
        }

    return [
        resample_layer() if random.random() < mutation_rate else layer.copy()
        for layer in architecture
    ]

def evaluate_architecture_placeholder(architecture):
    """
    Heuristic stand-in for training an architecture.

    The score starts at 0.5, gains 0.02 per skip connection and 0.1 when the
    depth lies in [8, 15], then receives Gaussian noise (sigma 0.05) and is
    clamped to [0, 1]. A real implementation would return validation
    accuracy after training.
    """
    skip_layers = [layer for layer in architecture if layer['type'] == 'skip_connect']

    # Base score plus the skip-connection reward.
    score = 0.5 + len(skip_layers) * 0.02

    # Mid-depth networks get a bonus; very shallow or very deep ones do not.
    if 8 <= len(architecture) <= 15:
        score += 0.1

    # Noise stands in for run-to-run training variance.
    score += random.gauss(0, 0.05)

    return max(0, min(1, score))

# Example usage:
# best_arch, history = evolutionary_nas(population_size=20, generations=50)
# print(f"Best architecture found: {len(best_arch)} layers")


### 3.2 Differentiable Architecture Search (DARTS)

class MixedOperation(nn.Module):
    """
    Softmax-weighted mixture of candidate operations on one edge.

    Every candidate maps ``channels`` feature maps to ``channels`` feature
    maps at unchanged spatial size, so their outputs can be summed. The
    mixing logits live in ``self.weights`` (one per candidate).
    """
    def __init__(self, operations, channels):
        super().__init__()

        # Candidates are instantiated in the order given by `operations`.
        self.operations = nn.ModuleList(
            [self._make_candidate(op_name, channels) for op_name in operations]
        )

        # One learnable logit per candidate.
        self.weights = nn.Parameter(torch.randn(len(operations)))

    @staticmethod
    def _make_candidate(op_name, channels):
        """Instantiate the module for a single candidate operation name."""
        if op_name == 'conv3':
            return nn.Conv2d(channels, channels, 3, padding=1)
        if op_name == 'conv5':
            return nn.Conv2d(channels, channels, 5, padding=2)
        if op_name == 'maxpool':
            return nn.MaxPool2d(3, stride=1, padding=1)
        if op_name == 'skip':
            return nn.Identity()
        raise ValueError(f"Unknown operation: {op_name}")

    def forward(self, x):
        # Normalize the logits, then accumulate each candidate's weighted output.
        mixing = F.softmax(self.weights, dim=0)
        output = 0
        for coeff, op in zip(mixing, self.operations):
            output = output + coeff * op(x)
        return output

class DARTSCell(nn.Module):
    """
    DARTS-style cell: a small DAG whose every edge is a MixedOperation.

    Node ``i`` receives one edge from each of the ``i + 2`` earlier states
    (the two copies of the cell input plus all previous nodes). Edges are
    stored flat in ``self.mixed_ops`` in node-major order.
    """
    def __init__(self, channels=64, num_nodes=4):
        super().__init__()
        self.num_nodes = num_nodes
        self.operations = ['conv3', 'conv5', 'maxpool', 'skip']

        # Node i has i + 2 incoming edges.
        edge_count = sum(node + 2 for node in range(num_nodes))
        self.mixed_ops = nn.ModuleList(
            [MixedOperation(self.operations, channels) for _ in range(edge_count)]
        )

    def forward(self, x):
        # The cell input is used as both initial states.
        states = [x, x]

        # Walk the flat edge list in the same node-major order it was built in.
        edges = iter(self.mixed_ops)
        for node in range(self.num_nodes):
            incoming = [next(edges)(state) for state in states[:node + 2]]
            states.append(sum(incoming))

        # Concatenate the intermediate node outputs along the channel axis.
        return torch.cat(states[-self.num_nodes:], dim=1)

    def get_best_architecture(self):
        """
        Extract the best discrete architecture from learned weights.

        Returns:
            One list per node; each entry is ``(source_state_index, op_name)``
            where ``op_name`` is the candidate with the largest logit on
            that edge.
        """
        edges = iter(self.mixed_ops)
        best_ops = []

        for node in range(self.num_nodes):
            chosen = []
            for source in range(node + 2):
                top = torch.argmax(next(edges).weights)
                chosen.append((source, self.operations[top]))
            best_ops.append(chosen)

        return best_ops

class DARTSNetwork(nn.Module):
    """
    Complete network using DARTS cells.

    Layout: 3x3 stem conv -> ``num_cells`` DARTS cells -> global average pool
    -> linear classifier. A cell concatenates its node outputs, so it emits
    ``channels * num_nodes`` feature maps while the next cell expects
    ``channels``; a 1x1 convolution between consecutive cells projects the
    width back down. The last cell's output feeds the classifier directly.

    Args:
        num_classes: Size of the classifier output
        channels: Feature width produced by the stem and consumed by each cell
        num_cells: Number of stacked DARTS cells
    """
    def __init__(self, num_classes=10, channels=64, num_cells=8):
        super().__init__()

        # Stem
        self.stem = nn.Conv2d(3, channels, 3, padding=1)

        # Stack of DARTS cells
        self.cells = nn.ModuleList([DARTSCell(channels) for _ in range(num_cells)])

        # Width leaving a cell: one `channels`-wide tensor per node. With no
        # cells the stem output goes straight to the classifier.
        if len(self.cells) > 0:
            cell_out_channels = channels * self.cells[0].num_nodes
        else:
            cell_out_channels = channels

        # 1x1 projections placed between consecutive cells (none are needed
        # after the last cell, so a single-cell network has no projections).
        self.projections = nn.ModuleList([
            nn.Conv2d(cell_out_channels, channels, 1)
            for _ in range(max(0, len(self.cells) - 1))
        ])

        # Classifier head
        self.global_pool = nn.AdaptiveAvgPool2d(1)
        self.classifier = nn.Linear(cell_out_channels, num_classes)

    def forward(self, x):
        x = self.stem(x)

        for index, cell in enumerate(self.cells):
            if index > 0:
                # Bring the previous cell's concatenated output back to the
                # width this cell was built for.
                x = self.projections[index - 1](x)
            x = cell(x)

        x = self.global_pool(x)
        x = x.view(x.size(0), -1)
        x = self.classifier(x)

        return x

# Example usage:
# model = DARTSNetwork(num_classes=10)
# print("DARTS model created with", sum(p.numel() for p in model.parameters()), "parameters")


## 4. Performance Estimation {#performance-estimation}

### 4.1 Successive Halving for Multi-fidelity Evaluation

def successive_halving_nas(architectures, max_epochs=100, reduction_factor=2):
    """
    Implement Successive Halving for efficient architecture evaluation.

    Every round trains all remaining candidates for the current epoch budget,
    keeps the best ``1 / reduction_factor`` of them, and multiplies the
    budget by ``reduction_factor``. Rounds stop when one candidate is left or
    the budget would exceed ``max_epochs``.

    Args:
        architectures: Sequence of architectures to evaluate (not modified)
        max_epochs: Maximum training epochs for final candidates
        reduction_factor: Integer factor (>= 2) by which the population is
            reduced and the budget is increased each round

    Returns:
        The top-ranked architecture of the last round, the only candidate if
        just one was given, or None for an empty input.

    Raises:
        ValueError: If ``reduction_factor`` is smaller than 2 (the population
            would never shrink).
    """
    if reduction_factor < 2:
        raise ValueError("reduction_factor must be at least 2")

    candidates = list(architectures)
    # Start with a fraction of the full budget, but never with 0 epochs:
    # a zero budget would stay zero after every multiplication.
    epochs = max(1, max_epochs // (reduction_factor ** 3))

    print(f"Starting Successive Halving with {len(candidates)} candidates")

    round_num = 1
    while len(candidates) > 1 and epochs <= max_epochs:
        print(f"\nRound {round_num}: Evaluating {len(candidates)} candidates for {epochs} epochs")

        # Train all candidates for current epoch budget
        results = []
        for i, arch in enumerate(candidates):
            score = train_and_evaluate_placeholder(arch, epochs=epochs)
            results.append((arch, score))
            if i % 10 == 0:
                print(f"  Evaluated {i+1}/{len(candidates)} candidates")

        # Keep top performers (always at least one)
        results.sort(key=lambda x: x[1], reverse=True)
        keep_count = max(1, len(results) // reduction_factor)
        candidates = [arch for arch, _ in results[:keep_count]]

        print(f"  Best score this round: {results[0][1]:.4f}")
        print(f"  Keeping top {keep_count} candidates")

        # Scale the training budget by the reduction factor for the next round
        epochs *= reduction_factor
        round_num += 1

    # Candidates are in ranking order after a round, so index 0 is the best.
    return candidates[0] if candidates else None

def train_and_evaluate_placeholder(architecture, epochs):
    """
    Simulated training run for one architecture.

    Sleeps 10 ms per epoch to mimic training cost, then returns the heuristic
    architecture score plus a saturating bonus for longer training and a
    small Gaussian perturbation. Real code would train and validate a model.
    """
    import time

    # Stand-in for the wall-clock cost of training.
    time.sleep(0.01 * epochs)

    untrained_score = evaluate_architecture_placeholder(architecture)

    # Diminishing returns: the bonus approaches 0.1 as epochs grows.
    training_gain = 0.1 * (1 - np.exp(-epochs / 50))
    capped_score = min(1.0, untrained_score + training_gain)

    # Run-to-run variance.
    jitter = random.gauss(0, 0.02)
    return max(0, capped_score + jitter)

# Example usage:
# sample_architectures = [search_config.sample_architecture() for _ in range(27)]
# best_architecture = successive_halving_nas(sample_architectures, max_epochs=81)
# print(f"Best architecture has {len(best_architecture)} layers")


### 4.2 One-Shot Architecture Search (Weight Sharing)

class Supernet(nn.Module):
    """
    Supernet for one-shot architecture search.

    Holds one shared-weight operation for every (layer type, filter count,
    kernel size) combination in the search space. ``forward`` runs only the
    operations named by the architecture it is given.

    Args:
        search_space_config: Object exposing ``layer_types``,
            ``filter_sizes`` and ``kernel_sizes``
        input_channels: Channels of the input images
        num_classes: Size of the classifier output

    Raises:
        ValueError: If the search space contains an unsupported layer type.
    """
    def __init__(self, search_space_config, input_channels=3, num_classes=10):
        super().__init__()
        self.search_config = search_space_config

        # operations[layer_type]["{type}_{filters}_{kernel}"] -> module
        self.operations = nn.ModuleDict()

        for layer_type in search_space_config.layer_types:
            ops_for_type = nn.ModuleDict()

            for filters in search_space_config.filter_sizes:
                for kernel_size in search_space_config.kernel_sizes:
                    op_name = f"{layer_type}_{filters}_{kernel_size}"
                    ops_for_type[op_name] = self._build_operation(
                        layer_type, filters, kernel_size)

            self.operations[layer_type] = ops_for_type

        # Stem and head
        self.stem = nn.Conv2d(input_channels, 64, 3, padding=1)
        self.head = nn.Linear(64, num_classes)
        self.pool = nn.AdaptiveAvgPool2d(1)

    @staticmethod
    def _build_operation(layer_type, filters, kernel_size):
        """Create the size-preserving module for one search-space entry."""
        if layer_type == 'conv':
            return nn.Conv2d(filters, filters, kernel_size, padding=kernel_size // 2)
        if layer_type == 'depthwise_conv':
            return nn.Conv2d(filters, filters, kernel_size, groups=filters,
                             padding=kernel_size // 2)
        if layer_type == 'dilated_conv':
            dilation = 2
            # "Same" padding for a dilated kernel is dilation * (k // 2);
            # anything larger makes the feature map grow at every layer.
            return nn.Conv2d(filters, filters, kernel_size, dilation=dilation,
                             padding=dilation * (kernel_size // 2))
        if layer_type == 'skip_connect':
            return nn.Identity()
        raise ValueError(f"Unknown layer type: {layer_type}")

    @staticmethod
    def _match_channels(x, channels):
        """Zero-pad or truncate the channel axis of ``x`` to ``channels``."""
        current = x.size(1)
        if current < channels:
            # F.pad takes pairs from the last axis backwards: (W, W, H, H, C, C).
            return F.pad(x, (0, 0, 0, 0, 0, channels - current))
        if current > channels:
            return x[:, :channels, :, :]
        return x

    def forward(self, x, architecture):
        """
        Forward pass with specific architecture.

        Args:
            x: Input tensor
            architecture: List of layer configurations (dicts with 'type',
                'filters' and 'kernel_size'). A layer whose filter/kernel
                combination is not in the supernet is skipped.

        Returns:
            Classifier output for the batch.
        """
        x = self.stem(x)

        for layer_config in architecture:
            layer_type = layer_config['type']
            filters = layer_config['filters']
            kernel_size = layer_config['kernel_size']

            op_name = f"{layer_type}_{filters}_{kernel_size}"

            if op_name in self.operations[layer_type]:
                # Layers may disagree on width; adapt before applying the op.
                x = self._match_channels(x, filters)
                x = self.operations[layer_type][op_name](x)
                x = F.relu(x)

        # The last layer's width is arbitrary, but the head has a fixed input
        # size, so adapt once more before pooling.
        x = self._match_channels(x, self.head.in_features)

        x = self.pool(x)
        x = x.view(x.size(0), -1)
        x = self.head(x)

        return x

def evaluate_architecture_fast(supernet, architecture, test_loader):
    """
    Evaluate architecture using pre-trained supernet weights.
    This is much faster than training from scratch.

    Args:
        supernet: Module called as ``supernet(data, architecture)``; it is
            switched to eval mode and left in that mode.
        architecture: Architecture passed through to the supernet
        test_loader: Iterable of ``(data, targets)`` batches

    Returns:
        Fraction of correctly classified samples, or 0.0 when the loader
        yields no samples.
    """
    supernet.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for data, targets in test_loader:
            # Predicted class = index of the largest output along dim 1.
            predicted = supernet(data, architecture).argmax(dim=1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

    if total == 0:
        # Nothing was evaluated; avoid dividing by zero.
        return 0.0
    return correct / total

# Example usage (requires actual data):
# supernet = Supernet(search_config)
# sample_arch = search_config.sample_architecture(depth=5)
# print("Supernet created with", sum(p.numel() for p in supernet.parameters()), "parameters")


### 4.3 Learning Curve Extrapolation

def exponential_curve(x, a, b, c):
    """Saturating exponential: starts at ``c`` for x = 0 and rises by up to ``a`` at rate ``b``."""
    saturation = 1 - np.exp(-b * x)
    return a * saturation + c

def predict_final_accuracy(early_accuracies, target_epochs, plot=False):
    """
    Extrapolate a learning curve to estimate accuracy at a later epoch.

    A saturating exponential is fitted to the observed accuracies (taken to
    be epochs 1..N) and evaluated at ``target_epochs``.

    Args:
        early_accuracies: Validation accuracies from the first epochs
        target_epochs: Epoch at which to predict the accuracy
        plot: Whether to draw the observations and the fitted curve

    Returns:
        The fitted prediction clamped to [0, 1]. With fewer than 3
        observations, or when fitting/plotting raises, the last observed
        accuracy is returned instead.
    """
    # Three curve parameters need at least three observations.
    if len(early_accuracies) < 3:
        return early_accuracies[-1]

    observed_epochs = np.arange(1, len(early_accuracies) + 1)

    def draw_fit(fitted_params, estimate):
        # Observations, fitted curve, and markers for the cut-off epoch and
        # the predicted value.
        plt.figure(figsize=(10, 6))
        plt.plot(observed_epochs, early_accuracies, 'bo', label='Observed')

        dense_epochs = np.linspace(1, target_epochs, 100)
        fitted_curve = exponential_curve(dense_epochs, *fitted_params)
        plt.plot(dense_epochs, fitted_curve, 'r-', label='Fitted curve')

        plt.axvline(x=len(early_accuracies), color='g', linestyle='--',
                    label='Prediction point')
        plt.axhline(y=estimate, color='r', linestyle='--',
                    label=f'Predicted: {estimate:.3f}')

        plt.xlabel('Epochs')
        plt.ylabel('Accuracy')
        plt.title('Learning Curve Extrapolation')
        plt.legend()
        plt.grid(True)
        plt.show()

    try:
        # Starting point for the optimizer: (a, b, c).
        start_params = [max(early_accuracies), 0.1, min(early_accuracies)]
        fitted_params, _ = curve_fit(exponential_curve, observed_epochs,
                                     early_accuracies, p0=start_params,
                                     maxfev=1000)

        estimate = exponential_curve(target_epochs, *fitted_params)
        estimate = max(0, min(1, estimate))  # accuracies live in [0, 1]

        if plot:
            draw_fit(fitted_params, estimate)

        return estimate

    except Exception as e:
        # Best-effort: any failure falls back to the last observation.
        print(f"Curve fitting failed: {e}")
        return early_accuracies[-1]

def simulate_learning_curve(true_final_accuracy=0.85, noise_level=0.02):
    """
    Generate a synthetic 20-epoch accuracy curve for testing.

    The curve saturates exponentially toward ``true_final_accuracy``; the
    added Gaussian noise shrinks with the square root of the epoch number
    and the result is clipped to [0, 1].
    """
    epoch_index = np.arange(1, 21)  # epochs 1..20

    saturation = 1 - np.exp(-epoch_index / 8)
    clean_curve = true_final_accuracy * saturation

    # Later epochs are less noisy.
    jitter = np.random.normal(0, noise_level / np.sqrt(epoch_index))

    return np.clip(clean_curve + jitter, 0, 1)

# Example usage: fit the first 5 epochs of a simulated run and compare the
# extrapolation with the simulated value at epoch 20.
simulated_accuracies = simulate_learning_curve()
early_epochs = simulated_accuracies[:5]
true_final = simulated_accuracies[-1]

predicted_final = predict_final_accuracy(early_epochs, target_epochs=20, plot=True)
prediction_error = abs(predicted_final - true_final)
print(f"Predicted final accuracy: {predicted_final:.3f}")
print(f"True final accuracy: {true_final:.3f}")
print(f"Prediction error: {prediction_error:.3f}")


### 4.4 Zero-Cost Proxies

def snip_score(model, data_sample):
    """
    Compute a SNIP-style score for architecture ranking.

    The score is the sum of absolute gradient values of the cross-entropy
    loss over all trainable parameters after one forward-backward pass.
    NOTE(review): the published SNIP saliency also multiplies each gradient
    by its weight; this function keeps the file's gradient-only definition.

    Args:
        model: Module called as ``model(data)``; it is put in train mode
        data_sample: Tuple ``(data, targets)`` for a single batch

    Returns:
        The score as a Python float (0.0 if the model has no trainable
        parameters). Parameters' ``.grad`` fields are not modified.
    """
    model.train()

    # Single forward pass
    data, targets = data_sample
    outputs = model(data)
    loss = F.cross_entropy(outputs, targets)

    # autograd.grad rejects tensors that do not require grad, so frozen
    # parameters are left out.
    params = [p for p in model.parameters() if p.requires_grad]
    if not params:
        return 0.0

    # allow_unused=True yields None (instead of raising) for parameters
    # that did not take part in this forward pass.
    gradients = torch.autograd.grad(loss, params, create_graph=False,
                                    allow_unused=True)

    return float(sum(grad.abs().sum().item()
                     for grad in gradients if grad is not None))

def gradient_norm_proxy(model, data_sample):
    """
    Simple gradient norm proxy for architecture evaluation.

    Runs one forward-backward pass with cross-entropy loss and returns the
    global L2 norm of the gradients over all trainable parameters.

    Args:
        model: Module called as ``model(data)``; it is put in train mode
        data_sample: Tuple ``(data, targets)`` for a single batch

    Returns:
        Gradient L2 norm as a Python float (0.0 if the model has no
        trainable parameters). Parameters' ``.grad`` fields are not modified.
    """
    model.train()

    data, targets = data_sample
    outputs = model(data)
    loss = F.cross_entropy(outputs, targets)

    params = [p for p in model.parameters() if p.requires_grad]
    if not params:
        return 0.0

    # The gradients must actually be computed for this batch; reading
    # param.grad without a backward pass only sees missing or stale values.
    gradients = torch.autograd.grad(loss, params, allow_unused=True)

    total_norm = 0.0
    for grad in gradients:
        if grad is not None:
            total_norm += grad.norm(2).item() ** 2

    return total_norm ** 0.5

def connectivity_proxy(architecture):
    """
    Fraction of layers that are skip connections.

    Used as a training-free stand-in for performance, on the assumption
    that better-connected architectures tend to do better. An empty
    architecture scores 0.0.
    """
    skip_count = len([layer for layer in architecture
                      if layer['type'] == 'skip_connect'])
    # Guard the denominator so an empty architecture does not divide by zero.
    layer_count = max(1, len(architecture))
    return skip_count / layer_count

class ZeroCostFilter:
    """
    Multi-proxy filter for rapid architecture screening.

    ``self.proxies`` maps a name to a function that takes an architecture
    and returns a number (higher is better). Register additional
    architecture-level proxies by adding entries to that dict.
    """
    def __init__(self):
        self.proxies = {
            'connectivity': connectivity_proxy,
        }

    def filter_architectures(self, architectures, keep_fraction=0.1):
        """
        Filter architectures using the registered zero-cost proxies.

        Each architecture's score is the plain average of all proxy values.
        Ties keep their original relative order.

        Args:
            architectures: List of architectures to filter
            keep_fraction: Fraction of architectures to keep

        Returns:
            The best-scoring architectures, best first. At least one is
            returned whenever the input is non-empty and ``keep_fraction``
            is positive; otherwise the result is an empty list.

        Raises:
            ValueError: If no proxies are registered.
        """
        if not architectures or keep_fraction <= 0:
            return []
        if not self.proxies:
            raise ValueError("No proxies registered")

        # For model-based proxies (e.g. a SNIP score) one would build the
        # model from the architecture here and score it on a data sample.
        proxy_functions = list(self.proxies.values())
        scores = [
            np.mean([proxy(arch) for proxy in proxy_functions])
            for arch in architectures
        ]

        # sorted() is stable, so equal scores stay in input order.
        sorted_indices = sorted(range(len(architectures)),
                                key=lambda i: scores[i], reverse=True)

        # Rounding down must not discard every candidate of a small list.
        keep_count = max(1, int(len(architectures) * keep_fraction))

        return [architectures[i] for i in sorted_indices[:keep_count]]

# Example usage: screen 100 random architectures and keep the top 20%.
filter_system = ZeroCostFilter()
sample_architectures = [search_config.sample_architecture() for _ in range(100)]
filtered_archs = filter_system.filter_architectures(
    sample_architectures,
    keep_fraction=0.2,
)
print(f"Filtered {len(sample_architectures)} architectures down to {len(filtered_archs)}")


## 5. Efficient NAS Techniques {#efficient-nas}

### 5.1 Supernet Training with Balanced Sampling
class SupernetTraining:
    """
    Training system for supernets with balanced operation sampling.

    Operation types that have been sampled less often so far receive a
    higher sampling probability, which keeps any single operation from
    dominating supernet training.

    Args:
        supernet: Module called as ``supernet(inputs, architecture)``
        search_space: Object exposing ``layer_types``, ``filter_sizes``,
            ``kernel_sizes`` and ``activation_functions``
        optimizer: Optional optimizer over the supernet parameters. When
            given, ``train_step`` zeroes gradients before the backward pass
            and applies an update after it. When None, ``train_step`` only
            runs backward and the caller owns zero_grad/step.
    """
    def __init__(self, supernet, search_space, optimizer=None):
        self.supernet = supernet
        self.search_space = search_space
        self.optimizer = optimizer
        self.operation_counters = {}  # operation name -> times sampled

    def compute_balanced_weights(self, operations):
        """
        Compute sampling weights to balance operation usage.

        Args:
            operations: Sequence of operation names

        Returns:
            NumPy array of probabilities (summing to 1), proportional to
            ``1 / (count + 1)``; uniform while nothing has been sampled.
        """
        if not self.operation_counters:
            # Equal weights initially
            return np.ones(len(operations)) / len(operations)

        # Inverse frequency weighting; +1 avoids division by zero for
        # operations that were never sampled.
        weights = np.array([
            1.0 / (self.operation_counters.get(op, 0) + 1)
            for op in operations
        ])

        return weights / weights.sum()

    def sample_architecture(self, depth=10):
        """
        Sample architecture ensuring balanced operation usage.

        Args:
            depth: Number of layers to sample

        Returns:
            List of ``depth`` layer-configuration dicts. The usage counters
            are updated after every layer, so balancing also acts within a
            single architecture.
        """
        architecture = []
        ops = self.search_space.layer_types

        for _ in range(depth):
            # Balance operation sampling to prevent bias
            op_weights = self.compute_balanced_weights(ops)
            # np.random.choice returns a NumPy string; store a plain str so
            # configs and counter keys are ordinary Python values.
            chosen_op = str(np.random.choice(ops, p=op_weights))

            # Sample other parameters
            architecture.append({
                'type': chosen_op,
                'filters': random.choice(self.search_space.filter_sizes),
                'kernel_size': random.choice(self.search_space.kernel_sizes),
                'activation': random.choice(self.search_space.activation_functions)
            })

            # Update counter
            self.operation_counters[chosen_op] = self.operation_counters.get(chosen_op, 0) + 1

        return architecture

    def train_step(self, batch):
        """
        Single training step with architecture sampling.

        Args:
            batch: Tuple ``(inputs, targets)``

        Returns:
            The cross-entropy loss of this step as a Python float.
        """
        # Sample different architecture for each training step
        architecture = self.sample_architecture()

        if self.optimizer is not None:
            # Drop gradients left over from the previously sampled path.
            self.optimizer.zero_grad()

        # Forward pass with sampled architecture
        inputs, targets = batch
        outputs = self.supernet(inputs, architecture)
        loss = F.cross_entropy(outputs, targets)

        # Only operations on the sampled path receive gradients.
        loss.backward()

        if self.optimizer is not None:
            self.optimizer.step()

        return loss.item()

    def get_operation_statistics(self):
        """
        Get statistics about operation sampling.

        Returns:
            Dict mapping each sampled operation to its ``'count'`` and its
            ``'frequency'`` (share of all samples so far).
        """
        total_samples = sum(self.operation_counters.values())

        return {
            op: {
                'count': count,
                'frequency': count / total_samples if total_samples > 0 else 0
            }
            for op, count in self.operation_counters.items()
        }

# Example usage:
# supernet = Supernet(search_config)
# trainer = SupernetTraining(supernet, search_config)
# 
# # Simulate training
# for step in range(100):
#     # Create dummy batch
#     batch = (torch.randn(4, 3, 32, 32), torch.randint(0, 10, (4,)))
#     loss = trainer.train_step(batch)
#     
#     if step % 20 == 0:
#         print(f"Step {step}, Loss: {loss:.4f}")
# 
# # Check operation balance
# stats = trainer.get_operation_statistics()
# for op, stat in stats.items():
#     print(f"{op}: {stat['frequency']:.3f}")

### 5.2 Once-For-All (OFA) Networks

class EfficiencyPredictor:
    """
    Predictor for hardware efficiency metrics (latency, memory, energy).

    The formulas are hand-written heuristics over the layer configuration
    dicts ('type', 'filters', 'kernel_size'); in practice they would be
    replaced by a model trained on actual hardware measurements.
    """
    # Layer types charged as full convolutions. A dilated convolution has
    # the same number of taps as a regular one with the same kernel size.
    _FULL_CONV_TYPES = ('conv', 'dilated_conv')

    def __init__(self):
        self.latency_base = 10  # Base latency in ms
        self.memory_base = 100  # Base memory in MB
        self.energy_base = 50   # Base energy in mJ

    def predict_latency(self, architecture):
        """
        Predict inference latency for architecture.

        Args:
            architecture: List of layer configurations

        Returns:
            Base latency plus 0.01 per unit of layer complexity.
        """
        complexity = 0
        for layer in architecture:
            layer_type = layer['type']
            if layer_type in self._FULL_CONV_TYPES:
                complexity += layer['filters'] * layer['kernel_size']**2
            elif layer_type == 'depthwise_conv':
                # Depthwise convolutions are charged 30% of a full one.
                complexity += layer['filters'] * layer['kernel_size']**2 * 0.3
            elif layer_type == 'skip_connect':
                complexity += 1

        return self.latency_base + complexity * 0.01

    def predict_memory(self, architecture):
        """
        Predict memory usage for architecture.

        Returns:
            Base memory plus 0.5 per filter, summed over all layers
            regardless of layer type.
        """
        memory = self.memory_base
        for layer in architecture:
            memory += layer['filters'] * 0.5  # Simplified calculation
        return memory

    def predict_energy(self, architecture):
        """
        Predict energy consumption for architecture.

        Returns:
            Base energy plus ``filters * kernel_size * 0.1`` for every
            convolutional layer (skip connections cost nothing).
        """
        energy = self.energy_base
        for layer in architecture:
            if layer['type'] in self._FULL_CONV_TYPES or layer['type'] == 'depthwise_conv':
                energy += layer['filters'] * layer['kernel_size'] * 0.1
        return energy

class OFANetwork:
    """
    Once-For-All network that can extract specialized sub-networks
    for different deployment constraints.

    The supernet itself is a placeholder (``self.supernet`` is None);
    candidate sub-networks are generated randomly and scored with a
    heuristic, so results vary between runs unless ``random`` is seeded.
    """

    # Shallowest sub-network the search will consider.
    MIN_DEPTH = 5

    def __init__(self, max_depth=20, max_width=320):
        self.supernet = None  # Would be actual supernet
        self.efficiency_predictor = EfficiencyPredictor()
        self.max_depth = max_depth  # Deepest sub-network to explore (inclusive)
        self.max_width = max_width  # Upper bound on filters per layer

    def extract_subnet(self, constraints):
        """
        Extract the best-scoring subnet that satisfies the given constraints.

        Args:
            constraints: Dict with optional keys 'max_latency', 'max_memory',
                'max_energy' and 'max_depth'. Other keys are ignored.

        Returns:
            Architecture (list of layer dicts) that meets the constraints,
            or None when no explored candidate satisfies them.
        """
        # Fall back to the instance limit rather than a hard-coded 20, so a
        # larger max_depth passed to the constructor is actually honoured.
        depth_limit = min(self.max_depth,
                          constraints.get('max_depth', self.max_depth))

        best_arch = None
        best_score = float('-inf')

        # The limit is inclusive: a network of exactly `depth_limit` layers
        # is a valid candidate.
        for depth in range(self.MIN_DEPTH, depth_limit + 1):
            for width_mult in (0.5, 0.75, 1.0):
                arch = self.generate_architecture(depth, width_mult)

                if not self.meets_constraints(arch, constraints):
                    continue

                # Estimate accuracy (would use actual prediction in practice)
                score = self.estimate_accuracy(arch)
                if score > best_score:
                    best_arch = arch
                    best_score = score

        return best_arch

    def generate_architecture(self, depth, width_multiplier):
        """
        Generate a random architecture with the given depth and width.

        Filters start at ``int(64 * width_multiplier)``, double every three
        layers and are capped at ``self.max_width``. Layer type and kernel
        size are drawn at random for each layer.
        """
        base_filters = int(64 * width_multiplier)

        architecture = []
        for i in range(depth):
            # Gradually increase filters with depth
            filters = min(base_filters * (2 ** (i // 3)), self.max_width)

            architecture.append({
                'type': random.choice(['conv', 'depthwise_conv', 'skip_connect']),
                'filters': filters,
                'kernel_size': random.choice([3, 5]),
                'activation': 'relu'
            })

        return architecture

    def meets_constraints(self, architecture, constraints):
        """
        Check if architecture satisfies deployment constraints.

        Only 'max_latency', 'max_memory' and 'max_energy' are checked, and a
        metric is predicted only when its limit is present. With none of
        these keys the architecture is accepted.
        """
        metric_for_limit = (
            ('max_latency', 'predict_latency'),
            ('max_memory', 'predict_memory'),
            ('max_energy', 'predict_energy'),
        )
        for limit_key, predictor_method in metric_for_limit:
            if limit_key not in constraints:
                continue
            predict = getattr(self.efficiency_predictor, predictor_method)
            if not predict(architecture) <= constraints[limit_key]:
                return False
        return True

    def estimate_accuracy(self, architecture):
        """
        Estimate accuracy for architecture (placeholder heuristic).

        Starts from 0.7, adds 0.1 for depths between 8 and 15, adds up to 0.1
        in proportion to the share of skip connections, then adds Gaussian
        noise (sigma 0.02). The result is capped at 1.0.
        """
        base_score = 0.7

        # Reward balanced depth
        depth = len(architecture)
        if 8 <= depth <= 15:
            base_score += 0.1

        # Reward skip connections; an empty architecture has no ratio to
        # reward (and would otherwise divide by zero).
        if depth:
            skip_count = sum(
                1 for layer in architecture if layer['type'] == 'skip_connect'
            )
            base_score += skip_count / depth * 0.1

        return min(1.0, base_score + random.gauss(0, 0.02))

# Example usage:
ofa = OFANetwork()

# Define deployment constraints
# NOTE(review): with the placeholder EfficiencyPredictor these limits look
# unsatisfiable. The base latency alone is 10 ms, so 'max_latency': 10 can
# never be met, and the smallest candidate (depth 5, width 0.5) already
# needs 212 MB. Consider loosening them for a meaningful demo.
mobile_constraints = {
    'max_latency': 50,   # 50ms
    'max_memory': 200,   # 200MB
    'max_energy': 100    # 100mJ
}

server_constraints = {
    'max_latency': 10,   # 10ms
    'max_memory': 1000,  # 1GB
    'max_energy': 500    # 500mJ
}

# Extract specialized architectures
mobile_arch = ofa.extract_subnet(mobile_constraints)
server_arch = ofa.extract_subnet(server_constraints)

# extract_subnet returns None when no candidate satisfies the constraints,
# so guard every use instead of calling len() / predict_latency() on None.
if mobile_arch is not None:
    print(f"Mobile architecture: {len(mobile_arch)} layers")
else:
    print("Mobile architecture: no sub-network satisfies the constraints")
if server_arch is not None:
    print(f"Server architecture: {len(server_arch)} layers")
else:
    print("Server architecture: no sub-network satisfies the constraints")

# Compare efficiency (latency stays None for a target with no architecture)
mobile_latency = (
    ofa.efficiency_predictor.predict_latency(mobile_arch)
    if mobile_arch is not None else None
)
server_latency = (
    ofa.efficiency_predictor.predict_latency(server_arch)
    if server_arch is not None else None
)

if mobile_latency is not None:
    print(f"Mobile latency: {mobile_latency:.1f}ms")
if server_latency is not None:
    print(f"Server latency: {server_latency:.1f}ms")

### 5.3 Progressive Search

class ProgressiveNAS:
    """
    Progressive NAS that starts simple and gradually increases complexity.

    The search runs through ``self.search_stages`` in order. Each stage
    samples ``trials`` architectures from a stage-specific search space and
    additionally re-evaluates the best architecture found so far.
    """
    def __init__(self):
        # Each stage widens the search space: deeper networks, more
        # operation types, more filters and a larger trial budget.
        self.search_stages = [
            {
                'name': 'Basic',
                'max_depth': 5,
                'operations': ['conv', 'skip_connect'],
                'max_filters': 64,
                'trials': 20
            },
            {
                'name': 'Intermediate',
                'max_depth': 10,
                'operations': ['conv', 'depthwise_conv', 'skip_connect'],
                'max_filters': 128,
                'trials': 30
            },
            {
                'name': 'Advanced',
                'max_depth': 20,
                'operations': ['conv', 'depthwise_conv', 'dilated_conv', 'skip_connect'],
                'max_filters': 256,
                'trials': 50
            }
        ]

    def create_search_space(self, stage_config):
        """
        Create search space configuration for a specific stage.

        Starts from a default SearchSpaceConfig and restricts it to the
        stage's operations, to filter sizes not exceeding 'max_filters',
        and to the depth range (3, 'max_depth').
        """
        config = SearchSpaceConfig()
        config.layer_types = stage_config['operations']
        config.filter_sizes = [f for f in config.filter_sizes if f <= stage_config['max_filters']]
        config.depth_range = (3, stage_config['max_depth'])
        return config

    def run_search(self):
        """
        Run progressive search across all stages.

        Returns:
            Tuple ``(best_arch, search_history)``: the highest-scoring
            architecture seen in any stage (None if no stage produced a
            candidate) and one summary dict per evaluated stage with keys
            'stage', 'best_score', 'best_arch' and 'num_candidates'.
        """
        best_arch = None
        best_score = float('-inf')
        search_history = []

        for stage in self.search_stages:
            print(f"\n=== {stage['name']} Stage ===")
            print(f"Max depth: {stage['max_depth']}")
            print(f"Operations: {stage['operations']}")
            print(f"Trials: {stage['trials']}")

            # Create search space for this stage
            search_space = self.create_search_space(stage)

            # Generate candidate architectures
            candidates = [
                search_space.sample_architecture()
                for _ in range(stage['trials'])
            ]

            # Add best from previous stages as seed (if available)
            if best_arch:
                adapted_arch = self.adapt_architecture(best_arch, stage)
                if adapted_arch:
                    candidates.append(adapted_arch)

            # A stage configured with zero trials and no seed has nothing
            # to evaluate; skip it instead of indexing an empty result list.
            if not candidates:
                print("No candidates to evaluate in this stage")
                continue

            # Evaluate candidates
            stage_results = [
                (arch, evaluate_architecture_placeholder(arch))
                for arch in candidates
            ]

            # Find best in this stage (first candidate wins ties)
            stage_best_arch, stage_best_score = max(
                stage_results, key=lambda result: result[1]
            )

            print(f"Best score: {stage_best_score:.4f}")
            print(f"Best architecture depth: {len(stage_best_arch)}")

            # Update global best. Compare against the best score seen in ANY
            # earlier stage, not just the previous one; otherwise a weak
            # stage followed by a slightly better one could replace the
            # true best.
            if best_arch is None or stage_best_score > best_score:
                best_arch = stage_best_arch
                best_score = stage_best_score

            # Record stage results
            search_history.append({
                'stage': stage['name'],
                'best_score': stage_best_score,
                'best_arch': stage_best_arch,
                'num_candidates': len(candidates)
            })

        return best_arch, search_history

    def adapt_architecture(self, architecture, stage_config):
        """
        Adapt an architecture from previous stage to current stage constraints.

        Layers whose type is not in the stage's 'operations' are dropped,
        filter counts above 'max_filters' are clamped, and the result is
        truncated to 'max_depth' layers. Input layers are copied, never
        mutated.

        Returns:
            The adapted list of layer dicts, or None if no layer survives.
        """
        allowed_ops = stage_config['operations']
        max_filters = stage_config['max_filters']

        adapted = []
        for layer in architecture:
            if layer['type'] not in allowed_ops:
                continue
            adapted_layer = layer.copy()
            # Tolerate layers that carry no 'filters' entry: there is
            # nothing to clamp, so they pass through unchanged.
            filters = adapted_layer.get('filters')
            if filters is not None and filters > max_filters:
                adapted_layer['filters'] = max_filters
            adapted.append(adapted_layer)

        # Truncate if too deep
        adapted = adapted[:stage_config['max_depth']]

        return adapted or None

# Example usage: run every stage and report what was found
progressive_nas = ProgressiveNAS()
best_architecture, history = progressive_nas.run_search()

print("\n=== Final Results ===")
print("Best architecture found: {} layers".format(len(best_architecture)))

# One summary line per stage, in the order the stages ran
for stage_result in history:
    summary = "{stage}: {best_score:.4f} ({num_candidates} candidates)".format(
        **stage_result
    )
    print(summary)

## 6. Practical Tools and Applications {#practical-tools}

### 6.1 AutoKeras Integration

# Note: This example requires AutoKeras and TensorFlow to be installed:
# pip install autokeras tensorflow

def autokeras_image_classification_example():
    """
    End-to-end AutoKeras demo: search for and train an image classifier.

    Loads CIFAR-10, scales pixels to [0, 1], runs a 10-trial architecture
    search with 5 epochs per trial, reports test accuracy and exports the
    best model.

    Returns:
        ``(clf, best_model)`` on success, or ``(None, None)`` when an
        ImportError is raised (the required packages are missing).
    """
    try:
        import autokeras as ak
        import tensorflow as tf

        # Load sample data (CIFAR-10)
        train_split, test_split = tf.keras.datasets.cifar10.load_data()
        x_train, y_train = train_split
        x_test, y_test = test_split

        # Normalize pixel values
        x_train, x_test = (
            images.astype('float32') / 255.0 for images in (x_train, x_test)
        )

        print(f"Training data shape: {x_train.shape}")
        print(f"Test data shape: {x_test.shape}")

        # Create AutoKeras image classifier
        search_settings = dict(
            max_trials=10,  # Number of different architectures to try
            overwrite=True,
            objective='val_accuracy',
            seed=42,
        )
        clf = ak.ImageClassifier(**search_settings)

        # Search for best architecture and train
        print("Starting architecture search...")
        clf.fit(x_train, y_train, validation_split=0.2, epochs=5, verbose=1)

        # Evaluate on test set; only the accuracy is reported
        _, test_acc = clf.evaluate(x_test, y_test, verbose=0)
        print(f"\nTest accuracy: {test_acc:.4f}")

        # Export the best model
        best_model = clf.export_model()
        print(f"Best model has {best_model.count_params()} parameters")

        # To persist the result: best_model.save('autokeras_best_model.h5')
    except ImportError:
        print("AutoKeras not installed. Install with: pip install autokeras")
        return None, None
    else:
        return clf, best_model

# Uncomment to run:
# clf, model = autokeras_image_classification_example()

### 6.2 Optuna Integration

def optuna_nas_example():
    """
    Example using Optuna for architecture search with custom objective.

    Returns:
        ``(study, best_architecture)``, where ``best_architecture`` is the
        list of layer dicts rebuilt from the best trial's parameters, or
        ``(None, None)`` when Optuna is not installed.
    """
    # Keep the try body to the import alone so that unrelated errors raised
    # during the search are not misreported as "not installed".
    try:
        import optuna
    except ImportError:
        print("Optuna not installed. Install with: pip install optuna")
        return None, None

    def build_architecture(num_layers, base_filters, kernel_size,
                           dropout_rate, activation):
        """Expand sampled hyperparameters into a list of conv layer configs.

        Single source of truth for both the objective and the final
        reconstruction, so the two can never drift apart.
        """
        return [
            {
                'type': 'conv',
                # Gradually increase filters, capped at 512
                'filters': min(base_filters * (2 ** (i // 3)), 512),
                'kernel_size': kernel_size,
                'activation': activation,
                'dropout': dropout_rate,
            }
            for i in range(num_layers)
        ]

    def objective(trial):
        """
        Objective function for Optuna optimization.
        Defines the architecture search space and evaluation.
        """
        # Define architecture hyperparameters (keys match build_architecture)
        params = {
            'num_layers': trial.suggest_int('num_layers', 3, 15),
            'base_filters': trial.suggest_categorical('base_filters', [32, 64, 128]),
            'kernel_size': trial.suggest_categorical('kernel_size', [3, 5, 7]),
            'dropout_rate': trial.suggest_float('dropout_rate', 0.1, 0.5),
            'activation': trial.suggest_categorical('activation', ['relu', 'swish', 'gelu']),
        }
        architecture = build_architecture(**params)

        # Evaluate architecture (placeholder)
        score = evaluate_architecture_placeholder(architecture)

        # Add small bonus for efficient architectures
        efficiency_bonus = 0.01 / (
            1 + len(architecture) * params['base_filters'] / 1000
        )
        return score + efficiency_bonus

    # Create study and optimize
    # NOTE(review): the objective never reports intermediate values, so the
    # pruner presumably has nothing to act on; confirm against the Optuna
    # pruning docs before relying on it for early stopping.
    study = optuna.create_study(
        direction='maximize',
        study_name='nas_example',
        pruner=optuna.pruners.MedianPruner()
    )

    print("Starting Optuna optimization...")
    study.optimize(objective, n_trials=50, timeout=300)  # 5 minute timeout

    # Print results
    print(f"\nBest trial: {study.best_trial.number}")
    print(f"Best score: {study.best_value:.4f}")
    print("Best parameters:")
    for key, value in study.best_params.items():
        print(f"  {key}: {value}")

    # Rebuild the best architecture from the winning hyperparameters
    best_architecture = build_architecture(**study.best_params)

    return study, best_architecture

# Example usage:
# study, best_arch = optuna_nas_example()
# if best_arch:
#     print(f"Best architecture has {len(best_arch)} layers")

### 6.3 Ray Tune Integration

def ray_tune_nas_example():
    """
    Example using Ray Tune for distributed architecture search.

    Returns:
        ``(analysis, best_config)`` from the Tune run, or ``(None, None)``
        when Ray Tune cannot be imported.
    """
    # Keep the try body to the imports so that errors raised during the
    # search are not misreported as "not installed".
    try:
        from ray import tune
        from ray.tune.schedulers import ASHAScheduler
        try:
            # Location of the searchers in Ray 2.x
            from ray.tune.search.optuna import OptunaSearch
        except ImportError:
            # Older Ray releases kept them under ray.tune.suggest
            from ray.tune.suggest.optuna import OptunaSearch
    except ImportError:
        print("Ray Tune not installed. Install with: pip install ray[tune]")
        return None, None

    def train_architecture(config):
        """
        Training function for Ray Tune.
        This would contain actual model training in practice.
        """
        # Build architecture from config
        architecture = [
            {
                'type': config['layer_type'],
                'filters': config['base_filters'] * (2 ** (i // 3)),
                'kernel_size': config['kernel_size'],
                'activation': config['activation']
            }
            for i in range(config['num_layers'])
        ]

        # Simulate training epochs with intermediate reporting
        for epoch in range(config['max_epochs']):
            # Simulate training
            intermediate_score = evaluate_architecture_placeholder(architecture)

            # Add epoch-based improvement
            epoch_bonus = 0.1 * (1 - np.exp(-epoch / 20))
            current_score = intermediate_score + epoch_bonus

            # Report intermediate result for early stopping
            # NOTE(review): keyword-style tune.report is the legacy function
            # API; confirm it is still accepted by the installed Ray version.
            tune.report(accuracy=current_score, epoch=epoch)

    # Define search space
    search_space = {
        'num_layers': tune.randint(3, 15),  # upper bound is exclusive
        'base_filters': tune.choice([32, 64, 128]),
        'kernel_size': tune.choice([3, 5, 7]),
        'layer_type': tune.choice(['conv', 'depthwise_conv']),
        'activation': tune.choice(['relu', 'swish', 'gelu']),
        'max_epochs': 50
    }

    # Configure scheduler for early stopping. The metric and mode are given
    # once to tune.run below, which hands them to both the scheduler and
    # the search algorithm.
    scheduler = ASHAScheduler(
        max_t=50,  # Maximum epochs
        grace_period=5,  # Don't eliminate before epoch 5
        reduction_factor=2  # Keep half at each round
    )

    # Configure search algorithm
    search_alg = OptunaSearch()

    print("Starting Ray Tune optimization...")

    # Run the search. Passing metric/mode here gives the searcher an
    # objective to optimize and lets analysis.best_config /
    # analysis.best_result resolve without extra arguments.
    analysis = tune.run(
        train_architecture,
        config=search_space,
        metric='accuracy',
        mode='max',
        num_samples=20,  # Number of trials
        scheduler=scheduler,
        search_alg=search_alg,
        resources_per_trial={'cpu': 1},  # Adjust based on your setup
        verbose=1
    )

    # Get best result
    best_config = analysis.best_config
    best_score = analysis.best_result['accuracy']

    print(f"\nBest configuration found:")
    print(f"Score: {best_score:.4f}")
    for key, value in best_config.items():
        print(f"  {key}: {value}")

    return analysis, best_config

# Example usage:
# analysis, best_config = ray_tune_nas_example()

### 6.4 Complete NAS Pipeline

class CompletePipeLineNAS:
    """
    Complete NAS pipeline combining multiple techniques.

    Chains candidate sampling, zero-cost filtering, successive halving and
    a final evaluation, and keeps a summary of every run in
    ``self.results_history``.
    """
    def __init__(self, search_config=None):
        # Fall back to the default search space when none is supplied.
        self.search_config = search_config or SearchSpaceConfig()
        self.zero_cost_filter = ZeroCostFilter()
        self.results_history = []  # One results dict per completed run

    def run_complete_search(self, total_budget_hours=2):
        """
        Run complete NAS pipeline and report time against a budget.

        The budget is informational only: the run is never interrupted; the
        elapsed time is simply expressed as a percentage of the budget.

        Args:
            total_budget_hours: Time budget in hours used for reporting.

        Returns:
            Tuple ``(best_arch, results)`` where ``results`` holds
            'best_architecture', 'final_score', 'initial_candidates',
            'filtered_candidates', 'elapsed_time_hours' and
            'budget_used_percent'.
        """
        import time
        # Monotonic clock: immune to wall-clock adjustments during the run.
        start_time = time.monotonic()
        budget_seconds = total_budget_hours * 3600

        print(f"Starting complete NAS pipeline with {total_budget_hours}h budget")

        # Stage 1: Generate large pool of candidates
        print("\n=== Stage 1: Candidate Generation ===")
        num_initial_candidates = 1000
        candidates = [
            self.search_config.sample_architecture()
            for _ in range(num_initial_candidates)
        ]
        print(f"Generated {len(candidates)} initial candidates")

        # Stage 2: Zero-cost filtering
        print("\n=== Stage 2: Zero-Cost Filtering ===")
        filtered_candidates = self.zero_cost_filter.filter_architectures(
            candidates, keep_fraction=0.1
        )
        print(f"Filtered to {len(filtered_candidates)} candidates")

        # Stage 3: Multi-fidelity evaluation
        print("\n=== Stage 3: Multi-Fidelity Evaluation ===")
        if len(filtered_candidates) > 27:
            # Further reduce if still too many
            filtered_candidates = filtered_candidates[:27]

        best_arch = successive_halving_nas(
            filtered_candidates,
            max_epochs=20,  # Reduced for demo
            reduction_factor=3
        )

        # Stage 4: Final validation
        print("\n=== Stage 4: Final Validation ===")
        final_score = train_and_evaluate_placeholder(best_arch, epochs=50)

        elapsed_time = time.monotonic() - start_time

        # A zero budget would divide by zero after all the work is done;
        # report it as an unbounded overrun instead of crashing.
        if budget_seconds != 0:
            budget_used_percent = (elapsed_time / budget_seconds) * 100
        else:
            budget_used_percent = float('inf')

        # Compile results
        results = {
            'best_architecture': best_arch,
            'final_score': final_score,
            'initial_candidates': num_initial_candidates,
            'filtered_candidates': len(filtered_candidates),
            'elapsed_time_hours': elapsed_time / 3600,
            'budget_used_percent': budget_used_percent
        }

        self.results_history.append(results)

        print(f"\n=== Final Results ===")
        print(f"Best architecture: {len(best_arch)} layers")
        print(f"Final score: {final_score:.4f}")
        print(f"Time used: {elapsed_time/3600:.2f}h ({results['budget_used_percent']:.1f}% of budget)")

        return best_arch, results

    def analyze_results(self):
        """
        Print summary statistics over all recorded runs.

        Reports the best score plus mean and standard deviation of the
        final scores and elapsed times. Prints a notice and returns when
        no run has been recorded yet.
        """
        if not self.results_history:
            print("No results to analyze")
            return

        scores = [r['final_score'] for r in self.results_history]
        times = [r['elapsed_time_hours'] for r in self.results_history]

        print(f"\n=== Analysis of {len(self.results_history)} runs ===")
        print(f"Best score: {max(scores):.4f}")
        print(f"Average score: {np.mean(scores):.4f} ± {np.std(scores):.4f}")
        print(f"Average time: {np.mean(times):.2f}h ± {np.std(times):.2f}h")

# Example usage (budget of 0.1 h, i.e. 6 minutes, for the demo):
pipeline = CompletePipeLineNAS()
search_outcome = pipeline.run_complete_search(total_budget_hours=0.1)
best_architecture, results = search_outcome

# Run multiple times to see consistency
# for i in range(3):
#     print(f"\n{'='*50}")
#     print(f"Run {i+1}")
#     pipeline.run_complete_search(total_budget_hours=0.05)

# pipeline.analyze_results()


## Usage Instructions

#1. **Setup**: Install required dependencies using pip
#2. **Basic Usage**: Start with the AutoKeras example for simplicity
#3. **Custom Search**: Use the SearchSpaceConfig class to define your own search spaces
#4. **Evaluation**: Implement your own `train_and_evaluate` function for real model training
#5. **Scaling**: Use Ray Tune for distributed search across multiple machines

## Notes

#- Most functions include placeholder evaluations for demonstration
#- Replace `evaluate_architecture_placeholder` with actual model training
#- Adjust search spaces and budgets based on your computational resources
#- Consider starting with smaller search spaces and gradually expanding

## Further Reading

#- Original DARTS paper: https://arxiv.org/abs/1806.09055
#- NAS survey: https://arxiv.org/abs/1808.05377
#- AutoML book: Chapter 4 for detailed explanations