# Parallel Computing with Dask and Ray

**MIS 769 - Advanced Data Analytics**

This notebook demonstrates how to parallelize Python code using two popular frameworks:
- **Dask**: Flexible parallel computing library for analytics
- **Ray**: General-purpose distributed computing framework

Both tools allow you to scale from a single laptop to a cluster of machines.

---

## Setup

First, let's install the required packages.

In [None]:
# Install required packages
!pip install -q dask[complete] ray[default] bokeh datasets hf_xet

# Install htop for CPU monitoring (run 'htop' in Terminal to watch cores)
!apt update -qq && apt install -y htop -qq

# Enable fast Hugging Face downloads
import os
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"

print("‚úì All packages installed. Open Terminal and run 'htop' to monitor CPU cores.")

In [None]:
import numpy as np
import pandas as pd
import time
import warnings
warnings.filterwarnings('ignore')

# For timing comparisons
def timer(func):
    """Decorator that prints how long each call to ``func`` takes.

    Args:
        func: Any callable.

    Returns:
        A wrapper that calls ``func`` with the same arguments, prints the
        elapsed time in seconds, and returns ``func``'s result unchanged.
        The wrapper keeps ``func``'s name and docstring.
    """
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so the measured interval cannot be
        # distorted by system clock adjustments the way time.time() can.
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"  Execution time: {elapsed:.3f} seconds")
        return result
    return wrapper

---

# The "30-Second Demo" - Why Parallelization Matters

**Before we dive into Dask and Ray, let's see WHY you need these tools.**

We'll load a dataset large enough that processing it sequentially with Pandas takes ~30 seconds. Then we'll see how Dask and Ray handle the same operation in parallel.

**Pro tip:** Open your system's Activity Monitor (Mac), Task Manager (Windows), or `htop` (Linux) before running these cells. Watch what happens to your CPU cores!

In [None]:
# Load NYC Taxi dataset - using Hugging Face
import pandas as pd
from datasets import load_dataset

print(
    "Loading NYC Taxi dataset from Hugging Face...\n"
    "This is a large dataset - may take a few minutes...\n"
)

# The split expression limits the demo to the first 20 million training rows.
dataset = load_dataset("JosephFeig/NYC-Taxi", split="train[:20000000]")

# Materialize as a pandas DataFrame; every benchmark below reads `taxi_df`.
taxi_df = dataset.to_pandas()

print(
    f"\nDataset shape: {taxi_df.shape}",
    f"Memory usage: {taxi_df.memory_usage(deep=True).sum() / 1e9:.2f} GB",
    f"\nColumns: {list(taxi_df.columns)}",
    sep="\n",
)
taxi_df.head()

In [None]:
# ============================================================
# CONFIGURATION - Adjust these to see different behaviors
# ============================================================
N_CHUNKS = 32            # number of pieces the DataFrame is split into
ROLLING_ITERATIONS = 20  # rolling-window passes per chunk (work per task)

# ============================================================
# PANDAS - Sequential Processing
# ============================================================
# The chunks are processed one after another to give a baseline timing.

print("=" * 60, "PANDAS/SEQUENTIAL - One chunk at a time", "=" * 60, sep="\n")
print(
    f"\nProcessing {len(taxi_df):,} rows in {N_CHUNKS} chunks sequentially...\n"
    f"Rolling iterations per chunk: {ROLLING_ITERATIONS}\n"
)

import numpy as np

def heavy_computation(df_chunk, n_iterations=None):
    """Summarize one chunk of taxi rows and burn CPU with rolling windows.

    Args:
        df_chunk: DataFrame with the columns ``fare_amount``,
            ``passenger_count``, ``pickup_latitude``, ``pickup_longitude``,
            ``dropoff_latitude`` and ``dropoff_longitude``.
        n_iterations: Number of rolling-window passes to run. Defaults to the
            notebook-level ``ROLLING_ITERATIONS`` setting, so existing
            single-argument calls behave as before; passing it explicitly
            mirrors the signature of the Ray variant.

    Returns:
        dict with ``total_fares``, ``avg_fare``, ``std_fare``,
        ``avg_distance`` (mean straight-line pickup-to-dropoff distance in
        raw coordinate units), ``fare_per_passenger`` and ``row_count``.
    """
    if n_iterations is None:
        n_iterations = ROLLING_ITERATIONS

    fares = df_chunk['fare_amount']
    lat_delta = df_chunk['dropoff_latitude'] - df_chunk['pickup_latitude']
    lon_delta = df_chunk['dropoff_longitude'] - df_chunk['pickup_longitude']

    result = {
        'total_fares': fares.sum(),
        'avg_fare': fares.mean(),
        'std_fare': fares.std(),
        'avg_distance': np.sqrt(lat_delta**2 + lon_delta**2).mean(),
        # Passenger counts below 1 are clipped to 1 to avoid dividing by zero.
        'fare_per_passenger': (fares / df_chunk['passenger_count'].clip(lower=1)).mean(),
        'row_count': len(df_chunk)
    }
    # Heavy computation - rolling windows are CPU intensive. The values are
    # discarded on purpose; the loop exists only to create measurable work.
    for _ in range(n_iterations):
        _ = fares.rolling(100, min_periods=1).mean().sum()
        _ = fares.rolling(50, min_periods=1).std().sum()
    return result

