# DAG Concurrency Manager - Sample Usage
This notebook demonstrates how to use the production-ready DagConcurrencyManager with various scenarios

In [None]:
import logging
import time
import random
from typing import Dict, Any, Optional
from unittest.mock import Mock, MagicMock

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Import the DagConcurrencyManager (assuming it's in the same directory)
from dag_concurrency_manager import DagConc

In [None]:
class MockScheduler:
    """Mock scheduler that simulates real scheduler behavior with different slot configurations."""
    
    def __init__(self, failure_rate: float = 0.0):
        self.failure_rate = failure_rate
        self.call_count = 0
        
    def get_slot_stats(self) -> Dict[str, Any]:
        """Simulate scheduler slot statistics with realistic data."""
        self.call_count += 1
        
        # Simulate occasional failures
        if random.random() < self.failure_rate:
            raise Exception("Scheduler connection failed")
        
        # Simulate realistic slot data
        return {
            'cpu_slots': {
                'total': 100,
                'used': random.randint(20, 80),
                'available': None  # Will be calculated
            },
            'memory_slots': {
                'total': 200,
                'used': random.randint(50, 150),
                'available': None
            },
            'gpu_slots': {
                'total': 10,
                'used': random.randint(0, 8),
                'available': None
            },
            'io_slots': {
                'total': 50,
                'used': random.randint(5, 40),
                'available': None
            }
        }

# Create mock scheduler instances
reliable_scheduler = MockScheduler(failure_rate=0.0)
unreliable_scheduler = MockScheduler(failure_rate=0.2)

In [None]:
# Basic usage example
logger.info("=== Basic Usage Example ===")

# Initialize the manager with default configuration
manager = DagConcurrencyManager(scheduler=reliable_scheduler)

# Get capacity analysis
capacity = manager.get_capacity_analysis()
print(f"Recommended DAG limit: {capacity['recommended_dag_limit']}")
print(f"Current utilization: {capacity['current_utilization']:.2%}")
print(f"Bottleneck resource: {capacity['bottleneck_resource']}")
print(f"Safety margin: {capacity['safety_margin']:.2%}")

# Display slot details
print("\nSlot Details:")
for slot_type, details in capacity['slot_details'].items():
    print(f"  {slot_type}: {details['used']}/{details['total']} "
          f"({details['utilization']:.1%} utilization)")

In [None]:
# Configuration updates demonstration
logger.info("=== Configuration Updates ===")

# Update configuration with custom multipliers and weights
new_config = {
    'resource_multipliers': {
        'cpu_slots': 1.2,
        'memory_slots': 1.5,
        'gpu_slots': 2.0,
        'io_slots': 1.1
    },
    'resource_weights': {
        'cpu_slots': 0.4,
        'memory_slots': 0.3,
        'gpu_slots': 0.2,
        'io_slots': 0.1
    },
    'strategy_balance': 0.3,  # More conservative
    'safety_margin': 0.15     # 15% safety margin
}

manager.update_config(new_config)
print("Configuration updated successfully")

# Get new capacity analysis with updated config
updated_capacity = manager.get_capacity_analysis()
print(f"Updated recommended DAG limit: {updated_capacity['recommended_dag_limit']}")
print(f"Updated safety margin: {updated_capacity['safety_margin']:.2%}")

# Compare with previous results
print(f"Change in DAG limit: {updated_capacity['recommended_dag_limit'] - capacity['recommended_dag_

In [None]:
# Performance testing and caching demonstration
logger.info("=== Performance Testing ===")

# Test multiple calculations with timing
start_time = time.time