# Metaflow Fundamentals - Week 1 Workshop

Welcome to the core of our MLOps journey! In this notebook, we'll explore Metaflow - the foundation that makes machine learning workflows reproducible, scalable, and production-ready.

## What is Metaflow?

Metaflow solves three critical problems in ML development:
1. **Versioning**: Your code, data, and results
2. **Scalability**: From laptop to cloud seamlessly  
3. **Reproducibility**: Anyone can rerun your exact experiment

Let's see this in action!

## 1. Your First Metaflow Workflow

In [None]:
# Import Metaflow components
from metaflow import FlowSpec, step
import pandas as pd
import numpy as np

class WorkshopIntroFlow(FlowSpec):
    """
    Introductory linear flow: start -> process_data -> end.

    Artifacts:
        sample_data: dict with 'values', 'categories' and 'timestamps'.
        statistics: mean, standard deviation and per-category counts.
    """

    @step
    def start(self):
        """
        Create the synthetic sample and pass control to process_data.
        """
        print("🚀 Starting our first Metaflow workflow!")

        # Fixed seed so the sample is the same on every run.
        np.random.seed(42)
        n_points = 1000

        # Draw order is kept (values, then categories) so the seeded
        # random stream yields the same numbers as before.
        values = np.random.normal(100, 15, n_points)
        categories = np.random.choice(['A', 'B', 'C'], n_points)
        timestamps = pd.date_range('2024-01-01', periods=n_points)

        self.sample_data = dict(
            values=values,
            categories=categories,
            timestamps=timestamps,
        )

        print(f"✅ Generated {len(values)} data points")
        self.next(self.process_data)

    @step
    def process_data(self):
        """
        Turn the sample into a DataFrame and summarise it.
        """
        print("🔧 Processing data...")

        frame = pd.DataFrame(self.sample_data)
        value_column = frame['values']

        mean_value = value_column.mean()
        std_value = value_column.std()
        category_counts = frame['categories'].value_counts().to_dict()

        # Stored as an artifact so the end step (and later client code) can read it.
        self.statistics = {
            'mean': mean_value,
            'std': std_value,
            'count_by_category': category_counts,
        }

        print("📊 Statistics calculated:")
        print(f"   Mean: {mean_value:.2f}")
        print(f"   Std: {std_value:.2f}")

        self.next(self.end)

    @step
    def end(self):
        """
        Report completion and echo the statistics artifact.
        """
        print("🎉 Workflow completed successfully!")
        print(f"📋 Final statistics: {self.statistics}")

# Note: the flow is only defined here; it is executed from the command line.
# A Metaflow script needs an entry point that instantiates the flow class,
# otherwise `python flow_file.py run` just defines the class and exits,
# so the hint below spells that out.
print("✅ WorkshopIntroFlow defined successfully!")
print("💡 To run this flow, save it as a .py file and use: python flow_file.py run")
print("   (end the file with: if __name__ == '__main__': WorkshopIntroFlow())")

## 2. Understanding Metaflow Structure

Every Metaflow workflow has:
- **FlowSpec**: The main class that defines your workflow
- **@step**: A decorator that marks each method that is an individual processing step
- **self.next()**: Defines the flow between steps
- **Artifacts**: Data that persists between steps (self.variable_name)

In [None]:
# Let's explore the flow structure in more detail
from metaflow import FlowSpec, step, Parameter