# Split into chunks and process sequentially.
# Row-range slices via .iloc are used instead of np.array_split(taxi_df, ...):
# array_split on a DataFrame goes through DataFrame.swapaxes, which pandas has
# deprecated (and the FutureWarning is hidden by the warnings filter above).
# Chunk sizes match array_split: the first `extra` chunks get one more row.
base, extra = divmod(len(taxi_df), N_CHUNKS)
chunk_sizes = [base + 1] * extra + [base] * (N_CHUNKS - extra)

chunks = []
row_start = 0
for size in chunk_sizes:
    chunks.append(taxi_df.iloc[row_start:row_start + size])
    row_start += size

# perf_counter is monotonic, which makes it the right clock for intervals.
start_time = time.perf_counter()

sequential_results = []
for i, chunk in enumerate(chunks):
    result = heavy_computation(chunk)
    sequential_results.append(result)
    if (i + 1) % 8 == 0:  # Print every 8 chunks
        print(f"  Chunks 1-{i+1} of {N_CHUNKS} done...")

pandas_time = time.perf_counter() - start_time
print(f"\n✓ Sequential execution time: {pandas_time:.2f} seconds")

# Aggregate results
total_fares = sum(r['total_fares'] for r in sequential_results)
print(f"  Total fares processed: ${total_fares:,.0f}")

In [None]:
# ============================================================
# DASK - Parallel Processing
# ============================================================
# Process ALL chunks at the same time!

from dask import delayed, compute

# Section banner for the Dask run.
print("=" * 60, "DASK - All chunks in PARALLEL", "=" * 60, sep="\n")
print(f"\nProcessing {len(taxi_df):,} rows in {N_CHUNKS} chunks in parallel...\n")

# Same function, wrapped with delayed for parallel execution.
# `heavy_computation` (defined in the sequential cell) is wrapped directly
# instead of keeping a copy-pasted body, so the sequential and Dask benchmarks
# are guaranteed to time exactly the same work. Calling the wrapper returns a
# lazy task; nothing runs until compute() is called.
heavy_computation_delayed = delayed(heavy_computation)

# perf_counter is monotonic, which makes it the right clock for intervals.
start_time = time.perf_counter()

# Launch ALL chunks in parallel (this is the key difference!)
# Each call only records a task; no work happens yet.
parallel_tasks = [heavy_computation_delayed(chunk) for chunk in chunks]

# Execute all at once; returns one result dict per chunk.
dask_results = compute(*parallel_tasks)

dask_time = time.perf_counter() - start_time

print(f"✓ All {N_CHUNKS} chunks completed!")
print(f"\n✓ Dask parallel execution time: {dask_time:.2f} seconds")

# Aggregate results
total_fares = sum(r['total_fares'] for r in dask_results)
print(f"  Total fares processed: ${total_fares:,.0f}")

In [None]:
# ============================================================
# RAY - Parallel Processing (Alternative to Dask)
# ============================================================
# Same task, but using Ray's @ray.remote decorator

import ray
# Start Ray (a repeated call is tolerated when the cell is re-run).
ray.init(ignore_reinit_error=True)

print("=" * 60, "RAY - All chunks in PARALLEL", "=" * 60, sep="\n")
print(f"\nProcessing {len(taxi_df):,} rows in {N_CHUNKS} chunks in parallel...\n")

