# Tutorial 2: Function Composition and Caching

In this tutorial, we'll build more complex computational workflows by composing functions, and see how DaggerML's caching system avoids recomputing results it already has.

## Prerequisites

Make sure you've completed Tutorial 1 first! We'll be building on those concepts.

In [None]:
# Import required modules
import os
import time

from daggerml import Dml

from dml_util import funkify

# Create a DaggerML instance
dml = Dml(repo="tutorial", branch="main")
# Placeholder S3 settings; the actual values do not matter for this tutorial
os.environ.update({"DML_S3_BUCKET": "does-not-matter", "DML_S3_PREFIX": "does-not-matter"})
print("DaggerML instance ready!")

## Function Composition

One of DaggerML's strengths is the ability to chain functions together: the output of one function becomes the input to the next, regardless of where each function runs.
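As a point of reference, here is the same chaining idea in plain Python, without DaggerML. This is only a sketch of the data flow; the function names `generate`, `keep_at_least`, and `summarize` are made up for illustration and are not part of any library.

```python
def generate(size):
    """Produce the integers 1..size (deterministic stand-in for real data)."""
    return list(range(1, size + 1))


def keep_at_least(data, threshold):
    """Keep only the values greater than or equal to threshold."""
    return [x for x in data if x >= threshold]


def summarize(data):
    """Return a small summary of a non-empty list of numbers."""
    return {"count": len(data), "mean": sum(data) / len(data)}


# Each stage consumes the previous stage's output
raw = generate(10)
kept = keep_at_least(raw, 6)
stats = summarize(kept)
print(stats)  # {'count': 5, 'mean': 8.0}
```

The DaggerML version below has the same shape, except that each stage is a `@funkify` function and each intermediate value is stored as a node on the DAG.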

In [None]:
# Let's create a data processing pipeline

@funkify
def generate_data(dag):
    """Generate a list of random numbers."""
    import random

    # Get parameters from arguments
    size = dag.argv[1].value()
    min_val = dag.argv[2].value() if len(dag.argv) > 2 else 1
    max_val = dag.argv[3].value() if len(dag.argv) > 3 else 100

    # Generate random data
    random.seed(42)  # For reproducible results
    dag.data = [random.randint(min_val, max_val) for _ in range(size)]
    dag.size = size
    dag.range = (min_val, max_val)

    print(f"Generated {size} numbers between {min_val} and {max_val}")
    return dag.data

@funkify
def filter_data(dag):
    """Filter numbers based on a condition."""
    data = dag.argv[1].value()
    threshold = dag.argv[2].value()

    # Filter the data
    dag.original_count = len(data)
    dag.filtered_data = [x for x in data if x >= threshold]
    dag.filtered_count = len(dag.filtered_data)
    dag.threshold = threshold

    print(f"Filtered {dag.original_count} numbers, kept {dag.filtered_count} >= {threshold}")
    return dag.filtered_data

@funkify
def compute_statistics(dag):
    """Compute comprehensive statistics."""
    data = dag.argv[1].value()

    if not data:
        dag.result = {"error": "No data to process"}
        return dag.result

    # Calculate statistics
    dag.count = len(data)
    dag.sum = sum(data)
    dag.mean = dag.sum.value() / dag.count.value()
    dag.sorted_data = sorted(data)

    # Median and quartiles (simple index-based estimates)
    n = dag.count.value()
    if n % 2 == 1:
        dag.median = dag.sorted_data[n//2]
    else:
        dag.median = (dag.sorted_data[n//2 - 1].value() + dag.sorted_data[n//2].value()) / 2
    dag.q1 = dag.sorted_data[n//4]
    dag.q3 = dag.sorted_data[3*n//4]

    # Population standard deviation (divides by n, not n - 1)
    dag.std_dev = (sum((x - dag.mean.value()) ** 2 for x in data) / dag.count.value()) ** 0.5

    dag.result = {
        "count": dag.count,
        "sum": dag.sum,
        "mean": dag.mean,
        "median": dag.median,
        "std_dev": dag.std_dev,
        "q1": dag.q1,
        "q3": dag.q3,
        "min": min(data),
        "max": max(data)
    }

    return dag.result

# Create a new DAG for our pipeline
dag = dml.new("02-function-composition", "Demonstrating function composition and caching")

## Building the Pipeline

Now let's chain these functions together to create a data processing pipeline.

In [None]:
# Add our functions to the DAG
dag.generate_fn = generate_data
dag.filter_fn = filter_data
dag.stats_fn = compute_statistics

# Build the pipeline step by step
print("=== Step 1: Generate Data ===")
dag.raw_data = dag.generate_fn(50, 1, 100)  # 50 numbers between 1-100

print("\n=== Step 2: Filter Data ===")
dag.filtered_data = dag.filter_fn(dag.raw_data, 50)  # Keep numbers >= 50

print("\n=== Step 3: Compute Statistics ===")
dag.final_stats = dag.stats_fn(dag.filtered_data)

print("\n=== Pipeline Results ===")
print(f"Raw data count: {len(dag.raw_data.value())}")
print(f"Filtered data count: {len(dag.filtered_data.value())}")
print(f"Final statistics: {dag.final_stats.value()}")

## Understanding Caching

DaggerML automatically caches function results based on their inputs. This means if you call the same function with the same arguments, it will return the cached result instead of recomputing it.
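To make the idea concrete, here is a minimal sketch of input-keyed caching in plain Python. It is not DaggerML's implementation, and `cache_key`, `cached_call`, and `_cache` are hypothetical names used only for this illustration. The sketch assumes the arguments are JSON-serializable and that the function name is enough to identify the function.

```python
import hashlib
import json

_cache = {}  # hypothetical in-memory store; a real system would persist results


def cache_key(fn_name, args):
    """Derive a stable key from the function name and its arguments."""
    payload = json.dumps({"fn": fn_name, "args": args}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def cached_call(fn, *args):
    """Run fn(*args) only if this exact call has not been seen before."""
    key = cache_key(fn.__name__, list(args))
    if key not in _cache:
        _cache[key] = fn(*args)
    return _cache[key]


call_log = []  # records which inputs actually triggered a computation


def square_plus_ten_times(x):
    call_log.append(x)
    return x ** 2 + x * 10


print(cached_call(square_plus_ten_times, 5))  # 75 (computed)
print(cached_call(square_plus_ten_times, 5))  # 75 (served from the cache)
print(cached_call(square_plus_ten_times, 7))  # 119 (new input, computed)
print(call_log)  # [5, 7]
```

The second call with `5` never reaches the function body, which is the behavior the timing cells in this section measure.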

Let's demonstrate this with timing measurements.

In [None]:
@funkify
def slow_computation(dag):
    """A deliberately slow computation to demonstrate caching."""
    import time

    input_value = dag.argv[1].value()

    print(f"Starting slow computation for input: {input_value}")

    # Simulate a slow computation
    time.sleep(2)  # Wait 2 seconds

    # Do some computation
    dag.result = input_value ** 2 + input_value * 10

    print(f"Finished slow computation, result: {dag.result}")
    return dag.result

# Add the slow function to our DAG
dag.slow_fn = slow_computation

print("=== First call (will be slow) ===")
start_time = time.time()
dag.slow_result1 = dag.slow_fn(5)
first_duration = time.time() - start_time
print(f"First call took: {first_duration:.2f} seconds")
print(f"Result: {dag.slow_result1.value()}")

In [None]:
print("\n=== Second call with same input (should be fast due to caching) ===")
start_time = time.time()
dag.slow_result2 = dag.slow_fn(5)  # Same input!
second_duration = time.time() - start_time
print(f"Second call took: {second_duration:.2f} seconds")
print(f"Result: {dag.slow_result2.value()}")

# Guard against a zero duration so the ratio cannot raise ZeroDivisionError
print(f"\nSpeedup: {first_duration / max(second_duration, 1e-9):.1f}x faster!")

# Verify the results are the same
print(f"Results are identical: {dag.slow_result1.value() == dag.slow_result2.value()}")

In [None]:
print("\n=== Third call with different input (will be slow again) ===")
start_time = time.time()
dag.slow_result3 = dag.slow_fn(7)  # Different input
third_duration = time.time() - start_time
print(f"Third call took: {third_duration:.2f} seconds")
print(f"Result: {dag.slow_result3.value()}")

## Parallel Processing with Composition

You can also create parallel branches in your DAG and combine their results later.
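The fan-out/fan-in shape can be sketched in plain Python first. This version uses only the standard library; `double`, `add_ten`, and `interleave` are illustrative names, not DaggerML functions.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import chain, zip_longest


def double(data):
    return [x * 2 for x in data]


def add_ten(data):
    return [x + 10 for x in data]


def interleave(a, b):
    """Alternate elements of a and b; leftover items of the longer list follow."""
    missing = object()  # sentinel marking padding added by zip_longest
    pairs = zip_longest(a, b, fillvalue=missing)
    return [x for x in chain.from_iterable(pairs) if x is not missing]


data = [1, 2, 3, 4, 5]

# Fan out: both branches read the same input and run independently
with ThreadPoolExecutor(max_workers=2) as executor:
    future_a = executor.submit(double, data)
    future_b = executor.submit(add_ten, data)
    branch_a, branch_b = future_a.result(), future_b.result()

# Fan in: combine the two branch results
combined = interleave(branch_a, branch_b)
print(combined)  # [2, 11, 4, 12, 6, 13, 8, 14, 10, 15]
```

Because neither branch depends on the other, the order in which they finish does not affect the combined result.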

In [None]:
from concurrent.futures import ThreadPoolExecutor


@funkify
def process_branch_a(dag):
    """Process data one way."""
    data = dag.argv[1].value()
    dag.result = [x * 2 for x in data]  # Double each number
    return dag.result

@funkify
def process_branch_b(dag):
    """Process data another way."""
    data = dag.argv[1].value()
    dag.result = [x + 10 for x in data]  # Add 10 to each number
    return dag.result

@funkify
def combine_branches(dag):
    """Combine results from two branches."""
    branch_a = dag.argv[1].value()
    branch_b = dag.argv[2].value()

    # Combine by taking alternating elements.
    # Build a plain list locally, then store it on the DAG once it is complete.
    combined = []
    for i in range(max(len(branch_a), len(branch_b))):
        if i < len(branch_a):
            combined.append(branch_a[i])
        if i < len(branch_b):
            combined.append(branch_b[i])

    dag.combined = combined
    dag.result = dag.combined
    return dag.result

# Add parallel processing functions
dag.process_a_fn = process_branch_a
dag.process_b_fn = process_branch_b
dag.combine_fn = combine_branches

# Create some test data
dag.test_data = [1, 2, 3, 4, 5]

# Process in parallel branches
with ThreadPoolExecutor(2) as executor:
    future_a = executor.submit(dag.process_a_fn, dag.test_data, name="branch_a_result")
    future_b = executor.submit(dag.process_b_fn, dag.test_data, name="branch_b_result")
    future_a.result()
    future_b.result()

# Combine the results
dag.combined_result = dag.combine_fn(dag.branch_a_result, dag.branch_b_result)

print("=== Parallel Processing Results ===")
print(f"Original data: {dag.test_data.value()}")
print(f"Branch A (×2): {dag.branch_a_result.value()}")
print(f"Branch B (+10): {dag.branch_b_result.value()}")
print(f"Combined: {dag.combined_result.value()}")

## Complex Data Flows

Let's build a more complex example in which one function creates several datasets and a second function analyzes each of them.

In [None]:
@funkify
def create_datasets(dag):
    """Create multiple datasets for processing."""
    import random

    random.seed(42)

    dag.small_dataset = [random.randint(1, 20) for _ in range(10)]
    dag.medium_dataset = [random.randint(1, 100) for _ in range(50)]
    dag.large_dataset = [random.randint(1, 1000) for _ in range(100)]

    dag.result = {
        "small": dag.small_dataset,
        "medium": dag.medium_dataset,
        "large": dag.large_dataset
    }
    return dag.result

@funkify
def analyze_dataset(dag):
    """Analyze a single dataset."""
    name = dag.argv[1].value()
    data = dag.argv[2].value()

    dag.name = name
    dag.size = len(data)
    dag.mean = sum(data) / len(data)
    dag.std = (sum((x - dag.mean.value()) ** 2 for x in data) / len(data)) ** 0.5

    dag.result = {
        "name": name,
        "size": dag.size,
        "mean": dag.mean,
        "std": dag.std,
        "min": min(data),
        "max": max(data)
    }
    return dag.result

# Create the complex workflow
dag.create_datasets_fn = create_datasets
dag.analyze_fn = analyze_dataset

# Generate datasets
dag.datasets = dag.create_datasets_fn()

# Analyze each dataset
dag.small_analysis = dag.analyze_fn("small", dag.datasets["small"])
dag.medium_analysis = dag.analyze_fn("medium", dag.datasets["medium"])
dag.large_analysis = dag.analyze_fn("large", dag.datasets["large"])

# Collect all analyses
dag.all_analyses = [dag.small_analysis, dag.medium_analysis, dag.large_analysis]

print("=== Dataset Analysis Results ===")
for analysis in dag.all_analyses.value():
    print(f"{analysis['name']}: {analysis['size']} items, mean={analysis['mean']:.2f}, std={analysis['std']:.2f}")

## Working with Complex Data Structures

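DaggerML can store nested structures (dicts and lists that mix plain values with existing DAG nodes), and you can index into the stored result to reach individual values.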

In [None]:
# Create a complex nested structure
dag.complex_data = {
    "metadata": {
        "version": "1.0",
        "created_by": "tutorial",
        "datasets": ["small", "medium", "large"]
    },
    "results": {
        "analyses": dag.all_analyses,
        "summary": {
            "total_datasets": 3,
            "total_data_points": sum(analysis["size"] for analysis in dag.all_analyses.value()),
            "average_mean": sum(analysis["mean"] for analysis in dag.all_analyses.value()) / 3
        }
    },
    "raw_data": dag.datasets
}

print("=== Complex Data Structure ===")
print(f"Metadata version: {dag.complex_data['metadata']['version'].value()}")
print(f"Total data points: {dag.complex_data['results']['summary']['total_data_points'].value()}")
print(f"Average mean across datasets: {dag.complex_data['results']['summary']['average_mean'].value():.2f}")

## DAG Inspection and Debugging

Let's look at what our DAG contains after all these operations.

In [None]:
# Inspect the DAG
print("=== DAG Contents ===")
all_keys = sorted(dag.keys())
print(f"Total nodes in DAG: {len(all_keys)}")

# Group keys by type
function_keys = [k for k in all_keys if k.endswith('_fn')]
data_keys = [k for k in all_keys if not k.endswith('_fn') and not k.endswith('_result')]
result_keys = [k for k in all_keys if k.endswith('_result')]

print(f"\nFunctions ({len(function_keys)}):")
for key in function_keys:
    print(f"  - {key}")

print(f"\nData nodes ({len(data_keys)}):")
for key in data_keys[:10]:  # Show first 10
    print(f"  - {key}")
if len(data_keys) > 10:
    print(f"  ... and {len(data_keys) - 10} more")

print(f"\nResult nodes ({len(result_keys)}):")
for key in result_keys:
    print(f"  - {key}")

## Best Practices for Function Composition

Here are some key insights from this tutorial:

### 1. Cache-Aware Design
- Calling a function again with the same inputs returns the cached result
- This keeps experimentation and iteration fast
- Design your functions to be pure (same input → same output)
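As a small illustration of purity, compare a function that reads the global random state with one that takes its seed as an explicit input. Both function names are hypothetical; the point is only that the second one's output is fully determined by its arguments, which is what makes a cached result safe to reuse.

```python
import random


def impure_sample(size):
    """Depends on the hidden global RNG state, so repeated calls may differ."""
    return [random.randint(1, 100) for _ in range(size)]


def pure_sample(size, seed):
    """Uses a private RNG seeded from the arguments, so output depends only on inputs."""
    rng = random.Random(seed)
    return [rng.randint(1, 100) for _ in range(size)]


# Same inputs, same output, no matter what else has used the global RNG
first = pure_sample(5, 42)
random.random()  # disturb the global state
second = pure_sample(5, 42)
print(first == second)  # True
```

The `generate_data` function earlier in this tutorial gets the same effect by calling `random.seed(42)` before generating its numbers.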

### 2. Granular Functions
- Break complex operations into smaller, reusable functions
- Smaller steps give more cache hits when only part of a pipeline changes, and make failures easier to localize
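The effect of granularity on cache reuse can be seen in a small plain-Python sketch. Here `functools.lru_cache` stands in for DaggerML's cache (an assumption made for illustration only), and tuples are used because `lru_cache` needs hashable arguments.

```python
from functools import lru_cache

calls = {"generate": 0, "filter": 0}  # counts real (non-cached) executions


@lru_cache(maxsize=None)
def generate(size):
    calls["generate"] += 1
    return tuple(range(1, size + 1))


@lru_cache(maxsize=None)
def keep_at_least(data, threshold):
    calls["filter"] += 1
    return tuple(x for x in data if x >= threshold)


# Try three thresholds on the same generated data; the last repeats the first
for threshold in (3, 5, 3):
    keep_at_least(generate(10), threshold)

print(calls)  # {'generate': 1, 'filter': 2}
```

Because generation and filtering are separate steps, changing the threshold re-runs only the filter. If both lived in one function, every new threshold would regenerate the data as well.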

### 3. Clear Data Flow
- Name your intermediate results clearly
- Store important intermediate values in the DAG for debugging

In [None]:
# Create a final summary of our pipeline
dag.pipeline_summary = {
    "stages": [
        "Data Generation",
        "Data Filtering",
        "Statistical Analysis",
        "Parallel Processing",
        "Complex Workflow"
    ],
    "total_functions": len(function_keys),
    "total_nodes": len(all_keys),
    "caching_demonstrated": True,
    "composition_patterns": [
        "Sequential chaining",
        "Parallel branches",
        "Complex nested workflows"
    ]
}
dag.result = dag.pipeline_summary

print("=== Tutorial 2 Complete! ===")
print("You've learned:")
print("✅ Function composition and chaining")
print("✅ How DaggerML's caching system works")
print("✅ Parallel processing patterns")
print("✅ Working with complex data structures")
print("✅ DAG inspection and debugging")
print(f"\nFinal DAG size: {len(all_keys)} nodes")

## What We've Accomplished

In this tutorial, you've covered:

1. ✅ **Function Composition**: Chaining functions where outputs feed into inputs
2. ✅ **Caching System**: Understanding how DaggerML caches results automatically
3. ✅ **Parallel Processing**: Creating multiple branches that process data independently
4. ✅ **Complex Workflows**: Building sophisticated data processing pipelines
5. ✅ **Data Structure Handling**: Working with nested and complex data types
6. ✅ **DAG Inspection**: Understanding and debugging your computational graphs

## Performance Benefits

The caching system provides several key benefits:
- **Development Speed**: Iterate quickly without recomputing unchanged parts
- **Reproducibility**: The same inputs return the same cached result
- **Resource Efficiency**: Avoid redundant computations

## Next Tutorial Preview

In Tutorial 3, we'll explore:
- Error handling and recovery in DAGs
- Working with external data sources
- Different execution environments (local, cloud, containers)
- Real-world data processing examples

Great work! You're becoming a DaggerML expert! 🚀