class DetailedFlow(FlowSpec):
    """
    Parameterised linear flow: start -> analyze -> end.

    Artifacts:
        data: the generated sample array.
        metadata: creation timestamp, sample size and distribution label.
        analysis: descriptive statistics of data.
        summary: workflow name, parameters, metadata and results combined.
    """

    # Run-time parameter; the default is used when no value is supplied.
    sample_size = Parameter(
        'sample_size',
        default=100,
        help='Number of samples to generate',
    )

    @step
    def start(self):
        """
        Draw sample_size values from a standard normal distribution.
        """
        print(f"🎯 Generating {self.sample_size} samples")

        # Fixed seed keeps the generated sample reproducible.
        np.random.seed(42)
        samples = np.random.normal(0, 1, self.sample_size)
        self.data = samples

        # Descriptive record of how and when the data was produced.
        self.metadata = dict(
            created_at=pd.Timestamp.now(),
            sample_size=len(samples),
            data_type='normal_distribution',
        )

        print(f"✅ Data generated: shape {samples.shape}")
        self.next(self.analyze)

    @step
    def analyze(self):
        """
        Compute descriptive statistics for the data artifact.
        """
        print("📊 Analyzing data...")

        values = self.data

        # Values are cast to plain floats before being stored.
        quartiles = {
            f'{q}th': float(np.percentile(values, q)) for q in (25, 50, 75)
        }
        report = {
            'mean': float(np.mean(values)),
            'std': float(np.std(values)),
            'min': float(np.min(values)),
            'max': float(np.max(values)),
            'percentiles': quartiles,
        }
        self.analysis = report

        print(f"   Mean: {report['mean']:.3f}")
        print(f"   Std: {report['std']:.3f}")
        print(f"   Range: [{report['min']:.3f}, {report['max']:.3f}]")

        self.next(self.end)

    @step
    def end(self):
        """
        Bundle parameters, metadata and results into one summary artifact.
        """
        print("🎉 Analysis complete!")

        self.summary = dict(
            workflow='DetailedFlow',
            parameters={'sample_size': self.sample_size},
            metadata=self.metadata,
            results=self.analysis,
        )

        print(f"📋 Summary created with {len(self.summary)} sections")

# Confirmation banner for the cell: two lines, emitted with a single print.
print(
    "✅ DetailedFlow defined successfully!",
    "💡 This flow demonstrates parameters and comprehensive data tracking",
    sep="\n",
)

## 3. Accessing Flow Results

One of Metaflow's superpowers is that you can access results from any previous run programmatically!

In [None]:
# This cell prints the Client API patterns used to access flow results.
# Note: The printed snippets only work after you've actually run a flow.

from metaflow import Flow

# Function to safely demonstrate flow access
def demonstrate_flow_access():
    """
    Print a cheat sheet of Metaflow Client API patterns for reading run results.

    Nothing is looked up: the function only prints example snippets, so it
    works even when no flow has been run yet.

    Returns:
        None. All output goes to stdout.
    """
    # The snippets are shown as text rather than executed because they need a
    # finished run of WorkshopIntroFlow to succeed.
    cheat_sheet = [
        "📋 How to access flow results:",
        "",
        "# Get a specific flow",
        "flow = Flow('WorkshopIntroFlow')",
        "",
        "# Get the latest run",
        "latest_run = flow.latest_run",
        "",
        "# Check if run was successful",
        "if latest_run.successful:",
        "    print('Run completed successfully!')",
        "",
        "# Access artifacts from specific steps",
        "end_step = latest_run['end']",
        "statistics = end_step.task.data.statistics",
        "",
        "# Access all artifacts",
        # Iterate the Task itself: `task.data` supports attribute access and
        # `in`, but is not iterable, whereas a Task yields its artifacts.
        "for artifact in end_step.task:",
        "    print(f'Artifact: {artifact.id}')",
        "",
        "✅ Flow access patterns demonstrated!",
        "💡 Run a flow first, then use these patterns to access results",
    ]
    print("\n".join(cheat_sheet))

# Run the demo: prints the access-pattern snippets to the cell output.
demonstrate_flow_access()

## 4. Metaflow Best Practices

Let's look at a production-ready flow that follows best practices:

In [None]:
from metaflow import FlowSpec, step, Parameter, catch
import pandas as pd
import numpy as np
from datetime import datetime