# Same function, but with Ray's remote decorator
@ray.remote
def heavy_computation_ray(df_chunk, n_iterations):
    """Ray task: summarize one chunk, then run ``n_iterations`` rolling passes.

    Returns a dict with the fare totals/statistics, the mean pickup-to-dropoff
    distance, the mean fare per passenger and the chunk's row count.
    """
    import numpy as np

    fares = df_chunk['fare_amount']
    summary = {}
    summary['total_fares'] = fares.sum()
    summary['avg_fare'] = fares.mean()
    summary['std_fare'] = fares.std()

    lat_delta = df_chunk['dropoff_latitude'] - df_chunk['pickup_latitude']
    lon_delta = df_chunk['dropoff_longitude'] - df_chunk['pickup_longitude']
    summary['avg_distance'] = np.sqrt(lat_delta**2 + lon_delta**2).mean()

    # Passenger counts are clipped to at least 1 before dividing.
    riders = df_chunk['passenger_count'].clip(1)
    summary['fare_per_passenger'] = (fares / riders).mean()
    summary['row_count'] = len(df_chunk)

    # CPU-heavy filler work; the rolling results are intentionally discarded.
    remaining = n_iterations
    while remaining > 0:
        fares.rolling(100, min_periods=1).mean().sum()
        fares.rolling(50, min_periods=1).std().sum()
        remaining -= 1
    return summary

# perf_counter is monotonic, which makes it the right clock for intervals.
start_time = time.perf_counter()

# Launch ALL chunks in parallel with Ray.
# .remote() returns immediately with a future (object reference) per chunk.
futures = [heavy_computation_ray.remote(chunk, ROLLING_ITERATIONS) for chunk in chunks]

# Wait for all results (blocks until every task has finished).
ray_results = ray.get(futures)

ray_time = time.perf_counter() - start_time

print(f"✓ All {N_CHUNKS} chunks completed!")
print(f"\n✓ Ray parallel execution time: {ray_time:.2f} seconds")

# Aggregate results
total_fares = sum(r['total_fares'] for r in ray_results)
print(f"  Total fares processed: ${total_fares:,.0f}")

In [None]:
# ============================================================
# RESULTS COMPARISON - Sequential vs Dask vs Ray
# ============================================================

import os
n_cores = os.cpu_count() or 8  # cpu_count() may return None; fall back to 8

# Speedup = sequential time divided by parallel time (values > 1 mean faster).
dask_speedup = pandas_time / dask_time
ray_speedup = pandas_time / ray_time
best_time = min(dask_time, ray_time)
best_method = "Dask" if dask_time < ray_time else "Ray"
best_speedup = pandas_time / best_time

# NOTE: the borders must be single-character box-drawing glyphs. The padding
# widths below (58 inside, 59 including the left border) assume that, so the
# previously mis-encoded multi-character sequences also broke the alignment.
print()
print("┌" + "─" * 58 + "┐")
print("│" + "  RESULTS: Sequential vs Parallel".center(58) + "│")
print("├" + "─" * 58 + "┤")
print(f"│  Config: {N_CHUNKS} chunks, {ROLLING_ITERATIONS} iterations, {n_cores} cores".ljust(59) + "│")
print("├" + "─" * 58 + "┤")
print(f"│  Sequential (for loop):   {pandas_time:>8.2f}s    (baseline)".ljust(59) + "│")
print(f"│  Dask (@delayed):         {dask_time:>8.2f}s    {dask_speedup:>5.1f}x faster".ljust(59) + "│")
print(f"│  Ray (@ray.remote):       {ray_time:>8.2f}s    {ray_speedup:>5.1f}x faster".ljust(59) + "│")
print("├" + "─" * 58 + "┤")
print(f"│  🏆 Winner: {best_method} at {best_speedup:.1f}x speedup!".ljust(59) + "│")
print("└" + "─" * 58 + "┘")

# Quick visual: bar lengths are proportional to the sequential time.
bar_width = 40
seq_bar = bar_width
dask_bar = int(bar_width * dask_time / pandas_time)
ray_bar = int(bar_width * ray_time / pandas_time)

print(f"\n  Sequential: {'█' * seq_bar} {pandas_time:.1f}s")
print(f"  Dask:       {'█' * dask_bar}{' ' * (seq_bar - dask_bar)} {dask_time:.1f}s")
print(f"  Ray:        {'█' * ray_bar}{' ' * (seq_bar - ray_bar)} {ray_time:.1f}s")

print("\n  ↓ Scroll down for detailed Dask & Ray tutorials ↓")
print("  ↓ Or jump to the FINAL SUMMARY at the end ↓")

---

# Part 1: Dask

Dask is a flexible parallel computing library that integrates seamlessly with the Python ecosystem. It provides:

- **Dask Arrays**: Parallel NumPy arrays
- **Dask DataFrames**: Parallel Pandas DataFrames
- **Dask Delayed**: Parallelize custom Python functions
- **Dask Bag**: Parallel lists for semi-structured data

Key concept: Dask builds a **task graph** of operations and executes them in parallel.

In [None]:
import dask
import dask.array as da
import dask.dataframe as dd
from dask import delayed
from dask.distributed import Client

# Report which Dask release is installed.
print("Dask version:", dask.__version__)

## 1.1 Dask Distributed Client

