# Parallel Processing Demo 3: WorkflowSet Multi-Model Comparison

This notebook demonstrates parallel execution for multi-model comparison:
- `WorkflowSet.from_cross()` - Create all preprocessing × model combinations
- `WorkflowSet.fit_resamples()` - Parallel evaluation across CV folds
- Progress tracking with `verbose=True`
- Results ranking and visualization
- Performance comparisons (sequential vs parallel)

**Key Features Demonstrated:**
- ✅ `n_jobs` parameter for WorkflowSet evaluation
- ✅ CPU warning system for multi-workflow tasks
- ✅ Automatic workflow ranking
- ✅ Speedup measurements for large workflow sets
- ✅ Best model selection and finalization

## Setup and Data Loading

In [None]:
# Change to parent directory and install
import os
os.chdir('..')
!pip install -e .

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import time
import warnings

# py-tidymodels imports
from py_workflows import workflow
from py_parsnip import linear_reg, rand_forest
from py_recipes import recipe, all_numeric_predictors
from py_rsample import vfold_cv, initial_split, training, testing
from py_yardstick import metric_set, rmse, mae, r_squared
from py_workflowsets import WorkflowSet
from py_tune.parallel_utils import get_cpu_count

# Apply seaborn's 'whitegrid' style to the plots drawn later in the notebook
sns.set_style(style='whitegrid')

# Reaching this line means every import in this cell succeeded
print("All imports successful!")

In [None]:
# Read the raw CSV once and keep it untouched; the working frame `df` is a
# separate object whose 'date' column has been parsed to datetimes.
raw_data = pd.read_csv('__data/preem.csv')
df = raw_data.assign(date=pd.to_datetime(raw_data['date']))

# Quick sanity summary: dimensions, covered period, first rows
print(f"Data shape: {df.shape}")
print(f"Date range: {df['date'].min()} to {df['date'].max()}")
display(df.head())

In [None]:
# Split once with a fixed seed, then pull both partitions out of the split object.
# NOTE(review): prop=0.75 presumably is the training share - confirm in py_rsample.
split = initial_split(df, prop=0.75, seed=123)
train_data, test_data = training(split), testing(split)

# Report the row count of each partition
print(f"Training set: {train_data.shape[0]} rows")
print(f"Test set: {test_data.shape[0]} rows")

In [None]:
# Metric bundle used for every resampling evaluation in this notebook
metrics = metric_set(rmse, mae, r_squared)

# Base model formula: `target` explained by all remaining columns
FORMULA = "target ~ ."

print(f"Formula: {FORMULA}")
print("Metrics: rmse, mae, r_squared")

## System Information

In [None]:
# Ask the project's helper how many CPU cores are available; later cells use
# this value to compute parallel efficiency and to oversubscribe workers.
cpu_count = get_cpu_count()

# Emit the three report lines in a single print call
system_report = [
    f"‚úì Detected {cpu_count} CPU cores",
    "‚úì Joblib backend: loky (multiprocessing)",
    f"\nThis system can efficiently run up to {cpu_count} parallel jobs.",
]
print("\n".join(system_report))

## Part 1: Create WorkflowSet

We'll create multiple preprocessing strategies and models, then combine them into a WorkflowSet.

In [None]:
# Define preprocessing strategies.
# Formula-only preprocessing reuses the notebook-wide FORMULA constant so the
# model formula is spelled out in exactly one place.
formulas = [
    FORMULA,  # All features
]

# Recipe-based preprocessing: each entry is a chain of recipe steps applied to
# all numeric predictors.
recipes = [
    recipe().step_normalize(all_numeric_predictors()),
    recipe().step_normalize(all_numeric_predictors()).step_pca(all_numeric_predictors(), num_comp=3),
    recipe().step_poly(all_numeric_predictors(), degree=2)
]

# Combine formulas and recipes into one list; order matters for the numbered
# descriptions printed below.
preproc = formulas + recipes

print(f"Preprocessing strategies: {len(preproc)}")
print(f"  1. Formula (minimal): {FORMULA}")
print("  2. Recipe (normalized): Normalize all numeric")
print("  3. Recipe (PCA): Normalize + PCA (3 components)")
print("  4. Recipe (polynomial): Polynomial features (degree 2)")