class ProductionReadyFlow(FlowSpec):
    """
    A production-ready flow demonstrating best practices:
    - Comprehensive error handling
    - Detailed logging
    - Parameter validation
    - Artifact organization

    Steps run linearly: start -> generate_data -> analyze_data -> end.

    Artifacts:
        run_metadata: start time, parameters, version and description.
        data, features, generation_info: outputs of generate_data
            (empty fallbacks when generation fails).
        generation_error: managed by @catch; None on success, otherwise the
            caught exception.
        analysis_results: statistics, or {'error': ...} when analysis is skipped.
        final_report: consolidated report built in the end step.
    """

    # Well-documented parameters; ranges are enforced in the start step.
    data_size = Parameter('data_size',
                         help='Number of data points to generate (10-10000)',
                         default=1000,
                         type=int)

    noise_level = Parameter('noise_level',
                           help='Noise level for data generation (0.1-2.0)',
                           default=0.5,
                           type=float)

    @step
    def start(self):
        """
        Initialize workflow with parameter validation and logging.

        Raises:
            ValueError: if data_size is outside [10, 10000] or noise_level
                is outside [0.1, 2.0].
        """
        print("🚀 Starting ProductionReadyFlow")
        print("=" * 40)
        print("Parameters:")
        print(f"  data_size: {self.data_size}")
        print(f"  noise_level: {self.noise_level}")

        # Fail fast on out-of-range parameters, before any work is done.
        if not (10 <= self.data_size <= 10000):
            raise ValueError(f"data_size must be between 10 and 10000, got {self.data_size}")

        if not (0.1 <= self.noise_level <= 2.0):
            raise ValueError(f"noise_level must be between 0.1 and 2.0, got {self.noise_level}")

        # Store run metadata
        self.run_metadata = {
            'start_time': datetime.now().isoformat(),
            'parameters': {
                'data_size': self.data_size,
                'noise_level': self.noise_level
            },
            'version': '1.0.0',
            'description': 'Production-ready data processing workflow'
        }

        print("✅ Parameters validated and metadata stored")
        self.next(self.generate_data)

    @catch(var='generation_error')
    @step
    def generate_data(self):
        """
        Generate a noisy synthetic signal, with failures recorded by @catch.

        The generation_error artifact is owned by @catch: it is set to None
        when the step body finishes normally and to the caught exception
        otherwise. The step must therefore not assign it itself and must not
        swallow exceptions; a swallowed error would be reset to None by
        @catch and the failure would go unnoticed downstream.
        """
        print("\n🔧 Generating synthetic data...")

        try:
            # Set seed for reproducibility
            np.random.seed(42)

            # Generate base signal
            x = np.linspace(0, 4*np.pi, self.data_size)
            signal = np.sin(x) + 0.5*np.cos(2*x)

            # Add noise
            noise = np.random.normal(0, self.noise_level, self.data_size)
            self.data = signal + noise

            # Create additional features
            self.features = {
                'x': x,
                'signal': signal,
                'noise': noise,
                'final_data': self.data
            }

            # Store generation info
            self.generation_info = {
                'data_points_generated': len(self.data),
                'signal_range': [float(signal.min()), float(signal.max())],
                'noise_std': float(noise.std()),
                'final_data_range': [float(self.data.min()), float(self.data.max())]
            }

            print(f"   ✅ Generated {len(self.data)} data points")
            print(f"   📊 Data range: [{self.data.min():.3f}, {self.data.max():.3f}]")
            print(f"   🔊 Noise std: {noise.std():.3f}")

        except Exception as e:
            print(f"   ❌ Data generation failed: {e}")
            # Fallback artifacts so later steps and client code always find them.
            self.data = np.array([])
            self.features = {}
            self.generation_info = {'error': str(e)}
            # Re-raise so @catch stores the failure in generation_error and
            # lets the flow continue with the next step.
            raise

        self.next(self.analyze_data)

    @step
    def analyze_data(self):
        """
        Compute descriptive statistics, or record why analysis was skipped.
        """
        print("\n📊 Analyzing data...")

        # Explicit None check (same test as in end): any recorded error,
        # even one with an empty message, must skip the analysis.
        if self.generation_error is not None:
            print(f"   ⚠️ Skipping analysis due to generation error: {self.generation_error}")
            self.analysis_results = {'error': 'No data to analyze'}
        else:
            # Comprehensive statistical analysis
            self.analysis_results = {
                'basic_stats': {
                    'mean': float(np.mean(self.data)),
                    'std': float(np.std(self.data)),
                    'median': float(np.median(self.data)),
                    'min': float(np.min(self.data)),
                    'max': float(np.max(self.data))
                },
                'distribution_stats': {
                    'skewness': float(self._calculate_skewness(self.data)),
                    'kurtosis': float(self._calculate_kurtosis(self.data))
                },
                'percentiles': {
                    '10th': float(np.percentile(self.data, 10)),
                    '25th': float(np.percentile(self.data, 25)),
                    '75th': float(np.percentile(self.data, 75)),
                    '90th': float(np.percentile(self.data, 90))
                }
            }

            print(f"   📈 Mean: {self.analysis_results['basic_stats']['mean']:.3f}")
            print(f"   📊 Std: {self.analysis_results['basic_stats']['std']:.3f}")
            print(f"   📏 Range: [{self.analysis_results['basic_stats']['min']:.3f}, {self.analysis_results['basic_stats']['max']:.3f}]")

        self.next(self.end)

    def _calculate_skewness(self, data):
        """
        Return the mean cubed z-score of data (population std, ddof=0).

        The division is unguarded, so the result is undefined when the
        standard deviation is zero.
        """
        mean = np.mean(data)
        std = np.std(data)
        return np.mean(((data - mean) / std) ** 3)

    def _calculate_kurtosis(self, data):
        """
        Return the mean fourth-power z-score of data minus 3 (excess kurtosis).

        The division is unguarded, so the result is undefined when the
        standard deviation is zero.
        """
        mean = np.mean(data)
        std = np.std(data)
        return np.mean(((data - mean) / std) ** 4) - 3

    @step
    def end(self):
        """
        Finalize workflow with comprehensive summary.
        """
        print("\n🎉 ProductionReadyFlow completed!")
        print("=" * 40)

        generation_ok = self.generation_error is None

        # analysis_results is always set by analyze_data, so it is read directly.
        self.final_report = {
            'workflow_info': {
                'name': 'ProductionReadyFlow',
                'version': self.run_metadata['version'],
                'completion_time': datetime.now().isoformat()
            },
            'execution_summary': {
                'parameters_used': self.run_metadata['parameters'],
                'data_generation_successful': generation_ok,
                'analysis_completed': 'error' not in self.analysis_results
            },
            'results': self.analysis_results,
            'metadata': self.run_metadata
        }

        print("📋 Final Report Summary:")
        print(f"   ✅ Workflow: {self.final_report['workflow_info']['name']}")
        print(f"   📊 Data generation: {'✅ Success' if self.final_report['execution_summary']['data_generation_successful'] else '❌ Failed'}")
        print(f"   🔍 Analysis: {'✅ Complete' if self.final_report['execution_summary']['analysis_completed'] else '❌ Failed'}")

        # Detailed numbers exist only when generation succeeded.
        if generation_ok:
            print(f"   📈 Data points processed: {len(self.data)}")
            print(f"   🎯 Final data mean: {self.analysis_results['basic_stats']['mean']:.3f}")

        print("\n💾 All artifacts saved automatically by Metaflow!")