The Dask distributed scheduler provides a dashboard for monitoring tasks.

In [None]:
# Start a local Dask cluster: 4 worker processes, 2 threads each, 2GB per worker
client = Client(
    memory_limit='2GB',
    threads_per_worker=2,
    n_workers=4,
)
print(client, f"\nDashboard link: {client.dashboard_link}", sep="\n")

## 1.2 Dask Arrays (Parallel NumPy)

Dask arrays work like NumPy arrays but are split into chunks that can be processed in parallel.

In [None]:
# Create a large array with NumPy (standard approach)
print("NumPy (sequential):")

@timer
def numpy_computation():
    """Build a 20000x20000 random matrix and return the mean of (x + x.T)."""
    matrix = np.random.random((20000, 20000))
    symmetric_sum = matrix + matrix.T
    return symmetric_sum.mean()

numpy_result = numpy_computation()
print(f"  Result: {numpy_result:.6f}")

In [None]:
# Same computation with Dask (parallel)
print("Dask Array (parallel):")

@timer
def dask_computation():
    """Same mean of (x + x.T), expressed lazily on a chunked Dask array."""
    # 20000x20000 values split into 5000x5000 chunks
    lazy_matrix = da.random.random((20000, 20000), chunks=(5000, 5000))
    lazy_mean = (lazy_matrix + lazy_matrix.T).mean()
    # Nothing has run so far; .compute() triggers the actual execution.
    return lazy_mean.compute()

dask_result = dask_computation()
print(f"  Result: {dask_result:.6f}")

In [None]:
# Visualize the task graph (for a smaller example)
x = da.random.random((1000, 1000), chunks=(500, 500))
y = (x + x.T).mean()

# `y` is still lazy; its graph can be inspected without computing anything.
print(
    "Task graph for: (x + x.T).mean()",
    f"Number of tasks: {len(y.__dask_graph__())}",
    sep="\n",
)

# Uncomment to visualize (requires graphviz)
# y.visualize(filename='dask_graph.png')

## 1.3 Dask DataFrames (Parallel Pandas)

Dask DataFrames are partitioned Pandas DataFrames that can be larger than memory.

In [None]:
# Create a sample dataset
n_rows = 5_000_000

print(f"Creating dataset with {n_rows:,} rows...")

# Columns: sequential id, random category A-D, two scaled normal columns,
# and one timestamp per second starting 2020-01-01.
df = pd.DataFrame(dict(
    id=np.arange(n_rows),
    category=np.random.choice(['A', 'B', 'C', 'D'], n_rows),
    value1=np.random.randn(n_rows) * 100,
    value2=np.random.randn(n_rows) * 50,
    timestamp=pd.date_range('2020-01-01', periods=n_rows, freq='s'),
))

print(
    f"DataFrame shape: {df.shape}",
    f"Memory usage: {df.memory_usage(deep=True).sum() / 1e6:.1f} MB",
    sep="\n",
)
df.head()

In [None]:
# Pandas groupby (sequential)
print("Pandas GroupBy (sequential):")

@timer
def pandas_groupby():
    """Per-category mean/std/min/max of value1 and value2 using pandas."""
    stats = ['mean', 'std', 'min', 'max']
    spec = {column: list(stats) for column in ('value1', 'value2')}
    grouped = df.groupby('category')
    return grouped.agg(spec)

pandas_result = pandas_groupby()
print(pandas_result)

In [None]:
# Convert to Dask DataFrame (the pandas frame is split into 8 partitions)
ddf = dd.from_pandas(df, npartitions=8)
print("Dask DataFrame:", ddf.npartitions, "partitions")

In [None]:
# Dask groupby (parallel)
print("Dask GroupBy (parallel):")

@timer
def dask_groupby():
    """Per-category mean/std/min/max of value1 and value2 using Dask."""
    stats = ['mean', 'std', 'min', 'max']
    spec = {column: list(stats) for column in ('value1', 'value2')}
    lazy_result = ddf.groupby('category').agg(spec)
    # The aggregation is only described above; compute() runs it.
    return lazy_result.compute()

dask_result = dask_groupby()
print(dask_result)

## 1.4 Dask Delayed (Custom Parallelization)

`dask.delayed` lets you parallelize any Python function by building a task graph.

In [None]:
# Simulate a slow computation
def slow_square(x, delay=1.0):
    """Return ``x ** 2`` after sleeping to simulate a slow computation.

    Args:
        x: Value to square.
        delay: Seconds to sleep before returning. Defaults to 1.0, the
            previously hard-coded duration, so existing calls are unchanged.

    Returns:
        ``x ** 2``.
    """
    time.sleep(delay)  # Simulate work
    return x ** 2

