# Pipeline Monitor Dashboard Tutorial

This notebook demonstrates how to use the real-time monitoring dashboard to track pipeline performance.

In [1]:
from pipeline_monitor import monitor, MonitoringBlock, setup_logging
from pipeline_monitor.dashboard import start_dashboard
import threading
import time

# Set up JSON-formatted logging to pipeline.log
setup_logging(log_file='pipeline.log', json_format=True)

Server initialized for threading.


## Starting the Dashboard

Start the monitoring dashboard in a background daemon thread so that it does not block the notebook.

In [2]:
# Start dashboard in background thread
dashboard_thread = threading.Thread(
    target=start_dashboard,
    kwargs={'port': 5000},
    daemon=True
)
dashboard_thread.start()

print("Dashboard started at http://localhost:5000")
time.sleep(2)  # Wait for dashboard to start

{"timestamp": "2025-02-12 21:50:12,591", "level": "INFO", "message": "Starting dashboard on 0.0.0.0:5000", "logger_name": "pipeline_monitor.dashboard.app"}
Dashboard started at http://localhost:5000


{"timestamp": "2025-02-12 21:50:12,595", "level": "ERROR", "message": "Failed to start dashboard: The Werkzeug web server is not designed to run in production. Pass allow_unsafe_werkzeug=True to the run() method to disable this error.", "logger_name": "pipeline_monitor.dashboard.app"}


Exception in thread Thread-5 (start_dashboard):
Traceback (most recent call last):
  File "/nix/store/clx0mcir7qw8zk36zbr4jra789g3knf6-python3-3.11.10/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
    self.run()
  File "/home/runner/workspace/.pythonlibs/lib/python3.11/site-packages/ipykernel/ipkernel.py", line 766, in run_closure
    _threading_Thread_run(self)
  File "/nix/store/clx0mcir7qw8zk36zbr4jra789g3knf6-python3-3.11.10/lib/python3.11/threading.py", line 982, in run
    self._target(*self._args, **self._kwargs)
  File "/home/runner/workspace/pipeline_monitor/dashboard/app.py", line 71, in start_dashboard
    socketio.run(
  File "/home/runner/workspace/.pythonlibs/lib/python3.11/site-packages/flask_socketio/__init__.py", line 650, in run
    raise RuntimeError('The Werkzeug web server is not '
RuntimeError: The Werkzeug web server is not designed to run in production. Pass allow_unsafe_werkzeug=True to the run() method to disable this error.
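
The traceback above shows that the dashboard thread exited with a `RuntimeError`, even though the cell had already printed "Dashboard started". The error message itself names a workaround: passing `allow_unsafe_werkzeug=True` to `socketio.run()`. Whether `start_dashboard` exposes that option is not shown here. Independently of the fix, the fixed `time.sleep(2)` can be replaced by a check that the port is really accepting connections. The following is a minimal sketch using only the standard library; `wait_for_port` is a hypothetical helper, not part of `pipeline_monitor`.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True once a TCP connection to (host, port) succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # create_connection raises OSError while nothing is listening
            with socket.create_connection((host, port), timeout=0.5):
                return True
        except OSError:
            time.sleep(0.1)  # Brief pause before the next attempt
    return False
```

In the cell above, the success message could then be printed only when `wait_for_port("localhost", 5000)` returns `True`, and a warning printed otherwise.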


## Monitoring Pipeline Performance

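Create a sample pipeline: decorate a batch function with `@monitor` and wrap each call in a `MonitoringBlock`, so that both the function and the enclosing block are timed.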
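
To give a rough idea of what a timing decorator with an alert threshold might do, here is a small stand-alone sketch. It is not `pipeline_monitor`'s implementation: the name `timed`, the `metrics` list, and the reading of `alert_threshold` as seconds are all assumptions made for illustration. The field names are borrowed from the log output later in this notebook.

```python
import functools
import time


def timed(alert_threshold: float):
    """Hypothetical stand-in for a monitoring decorator (threshold assumed to be seconds)."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            success = False
            try:
                result = func(*args, **kwargs)
                success = True
                return result
            finally:
                elapsed = time.perf_counter() - start
                # Record one metric per call, whether or not the call raised
                wrapper.metrics.append({
                    "function_name": func.__name__,
                    "execution_time": elapsed,
                    "success": success,
                    "alert": elapsed > alert_threshold,
                })
        wrapper.metrics = []  # Collected metrics, kept on the wrapper for inspection
        return wrapper
    return decorator
```

Recording the metric in a `finally` clause means a failed call is still measured, with `success` left as `False`.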

In [3]:
@monitor(alert_threshold=1.0)
def process_batch(size):
    """Process a batch of data with simulated work."""
    time.sleep(0.1)  # Simulate processing
    return [i * i for i in range(size)]

# Process multiple batches with monitoring
for i in range(5):
    with MonitoringBlock(f"batch_{i}"):
        data = process_batch(10)  # Call the monitored function
        print(f"Processed batch {i}: {len(data)} items")
        time.sleep(1)  # Pause so each update is visible in the dashboard

{"timestamp": "2025-02-12 21:50:14,601", "level": "INFO", "message": "Starting monitoring block: batch_0", "logger_name": "pipeline_monitor.context"}


{"timestamp": "2025-02-12 21:50:14,703", "level": "INFO", "message": "{\"function_name\": \"process_batch\", \"execution_time\": 0.10019397735595703, \"memory_usage_mb\": 0.0, \"success\": true, \"timestamp\": \"2025-02-12 21:50:14\"}", "logger_name": "pipeline_monitor.decorators"}


emitting event "metric_update" to all [/]


{"timestamp": "2025-02-12 21:50:14,704", "level": "INFO", "message": "emitting event \"metric_update\" to all [/]", "logger_name": "socketio.server"}


emitting event "metric_update" to all [/]


{"timestamp": "2025-02-12 21:50:14,705", "level": "INFO", "message": "emitting event \"metric_update\" to all [/]", "logger_name": "socketio.server"}


Processed batch 0: 10 items


{"timestamp": "2025-02-12 21:50:15,707", "level": "INFO", "message": "{\"block_name\": \"batch_0\", \"execution_time\": 1.1053102016448975, \"memory_usage_mb\": 0.125, \"success\": true, \"timestamp\": \"2025-02-12 21:50:15\"}", "logger_name": "pipeline_monitor.context"}


emitting event "metric_update" to all [/]


{"timestamp": "2025-02-12 21:50:15,708", "level": "INFO", "message": "emitting event \"metric_update\" to all [/]", "logger_name": "socketio.server"}


emitting event "metric_update" to all [/]


{"timestamp": "2025-02-12 21:50:15,709", "level": "INFO", "message": "emitting event \"metric_update\" to all [/]", "logger_name": "socketio.server"}


{"timestamp": "2025-02-12 21:50:15,711", "level": "INFO", "message": "Starting monitoring block: batch_1", "logger_name": "pipeline_monitor.context"}


{"timestamp": "2025-02-12 21:50:15,812", "level": "INFO", "message": "{\"function_name\": \"process_batch\", \"execution_time\": 0.10016798973083496, \"memory_usage_mb\": 0.0, \"success\": true, \"timestamp\": \"2025-02-12 21:50:15\"}", "logger_name": "pipeline_monitor.decorators"}


emitting event "metric_update" to all [/]


{"timestamp": "2025-02-12 21:50:15,814", "level": "INFO", "message": "emitting event \"metric_update\" to all [/]", "logger_name": "socketio.server"}


emitting event "metric_update" to all [/]


{"timestamp": "2025-02-12 21:50:15,815", "level": "INFO", "message": "emitting event \"metric_update\" to all [/]", "logger_name": "socketio.server"}


Processed batch 1: 10 items


{"timestamp": "2025-02-12 21:50:16,816", "level": "INFO", "message": "{\"block_name\": \"batch_1\", \"execution_time\": 1.1052234172821045, \"memory_usage_mb\": 0.0, \"success\": true, \"timestamp\": \"2025-02-12 21:50:16\"}", "logger_name": "pipeline_monitor.context"}


emitting event "metric_update" to all [/]


{"timestamp": "2025-02-12 21:50:16,817", "level": "INFO", "message": "emitting event \"metric_update\" to all [/]", "logger_name": "socketio.server"}


emitting event "metric_update" to all [/]


{"timestamp": "2025-02-12 21:50:16,818", "level": "INFO", "message": "emitting event \"metric_update\" to all [/]", "logger_name": "socketio.server"}


{"timestamp": "2025-02-12 21:50:16,820", "level": "INFO", "message": "Starting monitoring block: batch_2", "logger_name": "pipeline_monitor.context"}


{"timestamp": "2025-02-12 21:50:16,921", "level": "INFO", "message": "{\"function_name\": \"process_batch\", \"execution_time\": 0.1002192497253418, \"memory_usage_mb\": 0.0, \"success\": true, \"timestamp\": \"2025-02-12 21:50:16\"}", "logger_name": "pipeline_monitor.decorators"}


emitting event "metric_update" to all [/]


{"timestamp": "2025-02-12 21:50:16,923", "level": "INFO", "message": "emitting event \"metric_update\" to all [/]", "logger_name": "socketio.server"}


emitting event "metric_update" to all [/]


{"timestamp": "2025-02-12 21:50:16,926", "level": "INFO", "message": "emitting event \"metric_update\" to all [/]", "logger_name": "socketio.server"}


Processed batch 2: 10 items


{"timestamp": "2025-02-12 21:50:17,928", "level": "INFO", "message": "{\"block_name\": \"batch_2\", \"execution_time\": 1.1077470779418945, \"memory_usage_mb\": 0.125, \"success\": true, \"timestamp\": \"2025-02-12 21:50:17\"}", "logger_name": "pipeline_monitor.context"}


emitting event "metric_update" to all [/]


{"timestamp": "2025-02-12 21:50:17,929", "level": "INFO", "message": "emitting event \"metric_update\" to all [/]", "logger_name": "socketio.server"}


emitting event "metric_update" to all [/]


{"timestamp": "2025-02-12 21:50:17,930", "level": "INFO", "message": "emitting event \"metric_update\" to all [/]", "logger_name": "socketio.server"}


{"timestamp": "2025-02-12 21:50:17,931", "level": "INFO", "message": "Starting monitoring block: batch_3", "logger_name": "pipeline_monitor.context"}


{"timestamp": "2025-02-12 21:50:18,032", "level": "INFO", "message": "{\"function_name\": \"process_batch\", \"execution_time\": 0.10019397735595703, \"memory_usage_mb\": 0.0, \"success\": true, \"timestamp\": \"2025-02-12 21:50:18\"}", "logger_name": "pipeline_monitor.decorators"}


emitting event "metric_update" to all [/]


{"timestamp": "2025-02-12 21:50:18,033", "level": "INFO", "message": "emitting event \"metric_update\" to all [/]", "logger_name": "socketio.server"}


emitting event "metric_update" to all [/]


{"timestamp": "2025-02-12 21:50:18,035", "level": "INFO", "message": "emitting event \"metric_update\" to all [/]", "logger_name": "socketio.server"}


Processed batch 3: 10 items


{"timestamp": "2025-02-12 21:50:19,036", "level": "INFO", "message": "{\"block_name\": \"batch_3\", \"execution_time\": 1.105074405670166, \"memory_usage_mb\": 0.0, \"success\": true, \"timestamp\": \"2025-02-12 21:50:19\"}", "logger_name": "pipeline_monitor.context"}


emitting event "metric_update" to all [/]


{"timestamp": "2025-02-12 21:50:19,038", "level": "INFO", "message": "emitting event \"metric_update\" to all [/]", "logger_name": "socketio.server"}


emitting event "metric_update" to all [/]


{"timestamp": "2025-02-12 21:50:19,041", "level": "INFO", "message": "emitting event \"metric_update\" to all [/]", "logger_name": "socketio.server"}


{"timestamp": "2025-02-12 21:50:19,042", "level": "INFO", "message": "Starting monitoring block: batch_4", "logger_name": "pipeline_monitor.context"}


{"timestamp": "2025-02-12 21:50:19,143", "level": "INFO", "message": "{\"function_name\": \"process_batch\", \"execution_time\": 0.10024809837341309, \"memory_usage_mb\": 0.0, \"success\": true, \"timestamp\": \"2025-02-12 21:50:19\"}", "logger_name": "pipeline_monitor.decorators"}


emitting event "metric_update" to all [/]


{"timestamp": "2025-02-12 21:50:19,145", "level": "INFO", "message": "emitting event \"metric_update\" to all [/]", "logger_name": "socketio.server"}


emitting event "metric_update" to all [/]


{"timestamp": "2025-02-12 21:50:19,146", "level": "INFO", "message": "emitting event \"metric_update\" to all [/]", "logger_name": "socketio.server"}


Processed batch 4: 10 items


{"timestamp": "2025-02-12 21:50:20,148", "level": "INFO", "message": "{\"block_name\": \"batch_4\", \"execution_time\": 1.1057491302490234, \"memory_usage_mb\": 0.0, \"success\": true, \"timestamp\": \"2025-02-12 21:50:20\"}", "logger_name": "pipeline_monitor.context"}


emitting event "metric_update" to all [/]


{"timestamp": "2025-02-12 21:50:20,149", "level": "INFO", "message": "emitting event \"metric_update\" to all [/]", "logger_name": "socketio.server"}


emitting event "metric_update" to all [/]


{"timestamp": "2025-02-12 21:50:20,150", "level": "INFO", "message": "emitting event \"metric_update\" to all [/]", "logger_name": "socketio.server"}
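
The metric records in the output above are JSON objects whose `message` field is itself a JSON string, so reading a value such as `execution_time` takes two decoding steps. Below is a minimal sketch, assuming `pipeline.log` holds one such JSON object per line. `parse_metric` and `load_metrics` are hypothetical helpers, not part of `pipeline_monitor`.

```python
import json


def parse_metric(line: str):
    """Return the nested metric dict from a JSON log line, or None if there is none."""
    try:
        record = json.loads(line)
    except json.JSONDecodeError:
        return None  # Plain-text lines such as 'emitting event ...'
    if not isinstance(record, dict):
        return None
    try:
        # The metric is stored as a JSON string inside the "message" field
        metric = json.loads(record.get("message", ""))
    except (json.JSONDecodeError, TypeError):
        return None  # Ordinary messages such as 'Starting monitoring block: ...'
    return metric if isinstance(metric, dict) else None


def load_metrics(path: str):
    """Collect every metric record found in a log file (one JSON object per line assumed)."""
    with open(path, encoding="utf-8") as fh:
        return [m for m in map(parse_metric, fh) if m is not None]
```

For example, `[m["execution_time"] for m in load_metrics("pipeline.log") if "block_name" in m]` would list the per-block timings.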


## Viewing Metrics

Once the dashboard is running, open http://localhost:5000 to see:
1. Active pipeline counts
2. Memory usage metrics
3. Performance alerts

The metrics update in real time as the pipeline processes data.