In [None]:
# Candidate model specifications, in the order described by the printout below
models = [
    linear_reg(),
    linear_reg(penalty=0.1, mixture=1.0).set_engine("sklearn"),  # Lasso
    rand_forest(trees=100, min_n=5).set_mode('regression'),
]

# One summary line with the count, followed by one line per model
model_summary = [
    f"Models: {len(models)}",
    "  1. Linear Regression (OLS)",
    "  2. Linear Regression (Lasso, penalty=0.1)",
    "  3. Random Forest (100 trees, min_n=5)",
]
print("\n".join(model_summary))

In [None]:
# Cross every preprocessing strategy with every model specification
wf_set = WorkflowSet.from_cross(preproc=preproc, models=models)
n_workflows = len(wf_set.workflows)

print(f"\n‚úì Created WorkflowSet with {n_workflows} workflows")
print(f"  ({len(preproc)} preprocessing strategies √ó {len(models)} models)")

# List the identifier of each generated workflow
print("\nWorkflow IDs:")
workflow_ids = list(wf_set.workflows.keys())
for wf_id in workflow_ids:
    print(f"  - {wf_id}")

## Part 2: Sequential vs Parallel Evaluation

In [None]:
# Build 5 cross-validation folds on the training partition (fixed seed)
folds = vfold_cv(train_data, v=5, seed=123)
n_folds = len(folds)

# Every workflow is fitted once per fold
total_fits = n_workflows * n_folds

cv_report = [
    "CV Configuration:",
    f"  Workflows: {n_workflows}",
    f"  CV folds: {n_folds}",
    f"  Total fits: {total_fits} ({n_workflows} √ó {n_folds})",
    "\nThis is a good candidate for parallel execution!",
]
print("\n".join(cv_report))

### Sequential Execution (Baseline)

In [None]:
# Sequential evaluation: the baseline against which the parallel run is compared
print(f"Running SEQUENTIAL WorkflowSet evaluation ({total_fits} fits)...")

# perf_counter() is a monotonic, high-resolution clock meant for measuring
# elapsed intervals; time.time() follows the wall clock and can jump if the
# system time is adjusted while the evaluation is running.
start = time.perf_counter()
results_seq = wf_set.fit_resamples(
    resamples=folds,
    metrics=metrics,
    n_jobs=1,  # Sequential
    verbose=True
)
seq_time = time.perf_counter() - start

print(f"\n‚úì Sequential execution completed in {seq_time:.2f} seconds")
print(f"  ({seq_time / total_fits:.2f} seconds per fit)")

In [None]:
# Keep the per-workflow metric table; the consistency check reuses it later
metrics_seq = results_seq.collect_metrics()

# Show the five workflows that rank_results() places first for 'rmse'
print("\nTop 5 workflows (sequential, by RMSE):")
top_five_seq = results_seq.rank_results('rmse', n=5)
display(top_five_seq)

### Parallel Execution with All Cores

In [None]:
# Parallel evaluation of the same workflow set on the same folds
print(f"Running PARALLEL WorkflowSet evaluation ({total_fits} fits, n_jobs=-1)...")

# Time the run with the monotonic perf_counter() clock so the speedup ratio
# below cannot be distorted by wall-clock adjustments during the run.
start = time.perf_counter()
results_par = wf_set.fit_resamples(
    resamples=folds,
    metrics=metrics,
    n_jobs=-1,  # Use all cores
    verbose=True
)
par_time = time.perf_counter() - start

# Speedup: how many times faster than the sequential baseline.
# Efficiency: speedup as a percentage of the detected core count
# (100% would mean perfectly linear scaling across all cores).
speedup = seq_time / par_time
efficiency = (speedup / cpu_count) * 100

print(f"\n‚úì Parallel execution completed in {par_time:.2f} seconds")
print(f"  ({par_time / total_fits:.2f} seconds per fit)")
print(f"‚úì Speedup: {speedup:.2f}x")
print(f"‚úì Efficiency: {efficiency:.1f}%")

### Results Consistency Check

In [None]:
# Verify that the parallel run reproduces the sequential metrics
metrics_par = results_par.collect_metrics()