def slow_sum(values, delay=0.5):
    """Return ``sum(values)`` after sleeping to simulate a slow aggregation.

    Args:
        values: Iterable of numbers to add up.
        delay: Seconds to sleep before summing. Defaults to 0.5, the
            previously hard-coded duration, so existing calls are unchanged.

    Returns:
        The sum of ``values`` (0 for an empty iterable).
    """
    time.sleep(delay)
    return sum(values)

In [None]:
# Sequential execution
print("Sequential execution:")

@timer
def sequential_computation():
    """Square 0..7 one call at a time, then add the squares up."""
    squares = [slow_square(i) for i in range(8)]
    return slow_sum(squares)

seq_result = sequential_computation()
print(f"  Result: {seq_result}")

In [None]:
# Parallel execution with dask.delayed
print("Parallel execution with Dask Delayed:")

@timer
def parallel_computation():
    """Same squares-then-sum pipeline, built lazily and run by Dask."""
    # Lazy versions of the two helpers
    lazy_square = delayed(slow_square)
    lazy_sum = delayed(slow_sum)

    # Describing the work only builds the task graph; nothing executes here.
    pending_squares = [lazy_square(i) for i in range(8)]
    lazy_total = lazy_sum(pending_squares)

    # compute() runs the graph in parallel.
    return lazy_total.compute()

par_result = parallel_computation()
print(f"  Result: {par_result}")

In [None]:
# Close the Dask client started in section 1.1; Part 1 no longer needs it.
client.close()

---

# Part 2: Ray

Ray is a general-purpose distributed computing framework that makes it easy to scale Python applications. Key features:

- **Ray Core**: Remote functions and actors
- **Ray Data**: Scalable data processing
- **Ray Train**: Distributed ML training
- **Ray Serve**: Model serving

Key concept: Ray uses **tasks** (stateless) and **actors** (stateful) as building blocks.

In [None]:
import ray

# Report which Ray release is installed.
print("Ray version:", ray.__version__)

In [None]:
# Initialize Ray with 4 CPUs.
# The "30-Second Demo" above already started Ray with default resources and
# never shut it down. With ignore_reinit_error=True a second ray.init() on a
# running instance returns without applying its arguments, so num_cpus=4
# would be silently ignored. Shut the old instance down first.
if ray.is_initialized():
    ray.shutdown()
ray.init(num_cpus=4, ignore_reinit_error=True)
print(f"Ray initialized with {ray.available_resources()}")

## 2.1 Ray Remote Functions (Tasks)

The `@ray.remote` decorator turns a Python function into a distributed task.

In [None]:
# Define a remote function
@ray.remote
def ray_slow_square(x):
    """Ray task that waits one second and then returns x squared."""
    time.sleep(1)
    squared = x ** 2
    return squared

@ray.remote
def ray_slow_sum(values):
    """Ray task that waits half a second and then returns sum(values)."""
    time.sleep(0.5)
    total = sum(values)
    return total

In [None]:
# Sequential execution (for comparison)
print("Sequential execution:")

@timer
def sequential_ray():
    """Baseline: run the plain (non-Ray) helpers one call after another."""
    squares = [slow_square(i) for i in range(8)]
    return slow_sum(squares)

seq_result = sequential_ray()
print(f"  Result: {seq_result}")

In [None]:
# Parallel execution with Ray
print("Parallel execution with Ray:")

@timer
def parallel_ray():
    """Square 0..7 as concurrent Ray tasks, then sum them in another task."""
    # .remote() hands back a future immediately for each task.
    pending = [ray_slow_square.remote(i) for i in range(8)]

    # ray.get blocks until every square has been computed.
    squares = ray.get(pending)

    # The aggregation also runs as a Ray task.
    return ray.get(ray_slow_sum.remote(squares))

par_result = parallel_ray()
print(f"  Result: {par_result}")

## 2.2 Ray Actors (Stateful Computation)

Actors are stateful workers that can maintain state across method calls.

In [None]:
@ray.remote
class Counter:
    """Ray actor holding a single running total in ``self.value``."""

    def __init__(self, initial_value=0):
        # Starting total for this actor instance.
        self.value = initial_value

    def increment(self, amount=1):
        """Add ``amount`` to the total and return the updated total."""
        self.value += amount
        return self.get_value()

    def get_value(self):
        """Return the current total."""
        return self.value

In [None]:
# Create multiple counter actors, starting at 0, 10, 20 and 30
counters = [Counter.remote(start) for start in range(0, 40, 10)]

# Increment each counter in parallel (one method call per actor)
futures = [counter.increment.remote(5) for counter in counters]
results = ray.get(futures)
print(f"After increment: {results}")

