# Coffea-Casa Processor-Based Workflow Test

This notebook demonstrates the `UnifiedProcessor` workflow with `coffea.processor.Runner` on Coffea-Casa, covering the skimming, analysis, histogramming, and statistics steps.

## Workflow Overview

1. Set up the Python path for the `intccms` package
2. Install dependencies and register local modules with cloudpickle (so Dask workers receive their code by value)
3. Acquire a Dask client from the Coffea-Casa environment
4. Configure analysis parameters
5. Run metadata extraction
6. Initialize `UnifiedProcessor`
7. Run the processor with `coffea.processor.Runner`
8. Save histograms
9. Run the statistical analysis (if enabled)

In [1]:
# Setup Python path to include intccms package
import sys
import time
from pathlib import Path

# Make `src/` (home of the intccms package) and the repository root
# (home of example_cms) importable from this notebook.
repo_root = Path.cwd()
src_dir = repo_root / "src"
examples_dir = repo_root

for extra_path in (src_dir, examples_dir):
    path_str = str(extra_path)
    # Prepend only once so re-running the cell does not grow sys.path.
    if path_str not in sys.path:
        sys.path.insert(0, path_str)
    print(f"‚úÖ Added {extra_path} to Python path")

‚úÖ Added /home/cms-jovyan/intccms-agc-demo-10/src to Python path
‚úÖ Added /home/cms-jovyan/intccms-agc-demo-10 to Python path


In [None]:
COFFEA_VERSION = "2025.10.3.dev17+g2cde65fb6" # 2025.10.2
COFFEA_PIP = "coffea==2025.11.0" #"git+https://github.com/scikit-hep/coffea@master"
try:
    import omegaconf
except ImportError:
    print("‚ö†Ô∏è omegaconf not found, installing...")
    ! pip install omegaconf

Coffea version:  2025.11.0
‚ö†Ô∏è coffea not found or incorrect version, installing...
‚úÖ All dependencies are installed.


In [3]:
# Imports and cloudpickle registration
import copy
import os

# Blank out AWS credentials before any I/O library below is imported.
# NOTE(review): presumably this forces anonymous/unsigned S3 access so that
# credentials inherited from the environment are not used. Confirm the intent.
os.environ['AWS_ACCESS_KEY_ID'] = ""
os.environ['AWS_SECRET_ACCESS_KEY'] = ""

from dask.distributed import Client, PipInstall
from coffea.processor import DaskExecutor, IterativeExecutor
from coffea.nanoevents import NanoAODSchema

import cloudpickle
import intccms
import example_cms

# Pickle the local packages by value: cloudpickle serializes their code together
# with each task, so Dask workers do not need intccms/example_cms installed.
# This must run after the modules are imported and before any task is submitted.
cloudpickle.register_pickle_by_value(intccms)
cloudpickle.register_pickle_by_value(example_cms)

from example_cms.configs.configuration import config as original_config
from intccms.schema import Config, load_config_with_restricted_cli
from intccms.utils.output import OutputDirectoryManager
from intccms.metadata_extractor import DatasetMetadataManager
from intccms.datasets import DatasetManager
from intccms.analysis import run_processor_workflow

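## Acquire Dask Client

Coffea-Casa offers two ways to get Dask workers: the shared scheduler at `tls://localhost:8786` (`af="condor"`) or a Dask Gateway cluster (`af="gateway"`, the default). `acquire_client` connects to one of them, installs the pinned coffea version on the workers, and closes the client when the `with` block exits.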

In [None]:
from dask.distributed import WorkerPlugin
from contextlib import contextmanager

class RedirectStderrToStdout(WorkerPlugin):
    """Dask worker plugin that makes each worker's ``sys.stderr`` an alias of ``sys.stdout``.

    ``setup`` runs on every worker the plugin is registered with; after it runs,
    anything written to stderr in that worker process lands on stdout instead.
    """

    def setup(self, worker):
        # Crude but effective: rebind the stderr stream to the stdout stream.
        stdout_stream = sys.stdout
        sys.stderr = stdout_stream

@contextmanager
def acquire_client(af="gateway", num_workers=350):
    """Context manager yielding a ``(client, cluster)`` pair from a Coffea-Casa Dask backend.

    Args:
        af: Which backend to use.
            ``"condor"`` connects to the shared scheduler at
            ``tls://localhost:8786``; ``cluster`` is then ``None``.
            ``"gateway"`` connects to the first existing Dask Gateway cluster,
            scales it to ``num_workers``, and waits for those workers.
        num_workers: Number of workers to request and wait for (gateway only).

    Yields:
        ``(client, cluster)``.

    Raises:
        ValueError: If ``af`` is not ``"condor"`` or ``"gateway"``.
        RuntimeError: If ``af="gateway"`` and no Gateway cluster exists.

    Only the client is closed on exit. A Gateway cluster is left running, so
    later calls can connect to it again.
    """
    # Validate before touching any resources. Otherwise an unknown backend would
    # leave ``client`` as None and fail later with an obscure AttributeError.
    if af not in ("condor", "gateway"):
        raise ValueError(f"Unsupported af={af!r}; expected 'condor' or 'gateway'")

    dependencies = [COFFEA_PIP]
    client = None
    cluster = None
    try:
        if af == "condor":
            client = Client("tls://localhost:8786")
            client.register_plugin(PipInstall(packages=dependencies))

        else:  # af == "gateway"
            def set_env(dask_worker):
                # The access token uploaded below lives in the worker's local
                # directory. Point BEARER_TOKEN_FILE at it and restrict its permissions.
                config_path = str(Path(dask_worker.local_directory) / 'access_token')
                os.environ["BEARER_TOKEN_FILE"] = config_path
                os.chmod(config_path, 0o600)
                os.chmod("/etc/grid-security/certificates", 0o755)

            from dask_gateway import Gateway
            gateway = Gateway()
            clusters = gateway.list_clusters()
            if not clusters:
                raise RuntimeError("No Dask Gateway cluster found; start one before calling acquire_client")
            cluster = gateway.connect(clusters[0].name)
            client = cluster.get_client()
            cluster.scale(num_workers)
            client.wait_for_workers(num_workers)
            client.upload_file("/etc/cmsaf-secrets-chown/access_token")
            client.register_worker_callbacks(setup=set_env)
            client.register_plugin(PipInstall(packages=dependencies))
            client.register_plugin(RedirectStderrToStdout(), name="redirect-stderr")

        print("‚úÖ Connected to Dask scheduler")
        print(f"üìä Dashboard: {client.dashboard_link}")

        yield client, cluster
    finally:
        if client is not None:
            client.close()
            print("‚úÖ Client closed")

## Configuration Setup

Configure analysis parameters including which processes to run and output settings.

In [None]:
# Configuration setup: work on a deep copy so the imported default config object stays untouched.
config = copy.deepcopy(original_config)

# File limit: None presumably means "no limit". Set an integer to cap the number of files for quick tests.
config["datasets"]["max_files"] = None

# Write outputs to a directory relative to the notebook's working directory
config["general"]["output_dir"] = "example_cms/outputs/"

# Workflow stage toggles
config["general"]["read_from_cache"] = False
config["general"]["run_metadata_generation"] = False
config["general"]["run_processor"] = True  # Set to False to skip processor and load saved histograms
config["general"]["save_skimmed_output"] = False  # Set to True to save filtered events to disk
config["general"]["run_analysis"] = True
config["general"]["run_histogramming"] = True
config["general"]["run_systematics"] = True
config["general"]["run_statistics"] = True

# Optional: restrict the run to a subset of processes, e.g. only "data":
# config["general"]["processes"] = ["data"]

# No command-line overrides are passed when running from the notebook.
cli_args = []
full_config = load_config_with_restricted_cli(config, cli_args)
validated_config = Config(**full_config)

# Echo the key settings so the cell output shows which stages this run enables.
print(f"✅ Configuration loaded with max_files={config['datasets']['max_files']}")
for flag in (
    "run_processor",
    "save_skimmed_output",
    "run_analysis",
    "run_histogramming",
    "run_systematics",
    "run_statistics",
):
    print(f"   - {flag}: {config['general'][flag]}")

‚úÖ Configuration loaded with max_files=None
   - run_processor: True
   - save_skimmed_output: False
   - run_analysis: True
   - run_histogramming: True
   - run_systematics: True
   - run_statistics: True


## Run Complete Workflow

Execute the full processor workflow. Each cell that needs Dask opens a client with `acquire_client`, whose `try`/`finally` closes the client even if that step fails.

In [None]:
# Set up the output manager. Every output location comes from the validated "general" config section.
general_settings = validated_config.general
output_manager = OutputDirectoryManager(
    root_output_dir=general_settings.output_dir,
    cache_dir=general_settings.cache_dir,
    metadata_dir=general_settings.metadata_dir,
    skimmed_dir=general_settings.skimmed_dir,
)
print(f"‚úÖ Output directory: {output_manager.root_output_dir}")

In [None]:
# Set up the dataset manager from the "datasets" section of the validated config
dataset_manager = DatasetManager(
    validated_config.datasets,
)

In [None]:
# Run the metadata step on Dask. Whether metadata is regenerated is controlled
# by general.run_metadata_generation.
with acquire_client(af="gateway") as (client, cluster):
    metadata_executor = DaskExecutor(client=client)
    metadata_generator = DatasetMetadataManager(
        dataset_manager=dataset_manager,
        output_manager=output_manager,
        executor=metadata_executor,
        config=validated_config,
    )
    # Optional process filter. The attribute may be absent from the general config.
    requested_processes = getattr(validated_config.general, "processes", None)
    metadata_generator.run(
        generate_metadata=validated_config.general.run_metadata_generation,
        processes_filter=requested_processes,
    )

In [None]:
# Build the metadata lookup that the processor step consumes, then take the
# work items prepared by the metadata step.
metadata_lookup = metadata_generator.build_metadata_lookup()
workitems = metadata_generator.workitems
n_workitems = len(workitems)
print(f"Generated {n_workitems} workitems...")

In [None]:
# Run processor workflow with roastcoffea metrics collection
from roastcoffea import MetricsCollector
from intccms.analysis.processor import UnifiedProcessor

with acquire_client(af="gateway") as (client, cluster):
    print("\nüöÄ Running processor workflow...")

    # MetricsCollector gets a processor instance built from the same config,
    # output manager, and metadata lookup that the workflow below uses.
    unified_processor = UnifiedProcessor(
        config=validated_config,
        output_manager=output_manager,
        metadata_lookup=metadata_lookup,
    )
    collector_ctx = MetricsCollector(
        client=client,
        processor_instance=unified_processor,
        track_workers=True,
        worker_tracking_interval=1.0,
    )

    with collector_ctx as collector:
        t0 = time.perf_counter()
        processor_executor = DaskExecutor(client=client, treereduction=8, retries=0)
        output, report = run_processor_workflow(
            config=validated_config,
            output_manager=output_manager,
            metadata_lookup=metadata_lookup,
            workitems=workitems,
            executor=processor_executor,
            schema=NanoAODSchema,
        )
        t1 = time.perf_counter()

        # Hand the collector the chunk metrics embedded in the output
        # (injected by the @track_metrics decorator), then the coffea report.
        collector.extract_metrics_from_output(output)
        collector.set_coffea_report(report)

    # Read the aggregated metrics only after the collector context has exited.
    metrics = collector.get_metrics()
    tracking_data = collector.tracking_data
    span_metrics = getattr(collector, 'span_metrics', None)

elapsed = t1 - t0
print(f"‚úÖ Processor workflow complete in {elapsed:.1f} seconds!")

# Event-count summary
if validated_config.general.run_processor:
    processed = output.get('processed_events', 0)
    print(f"üìä Total events processed: {processed:,}")
    if 'skimmed_events' in output:
        print(f"‚úÇÔ∏è  Events after skim: {output.get('skimmed_events', 0):,}")

## Performance Metrics (roastcoffea)

This section displays comprehensive performance metrics collected during the workflow execution using roastcoffea.

In [None]:
# Import roastcoffea reporting and visualization
from rich.console import Console
from roastcoffea.export.reporter import (
    format_throughput_table,
    format_event_processing_table,
    format_resources_table,
    format_timing_table,
    format_fine_metrics_table,
)

console = Console()

# (heading, formatter) pairs, printed in this order when metrics are available.
metric_sections = (
    ("\nüìà Throughput Metrics", format_throughput_table),
    ("\n‚ö° Event Processing Metrics", format_event_processing_table),
    ("\nüñ•Ô∏è  Resource Utilization", format_resources_table),
    ("\n‚è±Ô∏è  Timing Breakdown", format_timing_table),
)

if not metrics:
    print("‚ö†Ô∏è  Metrics collection was not enabled")
else:
    rule = "=" * 60
    print("\n" + rule)
    print("üìä Processing Metrics")
    print(rule)

    for heading, build_table in metric_sections:
        print(heading)
        console.print(build_table(metrics))

    # Fine-grained metrics from Dask Spans (CPU/IO breakdown). The section is
    # skipped when the formatter returns a falsy value.
    fine_table = format_fine_metrics_table(metrics)
    if fine_table:
        print("\nüî¨ Fine-Grained Metrics (CPU/IO Breakdown)")
        console.print(fine_table)

### Performance Visualizations

Timeline plots and efficiency summaries from roastcoffea metrics collection.

In [None]:
# Performance Visualizations
import matplotlib.pyplot as plt
from roastcoffea.visualization.plots import (
    plot_worker_count_timeline,
    plot_memory_utilization_timeline,
    plot_occupancy_timeline,
    plot_executing_tasks_timeline,
    plot_total_active_tasks_timeline,
    plot_throughput_timeline,
    plot_per_task_cpu_io,
    plot_per_task_bytes_read,
    plot_efficiency_summary,
    plot_resource_utilization,
)

def _show_plot(heading, label, plot_fn, *plot_args):
    """Print ``heading``, draw one roastcoffea plot, display it, then close the figure.

    Plotting is best-effort: an exception from ``plot_fn`` (or from displaying
    the figure) is reported as a warning, so one broken plot does not abort the
    remaining visualizations.

    Args:
        heading: Text printed before the plot (includes its own leading newline).
        label: Short plot name used in the warning "<label> plot unavailable: ...".
        plot_fn: roastcoffea plotting function returning a ``(fig, ax)`` pair.
        *plot_args: Positional arguments forwarded to ``plot_fn``.
    """
    print(heading)
    try:
        fig, _ax = plot_fn(*plot_args)
        plt.show()
    except Exception as exc:  # deliberate best-effort: keep rendering the other plots
        print(f"‚ö†Ô∏è  {label} plot unavailable: {exc}")
        return
    # Release the figure so many plots in one cell do not pile up in pyplot.
    plt.close(fig)


print("\n" + "=" * 60)
print("üìä Performance Visualizations")
print("=" * 60)

# Data throughput over time (with worker count overlay)
if report and "chunk_info" in report:
    _show_plot("\nüìä Data Throughput Over Time", "Throughput",
               plot_throughput_timeline, report, tracking_data)

# Worker count timeline. It is now guarded like every other plot, so a failure
# here no longer aborts the rest of the cell.
if tracking_data and "worker_counts" in tracking_data:
    _show_plot("\nüìä Worker Count Over Time", "Worker count",
               plot_worker_count_timeline, tracking_data)
else:
    print("‚ö†Ô∏è  Worker count data not available")

# Remaining timeline plots: (required tracking_data key, heading, label, plot function)
_timeline_plots = (
    ("worker_memory", "\nüìä Memory Utilization", "Memory",
     plot_memory_utilization_timeline),
    ("worker_occupancy", "\nüìä Worker Occupancy", "Occupancy",
     plot_occupancy_timeline),
    ("worker_executing", "\nüìä Executing Tasks Per Worker", "Executing tasks",
     plot_executing_tasks_timeline),
    ("worker_active_tasks", "\nüìä Total Active Tasks", "Active tasks",
     plot_total_active_tasks_timeline),
)
for _key, _heading, _label, _plot_fn in _timeline_plots:
    if tracking_data and _key in tracking_data:
        _show_plot(_heading, _label, _plot_fn, tracking_data)

# Per-task CPU vs I/O and bytes read (from Dask Spans)
if span_metrics:
    _show_plot("\nüìä Per-Task CPU vs I/O Breakdown", "CPU/IO",
               plot_per_task_cpu_io, span_metrics)
    _show_plot("\nüìä Per-Task Bytes Read", "Bytes read",
               plot_per_task_bytes_read, span_metrics)

# Efficiency summary and resource utilization
if metrics:
    _show_plot("\nüìä Efficiency Summary", "Efficiency",
               plot_efficiency_summary, metrics)
    _show_plot("\nüìä Resource Utilization", "Resource",
               plot_resource_utilization, metrics)