# Flow SDK Advanced Features

This notebook explores advanced Flow SDK features for production workloads, including instance catalogs, persistent storage, multi-node computing, and self-managing tasks.

## What You'll Learn

1. **Instance Catalog** - Query and select optimal instances
2. **Storage Management** - Persistent volumes and data transfer
3. **Multi-Node Computing** - Distributed and parallel tasks
4. **Port Forwarding** - Interactive services and tunneling
5. **Custom Startup Scripts** - Advanced initialization
6. **Self-Terminating Tasks** - Cost optimization patterns
7. **Monitoring & Observability** - Logging and metrics

In [None]:
# Import required modules
from flow import Flow, TaskConfig
from flow.models import Instance, Volume
import json
import time
from datetime import datetime

## 1. Instance Catalog - Finding the Right Compute

The Flow SDK catalog lets you search for instances by requirements such as price, GPU count, and GPU memory.
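
Once a query returns candidates, ranking them locally can make offers easier to compare. The sketch below assumes results expose `instance_type`, `gpu_count`, and `price_per_hour` attributes, as the cells in this section print them. The `CatalogEntry` dataclass and the `rank_by_price_per_gpu` helper are hypothetical stand-ins for illustration, not part of the Flow SDK, and the prices are made up.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CatalogEntry:
    """Hypothetical stand-in for a catalog result (not a Flow SDK class)."""
    instance_type: str
    gpu_count: int
    price_per_hour: float


def rank_by_price_per_gpu(entries):
    """Return GPU entries sorted by hourly price per GPU, cheapest first."""
    gpu_entries = [e for e in entries if e.gpu_count > 0]
    return sorted(gpu_entries, key=lambda e: e.price_per_hour / e.gpu_count)


# Illustrative numbers only, not real catalog prices
sample = [
    CatalogEntry("example.8gpu", gpu_count=8, price_per_hour=32.0),
    CatalogEntry("example.1gpu", gpu_count=1, price_per_hour=5.0),
    CatalogEntry("example.cpu", gpu_count=0, price_per_hour=1.0),
]
ranked = rank_by_price_per_gpu(sample)
for entry in ranked:
    per_gpu = entry.price_per_hour / entry.gpu_count
    print(f"{entry.instance_type}: ${per_gpu:.2f} per GPU-hour")
```

Ranking per GPU rather than per instance keeps a large multi-GPU node from looking expensive when its per-GPU cost is in fact lower.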

In [None]:
# Query instance catalog
with Flow() as flow:
    # Find GPU instances under $20/hour
    requirements = {
        "max_price": 20.0,
        "min_gpu_count": 1,
        "gpu_memory_gb": 16  # Minimum GPU memory
    }
    
    print("Searching for GPU instances...\n")
    instances = flow.find_instances(requirements, limit=10)
    
    # Display results
    for inst in instances:
        print(f"Instance: {inst.instance_type}")
        print(f"  Provider: {inst.provider}")
        print(f"  Region: {inst.region}")
        print(f"  GPUs: {inst.gpu_count}x {inst.gpu_type}")
        print(f"  GPU Memory: {inst.gpu_memory_gb}GB")
        print(f"  Price: ${inst.price_per_hour}/hour")
        print(f"  Available: {inst.available}")
        print()

In [None]:
# Advanced catalog queries
with Flow() as flow:
    # Query 1: Find instances for large model training
    large_model_requirements = {
        "instance_type": "gpu.nvidia.h100",  # Specific GPU type
        "min_gpu_count": 8,
        "region": "us-west-2",
        "max_price": 100.0
    }
    
    print("Large model training instances:")
    instances = flow.find_instances(large_model_requirements, limit=5)
    for inst in instances:
        print(f"  {inst.instance_type}: {inst.gpu_count} GPUs @ ${inst.price_per_hour}/hr")
    
    # Query 2: Find CPU instances for data processing
    cpu_requirements = {
        "min_cpu_count": 32,
        "min_memory_gb": 128,
        "max_price": 5.0,
        "gpu_count": 0  # No GPUs needed
    }
    
    print("\nHigh-CPU instances for data processing:")
    instances = flow.find_instances(cpu_requirements, limit=5)
    for inst in instances:
        print(f"  {inst.instance_type}: {inst.cpu_count} CPUs, {inst.memory_gb}GB RAM @ ${inst.price_per_hour}/hr")

In [None]:
# Smart instance selection based on workload
def select_optimal_instance(workload_type, budget_per_hour):
    """Select optimal instance for workload type."""
    workload_configs = {
        "inference": {
            "min_gpu_count": 1,
            "gpu_memory_gb": 16,
            "preferred_gpu": "t4"
        },
        "training": {
            "min_gpu_count": 1,
            "gpu_memory_gb": 40,
            "preferred_gpu": "a100"
        },
        "large_training": {
            "min_gpu_count": 8,
            "gpu_memory_gb": 80,
            "preferred_gpu": "h100"
        },
        "data_processing": {
            "min_cpu_count": 32,
            "min_memory_gb": 128,
            "gpu_count": 0
        }
    }
    
    requirements = workload_configs.get(workload_type, {})
    requirements["max_price"] = budget_per_hour
    
    with Flow() as flow:
        instances = flow.find_instances(requirements, limit=3)
        if instances:
            # Return the cheapest match; the catalog's result ordering is not assumed
            return min(instances, key=lambda inst: inst.price_per_hour)
        return None

# Example usage
for workload in ["inference", "training", "data_processing"]:
    instance = select_optimal_instance(workload, budget_per_hour=20.0)
    if instance:
        print(f"{workload.title()}: {instance.instance_type} @ ${instance.price_per_hour}/hr")
    else:
        print(f"{workload.title()}: No instances found within budget")

## 2. Storage Management - Persistent Volumes

Flow SDK volumes keep data available across tasks. You can create volumes, attach existing ones to a task, and transfer data to and from them.
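
Volume specs in this section are plain dicts: one shape attaches an existing volume (`volume_id`) and the other creates a new one (`name` plus `size_gb`). Checking them before submitting a task can catch typos early. The `validate_volume_specs` helper below is a hypothetical sketch based on those two shapes. It is not an SDK function, and the SDK may apply different or additional rules.

```python
def validate_volume_specs(specs):
    """Return a list of problems found in a list of volume spec dicts."""
    problems = []
    seen_paths = set()
    for index, spec in enumerate(specs):
        mount_path = spec.get("mount_path")
        if not mount_path or not mount_path.startswith("/"):
            problems.append(f"volume {index}: mount_path must be an absolute path")
        elif mount_path in seen_paths:
            problems.append(f"volume {index}: duplicate mount_path {mount_path}")
        else:
            seen_paths.add(mount_path)

        # Exactly one of the two shapes must be present
        has_existing = "volume_id" in spec
        has_new = "name" in spec and "size_gb" in spec
        if has_existing == has_new:
            problems.append(
                f"volume {index}: specify either volume_id or name plus size_gb"
            )
    return problems


specs = [
    {"volume_id": "vol-123456", "mount_path": "/data"},
    {"name": "scratch", "size_gb": 50, "mount_path": "/scratch"},
    {"name": "broken", "mount_path": "/data"},  # missing size_gb, reuses /data
]
problems = validate_volume_specs(specs)
for problem in problems:
    print(problem)
```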

In [None]:
# Create and manage volumes
with Flow() as flow:
    # Create a new volume
    volume = flow.create_volume(
        name="ml-datasets",
        size_gb=100
    )
    
    print(f"Created volume: {volume.id}")
    print(f"  Name: {volume.name}")
    print(f"  Size: {volume.size_gb}GB")
    print(f"  Status: {volume.status}")
    
    # List all volumes
    print("\nAll volumes:")
    volumes = flow.list_volumes()
    for vol in volumes:
        print(f"  {vol.name} ({vol.id}): {vol.size_gb}GB - {vol.status}")

In [None]:
# Task with existing volume
config = TaskConfig(
    name="use-existing-volume",
    instance_type="gpu.nvidia.t4",
    volumes=[
        {
            "volume_id": "vol-123456",  # Attach existing volume
            "mount_path": "/data"
        },
        {
            "name": "scratch",  # Create new volume
            "size_gb": 50,
            "mount_path": "/scratch"
        }
    ],
    command="""
        echo "=== Volume Information ==="
        df -h | grep -E '(Filesystem|/data|/scratch)'
        
        echo "\n=== Existing data ==="
        ls -la /data/
        
        echo "\n=== Processing data ==="
        # Process data from persistent volume
        python process.py --input /data/dataset --output /scratch/results
    """
)

print("Task configuration with volumes:")
print(f"  Existing volume: {config.volumes[0]['volume_id']}")
print(f"  New volume: {config.volumes[1]['name']} ({config.volumes[1]['size_gb']}GB)")

In [None]:
# Data upload/download workflow
upload_download_example = """
# Upload local data to volume
with Flow() as flow:
    # Upload directory
    flow.upload_directory(
        volume_id="vol-123456",
        local_path="./datasets/imagenet",
        remote_path="/datasets/imagenet"
    )
    
    # Upload single file
    flow.upload_file(
        volume_id="vol-123456",
        local_path="./models/checkpoint.pt",
        remote_path="/models/checkpoint.pt"
    )

# Download results after task completion
with Flow() as flow:
    # Download directory
    flow.download_directory(
        volume_id="vol-123456",
        remote_path="/results",
        local_path="./results"
    )
"""

print("Data Transfer Example:")
print(upload_download_example)

## 3. Multi-Node Computing - Distributed Tasks

Flow SDK supports multi-node tasks for distributed computing workloads.
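
Each node needs its own rank, while the node count and the rank 0 address are shared by all nodes. A minimal sketch of deriving those per-node values from a list of node IPs follows. It reuses the variable names from this section (`FLOW_NODE_RANK`, `FLOW_NODE_COUNT`, `FLOW_NODE_0_IP`, `MASTER_PORT`); the `build_node_environments` helper is hypothetical, and how the values reach each node depends on your setup.

```python
def build_node_environments(node_ips, master_port=29500):
    """Return one environment dict per node, ordered by rank."""
    if not node_ips:
        raise ValueError("at least one node IP is required")
    return [
        {
            "FLOW_NODE_RANK": str(rank),
            "FLOW_NODE_COUNT": str(len(node_ips)),
            "FLOW_NODE_0_IP": node_ips[0],  # rank 0 acts as the master node
            "MASTER_PORT": str(master_port),
        }
        for rank in range(len(node_ips))
    ]


# Placeholder addresses from the documentation range (RFC 5737)
envs = build_node_environments(
    ["192.0.2.10", "192.0.2.11", "192.0.2.12", "192.0.2.13"]
)
for env in envs:
    print(env)
```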

In [None]:
# Multi-node configuration
multi_node_config = TaskConfig(
    name="distributed-training",
    instance_type="gpu.nvidia.a100",
    instance_count=4,  # Launch 4 nodes
    environment={
        # Must be configured per node:
        "FLOW_NODE_RANK": "0",  # Set differently for each node
        "FLOW_NODE_COUNT": "4",
        "FLOW_NODE_0_IP": "SET_ME",  # IP of rank 0 node
        "MASTER_PORT": "29500"
    },
    command="""
        echo "=== Node Information ==="
        echo "Node Rank: ${FLOW_NODE_RANK}"
        echo "Total Nodes: ${FLOW_NODE_COUNT}"
        echo "Master Node: ${FLOW_NODE_0_IP}"
        echo ""
        
        # Set up distributed training environment
        export MASTER_ADDR=${FLOW_NODE_0_IP}
        export WORLD_SIZE=${FLOW_NODE_COUNT}
        export RANK=${FLOW_NODE_RANK}
        
        # Run distributed training
        python -m torch.distributed.launch \
            --nproc_per_node=1 \
            --nnodes=${WORLD_SIZE} \
            --node_rank=${RANK} \
            --master_addr=${MASTER_ADDR} \
            --master_port=${MASTER_PORT} \
            train_distributed.py
    """
)

print("Multi-node configuration:")
print(f"  Nodes: {multi_node_config.instance_count}")
print(f"  Instance type: {multi_node_config.instance_type}")
print(f"  Total GPUs: {multi_node_config.instance_count * 1}")
print("\nNOTE: Environment variables must be set manually per node")

In [None]:
# Distributed data processing example
data_parallel_config = TaskConfig(
    name="parallel-data-processing",
    instance_type="cpu.large",
    instance_count=10,  # 10 parallel workers
    volumes=[{
        "volume_id": "shared-data-vol",
        "mount_path": "/data"
    }],
    command="""
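        # Each node processes a subset of data
        # Assumes FLOW_NODE_RANK and FLOW_NODE_COUNT are set per node, as in the previous cell
        TOTAL_FILES=1000
        FILES_PER_NODE=$((TOTAL_FILES / FLOW_NODE_COUNT))
        START=$((FLOW_NODE_RANK * FILES_PER_NODE))
        END=$((START + FILES_PER_NODE))  # exclusive upper bound
        
        echo "Node $FLOW_NODE_RANK processing files $START to $((END - 1))"
        
        # Process assigned files; seq is inclusive, so stop at END - 1 to avoid overlap
        for i in $(seq $START $((END - 1))); do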
            python process_file.py --input /data/file_$i.json --output /data/processed/
        done
        
        echo "Node $FLOW_NODE_RANK completed"
    """
)

print("Data parallel processing:")
print(f"  Workers: {data_parallel_config.instance_count}")
print(f"  Files per worker: 100")
print(f"  Total throughput: {data_parallel_config.instance_count * 100} files")
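
The shell arithmetic in the cell above divides the files evenly. When the total is not a multiple of the node count, the leftover files also need an owner. The sketch below shows one way to compute non-overlapping half-open ranges that cover every item; `shard_range` is a hypothetical helper, not an SDK function.

```python
def shard_range(total_items, node_count, node_rank):
    """Return the half-open range [start, end) of items for one node."""
    if node_count <= 0:
        raise ValueError("node_count must be positive")
    if not 0 <= node_rank < node_count:
        raise ValueError("node_rank must be in [0, node_count)")
    base, remainder = divmod(total_items, node_count)
    # The first `remainder` nodes take one extra item each
    start = node_rank * base + min(node_rank, remainder)
    end = start + base + (1 if node_rank < remainder else 0)
    return start, end


shards = [shard_range(1003, 10, rank) for rank in range(10)]
for rank, (start, end) in enumerate(shards):
    print(f"Node {rank}: files {start} to {end - 1} ({end - start} files)")
```

Because each range ends where the next begins, no file is processed twice and none is skipped.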

## 4. Port Forwarding - Interactive Services

Enable port forwarding to run interactive services like Jupyter, TensorBoard, or custom web applications.
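
When several ports are exposed, generating the SSH forwarding command from the port list avoids copy-paste mistakes. The sketch below builds an `ssh` argument list with one `-L` forward per port. The `build_tunnel_command` helper is hypothetical, the `ubuntu` user is taken from the tunnel example in this section, and the host is a placeholder address.

```python
import shlex


def build_tunnel_command(host, ports, user="ubuntu"):
    """Return an ssh argument list forwarding each port to the same local port."""
    args = ["ssh", "-N"]  # -N: forward ports only, do not run a remote command
    for port in ports:
        if not 1 <= port <= 65535:
            raise ValueError(f"invalid port: {port}")
        args += ["-L", f"{port}:localhost:{port}"]
    args.append(f"{user}@{host}")
    return args


# Placeholder address from the documentation range (RFC 5737)
command = build_tunnel_command("192.0.2.10", [8888, 6006])
print(shlex.join(command))
```

Returning a list rather than a string lets the result be passed directly to `subprocess.run` without shell quoting concerns.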

In [None]:
# Interactive service with port forwarding
interactive_config = TaskConfig(
    name="ml-workspace",
    instance_type="gpu.nvidia.a10g",
    ports=[
        8888,  # Jupyter
        6006,  # TensorBoard
        8501,  # Streamlit
        5000   # Flask API
    ],
    command="""
        # Install required packages
        pip install jupyter tensorboard streamlit flask
        
        # Start services
        echo "Starting ML workspace services..."
        
        # Start Jupyter (background)
        jupyter notebook \
            --ip=0.0.0.0 \
            --port=8888 \
            --no-browser \
            --allow-root \
            --NotebookApp.token='flow-demo-token' &
        
        # Start TensorBoard (background)
        tensorboard --logdir=/logs --port=6006 --bind_all &
        
        # Start Streamlit app (background)
        streamlit run app.py --server.port=8501 --server.address=0.0.0.0 &
        
        # Keep services running
        echo "All services started. Press Ctrl+C to stop."
        sleep infinity
    """
)

print("Interactive ML Workspace:")
print(f"  Ports: {interactive_config.ports}")
print("\nAfter task starts, access services at:")
print("  Jupyter: http://<instance-ip>:8888 (token: flow-demo-token)")
print("  TensorBoard: http://<instance-ip>:6006")
print("  Streamlit: http://<instance-ip>:8501")
print("  API: http://<instance-ip>:5000 (the command above does not start a Flask app)")

In [None]:
# SSH tunnel setup for secure access
# Raw string so the backslashes and "\n" in the example print literally
tunnel_setup = r"""
# After task starts, create SSH tunnel for secure access

# Get task info
task_info = task.get_info()
instance_ip = task_info['instance_ip']

# Create SSH tunnel command
tunnel_cmd = f'''
ssh -L 8888:localhost:8888 \
    -L 6006:localhost:6006 \
    -L 8501:localhost:8501 \
    -L 5000:localhost:5000 \
    ubuntu@{instance_ip}
'''

print(f"SSH Tunnel Command:")
print(tunnel_cmd)
print("\nThen access services locally:")
print("  Jupyter: http://localhost:8888")
print("  TensorBoard: http://localhost:6006")
print("  Streamlit: http://localhost:8501")
print("  API: http://localhost:5000")
"""

print("SSH Tunnel Setup:")
print(tunnel_setup)

## 5. Custom Startup Scripts - Advanced Initialization

Startup scripts can prepare complex environments before the main application runs, with error handling, retries, and logging.
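
The retry idea used for flaky setup steps applies equally to Python code that runs inside a task. A minimal sketch, assuming a fixed delay between attempts: the `retry` helper and the `flaky` function are hypothetical, and `sleep` is injectable only so the demo runs without waiting.

```python
import time


def retry(func, max_attempts=5, delay_seconds=5.0, sleep=time.sleep):
    """Call func until it succeeds or max_attempts is reached."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception as exc:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            print(f"Attempt {attempt}/{max_attempts} failed: {exc}")
            sleep(delay_seconds)


calls = {"count": 0}


def flaky():
    """Fail twice, then succeed, to exercise the retry path."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


result = retry(flaky, sleep=lambda seconds: None)  # skip real sleeping in the demo
print(result, calls["count"])
```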

In [None]:
# Advanced startup script with error handling
startup_script = '''#!/bin/bash
set -euo pipefail  # Exit on error, undefined vars, pipe failures

# Logging setup
LOG_FILE="/var/log/flow-task.log"
exec 1> >(tee -a "$LOG_FILE")
exec 2>&1

echo "[$(date)] Starting Flow task initialization"

# Function for retrying commands
retry() {
    local n=1
    local max=5
    local delay=5
    while true; do
        "$@" && break || {
            if [[ $n -lt $max ]]; then
                ((n++))
                echo "[$(date)] Command failed. Attempt $n/$max:"
                sleep $delay;
            else
                echo "[$(date)] Command failed after $n attempts."
                return 1
            fi
        }
    done
}

# System setup
echo "[$(date)] Configuring system..."
sudo sysctl -w vm.overcommit_memory=1
sudo sysctl -w net.core.somaxconn=1024

# Install dependencies with retry
echo "[$(date)] Installing dependencies..."
retry sudo apt-get update
retry sudo apt-get install -y htop nvtop iotop git-lfs

# Setup Python environment
echo "[$(date)] Setting up Python environment..."
python -m venv /opt/venv
source /opt/venv/bin/activate
retry pip install --upgrade pip setuptools wheel
retry pip install -r /data/requirements.txt

# Configure CUDA
if command -v nvidia-smi &> /dev/null; then
    echo "[$(date)] Configuring CUDA..."
    export CUDA_DEVICE_ORDER=PCI_BUS_ID
    export CUDA_VISIBLE_DEVICES=0,1,2,3
    sudo nvidia-smi --persistence-mode=1  # changing persistence mode requires root
fi

# Mount cloud storage
if [ -n "${S3_BUCKET:-}" ]; then
    echo "[$(date)] Mounting S3 bucket..."
    retry s3fs "$S3_BUCKET" /mnt/s3 -o allow_other
fi

# Start monitoring
echo "[$(date)] Starting monitoring services..."
nohup nvidia-smi dmon -s pucvmet -d 30 > /var/log/gpu-monitor.log &
nohup iostat -x 30 > /var/log/io-monitor.log &

# Signal readiness
echo "[$(date)] Initialization complete"
touch /tmp/flow-ready

# Run main application
echo "[$(date)] Starting main application..."
cd /workspace
exec python main.py
'''

# Create task with advanced startup
advanced_config = TaskConfig(
    name="advanced-startup",
    instance_type="gpu.nvidia.a100",
    startup_script=startup_script,
    command="echo 'Main application running'"
)

print("Advanced startup script features:")
print("  - Error handling and retries")
print("  - Comprehensive logging")
print("  - System configuration")
print("  - Environment setup")
print("  - Monitoring services")
print("  - Cloud storage mounting")

## 6. Self-Terminating Tasks - Cost Optimization

Implement tasks that monitor themselves and terminate when appropriate to optimize costs.
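
The stop conditions are easier to test when the decision is a pure function of timestamps. The sketch below uses the 4-hour runtime limit and 15-minute idle timeout from this section as defaults. The `termination_reason` helper is hypothetical; taking `now` as an argument avoids depending on the real clock.

```python
from datetime import datetime, timedelta


def termination_reason(now, start_time, last_improvement,
                       max_runtime=timedelta(hours=4),
                       idle_timeout=timedelta(minutes=15)):
    """Return why a task should stop, or None if it should keep running."""
    if now - start_time > max_runtime:
        return "max_runtime"
    if now - last_improvement > idle_timeout:
        return "idle_timeout"
    return None


start = datetime(2024, 1, 1, 12, 0)
still_running = termination_reason(start + timedelta(minutes=10), start, start)
idle = termination_reason(start + timedelta(minutes=20), start, start)
too_long = termination_reason(
    start + timedelta(hours=5), start, start + timedelta(hours=5)
)
print(still_running, idle, too_long)
```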

In [None]:
# Self-terminating training task
self_terminating_config = TaskConfig(
    name="auto-stop-training",
    instance_type="gpu.nvidia.a100",
    max_price_per_hour=50.0,
    command="""
        python << 'EOF'
import time
import subprocess
import os
from datetime import datetime, timedelta

# Training parameters
MAX_RUNTIME = timedelta(hours=4)
IDLE_TIMEOUT = timedelta(minutes=15)
CONVERGENCE_THRESHOLD = 0.001

start_time = datetime.now()
last_improvement = datetime.now()
best_loss = float('inf')

def should_terminate():
    '''Check if task should terminate.'''
    # Check max runtime
    if datetime.now() - start_time > MAX_RUNTIME:
        print("Max runtime reached")
        return True
    
    # Check idle timeout
    if datetime.now() - last_improvement > IDLE_TIMEOUT:
        print("No improvement for 15 minutes")
        return True
    
    return False

# Simulated training loop
for epoch in range(1000):
    # Simulate training
    loss = 1.0 / (epoch + 1)  # Decreasing loss
    
    print(f"Epoch {epoch}: loss={loss:.4f}")
    
    # Check for improvement
    if loss < best_loss - CONVERGENCE_THRESHOLD:
        best_loss = loss
        last_improvement = datetime.now()
    
    # Check termination conditions
    if should_terminate():
        print(f"Terminating: final loss={best_loss:.4f}")
        # Cancel own task
        task_name = os.environ.get('FLOW_TASK_NAME')
        if task_name:
            subprocess.run(['flow', 'cancel', task_name])
        break
    
    # Check convergence
    if loss < 0.01:
        print("Model converged!")
        break
    
    time.sleep(2)  # Simulate work

print("Training completed")
EOF
    """
)

print("Self-terminating task features:")
print("  - Maximum runtime limit (4 hours)")
print("  - Idle timeout (15 minutes without improvement)")
print("  - Convergence detection")
print("  - Automatic task cancellation")

In [None]:
# Cost-aware task scheduling
cost_aware_script = '''
import os
import time
from datetime import datetime
import requests

# Cost monitoring
BUDGET_LIMIT = 100.0  # Maximum spend in dollars
PRICE_PER_HOUR = float(os.environ.get('FLOW_INSTANCE_PRICE', '10.0'))
start_time = datetime.now()

def get_current_cost():
    """Calculate current cost."""
    runtime_hours = (datetime.now() - start_time).total_seconds() / 3600
    return runtime_hours * PRICE_PER_HOUR

def check_spot_termination():
    """Check if spot instance is being terminated."""
    try:
        # AWS spot instance termination notice
        response = requests.get(
            'http://169.254.169.254/latest/meta-data/spot/termination-time',
            timeout=1
        )
        if response.status_code == 200:
            print(f"Spot termination notice: {response.text}")
            return True
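    except requests.RequestException:
        # Metadata endpoint unreachable (for example, not on AWS): treat as no notice
        pass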
    return False

# Main loop with cost monitoring
while True:
    current_cost = get_current_cost()
    print(f"Current cost: ${current_cost:.2f}")
    
    # Check budget
    if current_cost >= BUDGET_LIMIT:
        print(f"Budget limit reached: ${BUDGET_LIMIT}")
        break
    
    # Check spot termination
    if check_spot_termination():
        print("Saving checkpoint before termination...")
        # Save checkpoint logic here
        break
    
    # Do work
    # ...
    
    time.sleep(60)
'''

print("Cost-aware scheduling features:")
print("  - Budget limit enforcement")
print("  - Real-time cost tracking")
print("  - Spot instance termination handling")
print("  - Checkpoint saving on termination")
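
The same runtime-times-price arithmetic also gives how long a task can keep running before it reaches its budget, which is useful for scheduling checkpoints. A minimal sketch, assuming a constant hourly price; `hours_until_budget_exhausted` is a hypothetical helper.

```python
def hours_until_budget_exhausted(budget_limit, price_per_hour, hours_elapsed):
    """Return the runtime hours left before spend reaches budget_limit."""
    if price_per_hour <= 0:
        raise ValueError("price_per_hour must be positive")
    spent = hours_elapsed * price_per_hour
    return max(0.0, (budget_limit - spent) / price_per_hour)


remaining = hours_until_budget_exhausted(
    budget_limit=100.0, price_per_hour=10.0, hours_elapsed=2.5
)
print(f"Hours remaining within budget: {remaining}")
```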

## 7. Monitoring and Observability

Production workloads need metrics collection and structured logs so that problems can be diagnosed while a task runs.

In [None]:
# Task with comprehensive monitoring
monitoring_config = TaskConfig(
    name="monitored-training",
    instance_type="gpu.nvidia.a100",
    environment={
        "WANDB_API_KEY": "your-wandb-key"
    },
    command="""
        # Install monitoring tools
        pip install wandb prometheus-client psutil gpustat
        
        # Start metrics collection
        python << 'MONITORING_EOF'
import os
import time
import psutil
import gpustat
import wandb
from prometheus_client import start_http_server, Gauge
import threading

# Initialize monitoring
if os.environ.get('WANDB_API_KEY'):
    wandb.init(project="flow-monitoring")

# Prometheus metrics
gpu_utilization = Gauge('gpu_utilization_percent', 'GPU utilization')
gpu_memory_used = Gauge('gpu_memory_used_mb', 'GPU memory used')
cpu_usage = Gauge('cpu_usage_percent', 'CPU usage')
memory_usage = Gauge('memory_usage_percent', 'Memory usage')

# Start Prometheus server
start_http_server(8000)

def collect_metrics():
    '''Collect system metrics.'''
    while True:
        # CPU and memory
        cpu_pct = psutil.cpu_percent(interval=1)
        mem_pct = psutil.virtual_memory().percent
        
        # GPU stats
        gpu_stats = gpustat.GPUStatCollection.new_query()
        if gpu_stats.gpus:
            gpu = gpu_stats.gpus[0]
            gpu_util = gpu.utilization
            gpu_mem = gpu.memory_used
        else:
            gpu_util = 0
            gpu_mem = 0
        
        # Update Prometheus metrics
        cpu_usage.set(cpu_pct)
        memory_usage.set(mem_pct)
        gpu_utilization.set(gpu_util)
        gpu_memory_used.set(gpu_mem)
        
        # Log to W&B
        if wandb.run:
            wandb.log({
                "cpu_usage": cpu_pct,
                "memory_usage": mem_pct,
                "gpu_utilization": gpu_util,
                "gpu_memory_mb": gpu_mem
            })
        
        # Print summary
        print(f"CPU: {cpu_pct:.1f}%, Mem: {mem_pct:.1f}%, "
              f"GPU: {gpu_util}%, GPU Mem: {gpu_mem}MB")
        
        time.sleep(10)

# Start monitoring in background
monitor_thread = threading.Thread(target=collect_metrics, daemon=True)
monitor_thread.start()

# Run main task
print("Starting main task with monitoring...")
# Your main task code here
time.sleep(300)  # Simulate work

MONITORING_EOF
    """
)

print("Monitoring features:")
print("  - Real-time metrics collection")
print("  - Prometheus metrics endpoint (port 8000)")
print("  - Weights & Biases integration")
print("  - GPU and system metrics")

In [None]:
# Log aggregation and analysis
log_aggregation_example = """
# Structured logging setup
import logging
import json
import os
from datetime import datetime, timezone

# Configure structured logging
class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_obj = {
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'level': record.levelname,
            'message': record.getMessage(),
            'module': record.module,
            'task_id': os.environ.get('FLOW_TASK_ID'),
            'node_rank': os.environ.get('FLOW_NODE_RANK', '0')
        }
        if hasattr(record, 'metrics'):
            log_obj['metrics'] = record.metrics
        return json.dumps(log_obj)

# Setup logger
logger = logging.getLogger('flow_task')
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Log with metrics
logger.info('Training started', extra={'metrics': {'epoch': 0, 'loss': 1.0}})
"""

print("Structured Logging Example:")
print(log_aggregation_example)
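
JSON log lines can be aggregated with a few lines of standard-library code. The sketch below reads lines shaped like the formatter output above (a `node_rank` field and an optional `metrics` dict) and keeps the latest loss per node. The `latest_loss_by_node` helper and the sample lines are hypothetical.

```python
import json


def latest_loss_by_node(log_lines):
    """Return the most recent loss seen for each node_rank."""
    latest = {}
    for line in log_lines:
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip non-JSON output mixed into the log stream
        if not isinstance(record, dict):
            continue  # valid JSON, but not a log record
        metrics = record.get("metrics") or {}
        if "loss" in metrics:
            latest[record.get("node_rank", "0")] = metrics["loss"]
    return latest


lines = [
    '{"node_rank": "0", "message": "step", "metrics": {"epoch": 0, "loss": 1.0}}',
    '{"node_rank": "1", "message": "step", "metrics": {"epoch": 0, "loss": 0.9}}',
    "plain text line from another tool",
    '{"node_rank": "0", "message": "step", "metrics": {"epoch": 1, "loss": 0.5}}',
]
summary = latest_loss_by_node(lines)
print(summary)
```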

## Summary and Best Practices

### Advanced Features Summary

1. **Instance Catalog**
   - Query by requirements (GPU, price, region)
   - Smart instance selection
   - Real-time availability

2. **Storage Management**
   - Persistent volumes across tasks
   - Data upload/download
   - Shared storage for multi-node

3. **Multi-Node Computing**
   - Distributed training support
   - Environment variables for coordination
   - Parallel data processing

4. **Interactive Services**
   - Port forwarding for web services
   - SSH tunnel for secure access
   - Long-running services

5. **Cost Optimization**
   - Self-terminating tasks
   - Budget enforcement
   - Spot instance handling

### Best Practices

- **Always set price limits** to avoid unexpected costs
- **Use persistent volumes** for data that needs to survive tasks
- **Implement monitoring** for production workloads
- **Handle failures gracefully** with retries and checkpoints
- **Optimize instance selection** based on workload requirements

### Next Steps

- **Notebook 5**: Real-World Examples - Complete ML workflows
- Explore the Flow SDK documentation for more details
- Join the community for tips and best practices