# Get final values; each actor kept its own state between the two calls
final_futures = [counter.get_value.remote() for counter in counters]
final_values = ray.get(final_futures)
print(f"Final values: {final_values}")

## 2.3 Ray for Data Processing

Ray can efficiently parallelize data processing tasks.

In [None]:
@ray.remote
def process_chunk(chunk_data):
    """Ray task: return count, mean, std and sum of one chunk's value1 column."""
    values = chunk_data['value1']
    return {
        'count': len(chunk_data),
        'mean': values.mean(),
        'std': values.std(),
        'sum': values.sum(),
    }

In [None]:
# Use the DataFrame we created earlier
print(f"Processing DataFrame with {len(df):,} rows")

# Sequential processing
print("\nSequential processing:")

@timer
def sequential_process():
    """Summarize ``df`` in 8 row-range chunks, one chunk after another.

    Returns:
        A list of 8 dicts, each with ``count``, ``mean``, ``std`` and ``sum``
        of the chunk's ``value1`` column.
    """
    # Row-range slices via .iloc replace np.array_split(df, 8): array_split
    # on a DataFrame goes through the deprecated DataFrame.swapaxes. Sizes
    # match array_split (the first `extra` chunks get one more row).
    n_chunks = 8
    base, extra = divmod(len(df), n_chunks)
    sizes = [base + 1] * extra + [base] * (n_chunks - extra)

    results = []
    start = 0
    for size in sizes:
        chunk = df.iloc[start:start + size]
        start += size
        results.append({
            'count': len(chunk),
            'mean': chunk['value1'].mean(),
            'std': chunk['value1'].std(),
            'sum': chunk['value1'].sum()
        })
    return results

seq_results = sequential_process()

In [None]:
# Parallel processing with Ray
print("Parallel processing with Ray:")

@timer
def parallel_process():
    """Summarize ``df`` in 8 row-range chunks using parallel Ray tasks.

    Returns:
        A list of 8 result dicts from ``process_chunk``, in chunk order.
    """
    # Split data into chunks. Row-range slices via .iloc replace
    # np.array_split(df, 8), which goes through the deprecated
    # DataFrame.swapaxes. Sizes match array_split (the first `extra`
    # chunks get one more row).
    n_chunks = 8
    base, extra = divmod(len(df), n_chunks)
    sizes = [base + 1] * extra + [base] * (n_chunks - extra)

    # Put chunks in Ray's object store
    chunk_refs = []
    start = 0
    for size in sizes:
        chunk_refs.append(ray.put(df.iloc[start:start + size]))
        start += size

    # Process chunks in parallel
    futures = [process_chunk.remote(ref) for ref in chunk_refs]

    # Collect results (blocks until all tasks finish)
    return ray.get(futures)

par_results = parallel_process()

In [None]:
# Aggregate results: combine the per-chunk summaries into overall figures.
total_count = sum(part['count'] for part in par_results)
# Each chunk mean is weighted by its row count before averaging.
weighted_mean = sum(part['mean'] * part['count'] for part in par_results) / total_count
total_sum = sum(part['sum'] for part in par_results)

print(
    f"\nAggregated Results:",
    f"  Total rows processed: {total_count:,}",
    f"  Weighted mean: {weighted_mean:.4f}",
    f"  Total sum: {total_sum:.2f}",
    sep="\n",
)

## 2.4 Ray for Monte Carlo Simulation

A practical example: estimating Pi using Monte Carlo simulation.

In [None]:
def estimate_pi_sequential(n_samples):
    """Monte Carlo estimate of Pi from ``n_samples`` points drawn one by one.

    Each point is a pair of uniform draws in [0, 1); the share of points with
    x^2 + y^2 <= 1, multiplied by 4, is the estimate.
    """
    hits = 0
    for _ in range(n_samples):
        x = np.random.random()
        y = np.random.random()
        # A True comparison counts as 1, a False one as 0.
        hits += x**2 + y**2 <= 1
    return 4 * hits / n_samples

@ray.remote
def estimate_pi_chunk(n_samples):
    """Ray task: count how many of ``n_samples`` random points fall in the unit circle.

    Returns the pair (points inside, n_samples); the draws are vectorized.
    """
    xs = np.random.random(n_samples)
    ys = np.random.random(n_samples)
    in_circle = xs**2 + ys**2 <= 1
    return np.sum(in_circle), n_samples

In [None]:
n_total = 10_000_000
n_workers = 8
samples_per_worker = n_total // n_workers  # equal share of samples per Ray task

print(f"Estimating Pi with {n_total:,} samples\n")