# Confirmation banner listing the patterns the flow above demonstrates,
# emitted with a single print call (one argument per output line).
print(
    "✅ ProductionReadyFlow defined successfully!",
    "🏆 This flow demonstrates production-ready patterns:",
    "   - Parameter validation",
    "   - Error handling with @catch",
    "   - Comprehensive logging",
    "   - Detailed artifact organization",
    "   - Professional reporting",
    sep="\n",
)

## 5. Running Flows and Next Steps

Now you understand Metaflow fundamentals! Here's how to run these flows and what comes next.

In [None]:
# Closing recap for the notebook, assembled as a list of output lines and
# printed once; empty entries are the blank lines between sections.
print("\n".join([
    "🎓 Metaflow Fundamentals Complete!",
    "=" * 40,
    "",
    "📝 What You've Learned:",
    "   ✅ Metaflow FlowSpec structure",
    "   ✅ Step definitions and flow control",
    "   ✅ Artifact management",
    "   ✅ Parameter handling",
    "   ✅ Error handling with @catch",
    "   ✅ Production-ready patterns",
    "",
    "🚀 Next Steps:",
    "   1. Save these flows as .py files",
    "   2. Run them with: python flow_name.py run",
    "   3. Explore results with: python flow_name.py show",
    "   4. Access artifacts programmatically",
    "",
    "💡 Tips for Success:",
    "   - Always use descriptive step names",
    "   - Store intermediate results as artifacts",
    "   - Add comprehensive docstrings",
    "   - Use parameters for configurable values",
    "   - Handle errors gracefully",
    "",
    "🎯 Coming Up Next:",
    "   - Data exploration with pandas",
    "   - Visualization techniques",
    "   - Complete ML pipeline with Metaflow",
    "",
    "🏆 You're now ready to build production ML workflows!",
]))