print("Consistency Check (top workflow):")
# The comparison is limited to the workflow ranked first (by RMSE) in the
# sequential run.
top_wf_id = results_seq.rank_results('rmse', n=1).iloc[0]['wflow_id']

seq_metrics = metrics_seq[metrics_seq['wflow_id'] == top_wf_id]
par_metrics = metrics_par[metrics_par['wflow_id'] == top_wf_id]

# Track the overall outcome so the closing message reflects what was actually
# observed instead of always claiming success.
all_match = True
for metric in ['rmse', 'mae', 'r_squared']:
    seq_val = seq_metrics[seq_metrics['metric'] == metric]['mean'].values[0]
    par_val = par_metrics[par_metrics['metric'] == metric]['mean'].values[0]

    match = bool(np.isclose(seq_val, par_val, rtol=1e-10))
    all_match = all_match and match
    status = "‚úì IDENTICAL" if match else "‚úó DIFFERENT"
    if match:
        print(f"  {metric}: {status} ({seq_val:.6f})")
    else:
        # Show both numbers so the size of the discrepancy is visible
        print(f"  {metric}: {status} (sequential={seq_val:.6f}, parallel={par_val:.6f})")

if all_match:
    print("\n‚úì All parallel executions produce identical results to sequential!")
else:
    print("\nWARNING: parallel and sequential metrics differ for the top workflow; "
          "see the rows marked DIFFERENT above.")

### Performance Comparison

In [None]:
# Timing summary: one record per execution mode, turned into a two-row table
perf_records = [
    {
        'Configuration': 'Sequential',
        'n_jobs': 1,
        'Time (s)': seq_time,
        'Time per fit (s)': seq_time / total_fits,
        'Speedup': 1.0,
        'Efficiency (%)': 100.0,
    },
    {
        'Configuration': f'Parallel ({cpu_count} cores)',
        'n_jobs': -1,
        'Time (s)': par_time,
        'Time per fit (s)': par_time / total_fits,
        'Speedup': speedup,
        'Efficiency (%)': efficiency,
    },
]
perf_df = pd.DataFrame(perf_records)

display(perf_df)

# Two side-by-side bar charts sharing one colour scheme:
# gray = sequential baseline, green = parallel run
bar_colors = ['gray', 'green']
fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(14, 5))

# Left panel: total wall time per configuration
ax1.bar(perf_df['Configuration'], perf_df['Time (s)'], color=bar_colors)
ax1.set_ylabel('Time (seconds)')
ax1.set_title(f'WorkflowSet Evaluation Time ({n_workflows} workflows √ó {n_folds} folds)')

# Right panel: speedup relative to sequential, with a dashed line at 1x
ax2.bar(perf_df['Configuration'], perf_df['Speedup'], color=bar_colors)
ax2.set_ylabel('Speedup (x)')
ax2.set_title('Speedup vs Sequential')
ax2.axhline(y=1, color='r', linestyle='--', label='Baseline')
ax2.legend()

# Light horizontal grid lines on both panels
for panel in (ax1, ax2):
    panel.grid(axis='y', alpha=0.3)

plt.tight_layout()
plt.show()

## Part 3: Results Analysis and Ranking

In [None]:
# Ask for a ranking that covers every workflow, using the parallel results
ranked = results_par.rank_results('rmse', n=n_workflows)

print(f"All {n_workflows} workflows ranked by RMSE:")

# Restrict the displayed table to the identifier, the mean of each metric and the rank
ranking_columns = ['wflow_id', 'rmse_mean', 'mae_mean', 'r_squared_mean', 'rank']
display(ranked[ranking_columns])

In [None]:
# Built-in comparison plot for the 'rmse' metric.
# NOTE(review): update_layout()/show() suggest a Plotly figure - confirm.
fig = results_par.autoplot('rmse')

# Override the title and height, then render
layout_overrides = {
    'title': f'WorkflowSet Performance Comparison ({n_workflows} workflows)',
    'height': 500,
}
fig.update_layout(**layout_overrides)
fig.show()

In [None]:
# Compare metrics across workflows: one horizontal bar chart per metric, with
# the best workflow for that metric highlighted in gold.
fig, axes = plt.subplots(1, 3, figsize=(16, 5))