# Sequential (vectorized for fair comparison)
print("Sequential (vectorized):")

@timer
def sequential_pi():
    """Estimate Pi from all ``n_total`` samples in one vectorized pass."""
    xs = np.random.random(n_total)
    ys = np.random.random(n_total)
    hits = np.sum(xs**2 + ys**2 <= 1)
    return 4 * hits / n_total

pi_seq = sequential_pi()
print(f"  Pi estimate: {pi_seq:.6f} (error: {abs(pi_seq - np.pi):.6f})")

In [None]:
# Parallel with Ray
print("Parallel with Ray:")

@timer
def parallel_pi():
    """Estimate Pi by pooling the hit counts of ``n_workers`` Ray tasks."""
    pending = [estimate_pi_chunk.remote(samples_per_worker) for _ in range(n_workers)]
    # Each result is a pair: (points inside the circle, points drawn).
    tallies = ray.get(pending)

    total_inside = sum(inside for inside, _ in tallies)
    total_samples = sum(drawn for _, drawn in tallies)
    return 4 * total_inside / total_samples

pi_par = parallel_pi()
print(f"  Pi estimate: {pi_par:.6f} (error: {abs(pi_par - np.pi):.6f})")

---

# Part 3: Comparison and Best Practices

## When to Use Each Framework

| Feature | Dask | Ray |
|---------|------|-----|
| **Best for** | Data analytics, ETL | General distributed computing |
| **DataFrame support** | Native (Dask DataFrame) | Via Modin or pandas |
| **Array support** | Native (Dask Array) | Via NumPy |
| **ML Training** | Limited | Ray Train, Ray Tune |
| **Stateful computation** | Limited | Native (Actors) |
| **Learning curve** | Easy (pandas-like API) | Moderate |
| **Task graphs** | Explicit visualization | Implicit |
| **Integration** | NumPy, Pandas, Scikit-learn | TensorFlow, PyTorch, Hugging Face |

## Best Practices

### Dask
1. **Choose appropriate chunk sizes**: Too small = overhead, too large = memory issues
2. **Use `persist()` for intermediate results** that are reused
3. **Avoid eager computation**: Chain operations before calling `.compute()`
4. **Monitor the dashboard**: Watch for task distribution and memory usage

### Ray
1. **Use `ray.put()` for large objects** passed to multiple tasks
2. **Batch small tasks**: Ray has overhead per task
3. **Use actors for stateful computation**: Counters, caches, models
4. **Specify resource requirements**: `@ray.remote(num_cpus=2)` for accurate scheduling

In [None]:
# Shut down Ray now that the Ray examples are finished, then confirm it.
ray.shutdown()
print("Ray shutdown complete.")

---

# Summary

In this notebook, we explored two powerful parallelization frameworks:

## Dask
- **Dask Arrays**: Drop-in replacement for NumPy with automatic chunking
- **Dask DataFrames**: Scale pandas workflows to larger-than-memory data
- **Dask Delayed**: Parallelize arbitrary Python functions
- Great for data analytics and ETL pipelines

## Ray
- **Remote Functions**: Turn any function into a distributed task with `@ray.remote`
- **Actors**: Stateful distributed objects for complex workflows
- **Object Store**: Efficient data sharing between tasks
- Great for general-purpose distributed computing and ML

## Key Takeaways
1. Both frameworks can significantly speed up computations
2. Dask is more "pandas-like" and easier for data analysts
3. Ray is more flexible and better for custom distributed applications
4. Both can scale from a laptop to a cluster without code changes

---

## Further Reading
- [Dask Documentation](https://docs.dask.org/)
- [Ray Documentation](https://docs.ray.io/)
- [Dask vs Ray Comparison](https://docs.ray.io/en/latest/ray-more-libs/dask-on-ray.html)

---

# Final Benchmark Summary

**Recall the results from our "30-Second Demo" at the beginning of this notebook.**

Run the cell below to see the final comparison of Sequential vs Dask vs Ray on the NYC Taxi dataset.

In [None]:
# ============================================================
# 🏁 FINAL BENCHMARK RESULTS - Complete Summary
# ============================================================
# Re-reports the timings measured in the "30-Second Demo" cells. All inputs
# (pandas_time, dask_time, ray_time, taxi_df, N_CHUNKS, ROLLING_ITERATIONS)
# come from those cells; a NameError means they have not been run yet.

try:
    import os
    n_cores = os.cpu_count() or 8  # cpu_count() may return None

    # Calculate speedups (sequential time / parallel time)
    dask_speedup = pandas_time / dask_time
    ray_speedup = pandas_time / ray_time
    best_parallel_time = min(dask_time, ray_time)
    best_speedup = pandas_time / best_parallel_time
    winner = "DASK" if dask_time < ray_time else "RAY"

    # Time saved (negative when the parallel run was slower)
    time_saved_dask = pandas_time - dask_time
    time_saved_ray = pandas_time - ray_time
    best_time_saved = max(time_saved_dask, time_saved_ray)

    # NOTE: borders must be single-character box-drawing glyphs; the padding
    # widths (63 inside, 64 including the left border) rely on that.
    print()
    print("╔" + "═" * 63 + "╗")
    print("║" + "  🏁 FINAL BENCHMARK RESULTS - NYC Taxi Dataset".center(63) + "║")
    print("╠" + "═" * 63 + "╣")
    print(f"║  Dataset:        {len(taxi_df):>15,} rows".ljust(64) + "║")
    print(f"║  Chunks:         {N_CHUNKS:>15}".ljust(64) + "║")
    print(f"║  CPU Cores:      {n_cores:>15}".ljust(64) + "║")
    print(f"║  Iterations:     {ROLLING_ITERATIONS:>15} (rolling windows per chunk)".ljust(64) + "║")
    print("╠" + "═" * 63 + "╣")
    print("║" + "  METHOD                    TIME         SPEEDUP".ljust(63) + "║")
    print("╠" + "═" * 63 + "╣")
    print(f"║  Sequential (for loop)   {pandas_time:>8.2f}s        1.0x  (baseline)".ljust(64) + "║")
    print(f"║  Dask (@delayed)         {dask_time:>8.2f}s       {dask_speedup:>5.1f}x  ⚡".ljust(64) + "║")
    print(f"║  Ray (@ray.remote)       {ray_time:>8.2f}s       {ray_speedup:>5.1f}x  ⚡".ljust(64) + "║")
    print("╠" + "═" * 63 + "╣")
    print(f"║  🏆 WINNER: {winner} at {best_speedup:.1f}x faster!".ljust(64) + "║")
    print("╚" + "═" * 63 + "╝")

    # Visual bar chart: bar lengths are proportional to the sequential time
    print("\n  📊 VISUAL COMPARISON:")
    print("  " + "─" * 55)

    max_time = pandas_time
    bar_width = 45

    seq_bar = int(bar_width * pandas_time / max_time)
    dask_bar = int(bar_width * dask_time / max_time)
    ray_bar = int(bar_width * ray_time / max_time)

    # The time difference is printed with an explicit sign: "-" when the
    # parallel run was faster than sequential, "+" when it was slower.
    print(f"  Sequential │{'█' * seq_bar}│ {pandas_time:.1f}s")
    print(f"  Dask       │{'█' * dask_bar}{'░' * (seq_bar - dask_bar)}│ {dask_time:.1f}s ({-time_saved_dask:+.1f}s)")
    print(f"  Ray        │{'█' * ray_bar}{'░' * (seq_bar - ray_bar)}│ {ray_time:.1f}s ({-time_saved_ray:+.1f}s)")
    print("  " + "─" * 55)

    # What this means
    print("\n  💡 WHAT THIS MEANS:")
    print("  " + "─" * 55)
    print(f"  • Sequential processed {N_CHUNKS} chunks ONE AT A TIME")
    print(f"  • Dask/Ray processed up to {n_cores} chunks SIMULTANEOUSLY")
    if best_time_saved > 0:
        print(f"  • You saved {best_time_saved:.1f} seconds with parallelization!")
    else:
        # Do not claim a saving when the parallel runs were not faster.
        print("  • No time was saved on this run - parallel overhead outweighed the gains")

    if N_CHUNKS > n_cores:
        # Ceiling division: a final, partially filled wave still takes a full
        # pass (e.g. 32 chunks on 12 cores need 3 waves, not 2).
        waves = (N_CHUNKS + n_cores - 1) // n_cores
        print(f"  • With {N_CHUNKS} chunks and {n_cores} cores, that's ~{waves} waves of parallel work")

    print("\n  🎯 KEY TAKEAWAY:")
    print("  " + "─" * 55)
    print("  Same data. Same computation. Same results.")
    if best_speedup > 1:
        # One decimal, matching the speedups shown in the table above.
        print(f"  But {best_speedup:.1f}x FASTER with parallel processing!")
    else:
        print(f"  This run reached only {best_speedup:.1f}x - try more work per chunk.")
    print()
    print("  This is why tools like Dask and Ray matter for Big Data.")
    print()

except NameError as e:
    print("⚠️  Benchmark variables not found!")
    print("   Please run the '30-Second Demo' cells first (cells 5-9)")
    print(f"   Missing: {e}")