metrics_to_plot = ['rmse', 'mae', 'r_squared']
colors = ['steelblue', 'coral', 'mediumseagreen']
# Metrics for which a larger value is better; every other metric is minimised.
higher_is_better = {'r_squared'}

for ax, metric, color in zip(axes, metrics_to_plot, colors):
    # Rows for this metric, ordered by mean so the bars appear sorted
    metric_data = metrics_par[metrics_par['metric'] == metric].sort_values('mean')
    means = metric_data['mean'].to_numpy(dtype=float)
    positions = range(len(metric_data))

    ax.barh(positions, means, color=color, alpha=0.7)
    ax.set_yticks(positions)
    ax.set_yticklabels(metric_data['wflow_id'], fontsize=8)
    ax.set_xlabel(metric.upper())
    ax.set_title(f'{metric.upper()} by Workflow')
    ax.grid(axis='x', alpha=0.3)

    # Nothing to highlight when there are no usable values for this metric
    if means.size == 0 or np.isnan(means).all():
        continue

    # Highlight best. The bar is located by POSITION in the sorted data rather
    # than by index label, so the highlight stays correct even if the metrics
    # frame carries duplicate index labels.
    if metric in higher_is_better:
        best_pos = int(np.nanargmax(means))
    else:
        best_pos = int(np.nanargmin(means))
    ax.barh(best_pos, means[best_pos], color='gold', alpha=0.9)

plt.tight_layout()
plt.show()

## Part 4: Best Workflow Selection

In [None]:
# Extract best workflow: the first row of the RMSE ranking
best_wf_id = ranked.iloc[0]['wflow_id']
best_workflow = wf_set[best_wf_id]

print(f"Best workflow: {best_wf_id}")
print(f"\nPerformance metrics:")

# The ranking table is indexed with '<metric>_mean' columns where it is
# displayed, while the prints below (and the final summary cell) read plain
# '<metric>' keys. Work on a copy of the row and add the plain names as aliases
# whenever only the '_mean' spelling exists, so both spellings resolve.
# NOTE(review): confirm which column names rank_results() actually emits.
best_metrics = ranked.iloc[0].copy()
for metric_name in ('rmse', 'mae', 'r_squared'):
    mean_key = f'{metric_name}_mean'
    if metric_name not in best_metrics.index and mean_key in best_metrics.index:
        best_metrics[metric_name] = best_metrics[mean_key]

# The spread falls back to 0 when the ranking row has no '<metric>_std' entry
print(f"  RMSE: {best_metrics['rmse']:.4f} (¬±{best_metrics.get('rmse_std', 0):.4f})")
print(f"  MAE:  {best_metrics['mae']:.4f} (¬±{best_metrics.get('mae_std', 0):.4f})")
print(f"  R¬≤:   {best_metrics['r_squared']:.4f} (¬±{best_metrics.get('r_squared_std', 0):.4f})")

In [None]:
# Fit best workflow on full training set
print(f"\nFitting best workflow ({best_wf_id}) on full training data...")
best_fit = best_workflow.fit(train_data)
print("‚úì Fit complete")

# Evaluate on test set (the fitted object is rebound to what evaluate() returns)
best_fit = best_fit.evaluate(test_data)

# Calculate metrics: each metric function is called on the observed target and
# the '.pred' column of the predictions. The per-metric frames are collected
# first and concatenated once; growing a DataFrame inside the loop re-copies
# the accumulated rows on every pass and starts from an empty frame, which
# recent pandas versions warn about.
predictions = best_fit.predict(test_data)
metric_frames = [
    metric_fn(test_data['target'], predictions['.pred'])
    for metric_fn in [rmse, mae, r_squared]
]
test_metrics_df = pd.concat(metric_frames, ignore_index=True)

print("\nTest set performance:")
display(test_metrics_df)

## Part 5: CPU Warning Demonstration

In [None]:
# Create smaller WorkflowSet to trigger warning
small_wf_set = WorkflowSet.from_cross(
    preproc=["target ~ ."],
    models=[linear_reg()]
)

# Derive the numbers used in the messages from the actual objects instead of
# hard-coding them, so the text stays correct if the folds or the set change.
n_small_workflows = len(small_wf_set.workflows)
n_small_tasks = n_small_workflows * len(folds)  # one fit per workflow per fold
requested_jobs = cpu_count * 2

print(f"Small WorkflowSet has {n_small_workflows} workflow.")
print(f"Requesting {requested_jobs} workers...\n")

# Record every warning raised during the evaluation; "always" disables the
# default once-per-location suppression.
with warnings.catch_warnings(record=True) as w:
    warnings.simplefilter("always")

    results_warn = small_wf_set.fit_resamples(
        resamples=folds,
        metrics=metrics,
        n_jobs=requested_jobs,  # More workers than tasks
        verbose=False
    )

if w:
    print("‚ö†Ô∏è  WARNING TRIGGERED:")
    # Show all captured warnings: the first one is not guaranteed to be the
    # CPU/worker warning, since any library may warn during fitting.
    for caught in w:
        print(f"    {caught.message}")
    print(f"\nüí° Recommendation: For {n_small_workflows} workflow √ó {len(folds)} folds, use n_jobs={n_small_tasks} or fewer")
else:
    print("No warning was raised for this configuration.")

## Summary and Recommendations

In [None]:
# Header banner followed by the run configuration and the timing figures,
# assembled as a list of lines and written with a single print call.
rule = "=" * 80
summary_lines = [
    rule,
    "PARALLEL WORKFLOWSET PERFORMANCE SUMMARY",
    rule,
    f"\nSystem: {cpu_count} CPU cores",
    f"WorkflowSet: {n_workflows} workflows ({len(preproc)} prep √ó {len(models)} models)",
    f"CV: {n_folds} folds",
    f"Total fits: {total_fits}",
    "\nPerformance:",
    f"  Sequential: {seq_time:.2f}s ({seq_time/total_fits:.3f}s per fit)",
    f"  Parallel ({cpu_count} cores): {par_time:.2f}s ({par_time/total_fits:.3f}s per fit)",
    f"  Speedup: {speedup:.2f}x",
    f"  Efficiency: {efficiency:.1f}%",
]
print("\n".join(summary_lines))

print(f"\nBest workflow: {best_wf_id}")
# The ranking row is read as 'rmse_mean' where the ranking table is displayed
# but as 'rmse' here; accept whichever spelling the row actually carries.
# NOTE(review): confirm which column names rank_results() actually emits.
cv_rmse = best_metrics['rmse_mean'] if 'rmse_mean' in best_metrics.index else best_metrics['rmse']
print(f"  CV RMSE: {cv_rmse:.4f}")
# Test RMSE: the 'value' of the row whose 'metric' is 'rmse' in the test table
test_rmse = test_metrics_df[test_metrics_df['metric'] == 'rmse']['value'].values[0]
print(f"  Test RMSE: {test_rmse:.4f}")

# Closing guidance, grouped into three bullet lists and written with a single
# print call. Only the two thresholds that mention the core count interpolate
# a value; every other line is fixed text.
rule = "=" * 80
recommendation_lines = [
    "\n" + rule,
    "RECOMMENDATIONS",
    rule,
    # When parallel evaluation is worthwhile
    "\n‚úÖ Use parallel WorkflowSet (n_jobs=-1) when:",
    f"   - Comparing many workflows (>{cpu_count})",
    "   - Each workflow takes >1 second per fold",
    "   - Total fits > 20-30 (workflows √ó folds)",
    "   - Using complex preprocessing (recipes with PCA, polynomial features)",
    "   - Using complex models (random forest, boosting)",
    # When sequential evaluation is the better choice
    "\n‚ö†Ô∏è  Use sequential (n_jobs=1) when:",
    f"   - Few workflows (< {cpu_count})",
    "   - Simple/fast workflows",
    "   - Total execution time < 30 seconds",
    "   - Debugging workflow issues",
    # General usage tips
    "\nüí° Tips:",
    "   - Always use verbose=True to monitor progress",
    "   - Watch for CPU warnings - they help optimize performance",
    "   - Use rank_results() to identify best workflows",
    "   - Use autoplot() for quick visual comparison",
    "   - Parallel speedup scales with number of workflows and fold complexity",
    rule,
]
print("\n".join(recommendation_lines))