# Real-time Process Monitoring and Control

## Purpose

This notebook teaches you how to implement real-time process monitoring and control for additive manufacturing processes. You'll learn to set up streaming data processing, configure live quality dashboards, manage alerts and notifications, integrate SPC for real-time monitoring, and monitor system health using a unified interactive interface with real-time progress tracking and detailed logging.

## Learning Objectives

By the end of this notebook, you will:
- ✅ Set up Kafka-based streaming data processing
- ✅ Configure real-time data streaming with incremental processing
- ✅ Create live quality dashboards with WebSocket updates
- ✅ Configure alert thresholds and notification channels (Email, SMS, Dashboard)
- ✅ Integrate SPC for real-time process monitoring
- ✅ Monitor system and process health
- ✅ Execute complete end-to-end real-time monitoring workflows
- ✅ Monitor streaming and monitoring progress with real-time status and logs

## Estimated Duration

90-120 minutes

---

## Overview

Real-time process monitoring and control observes manufacturing processes while they run. The AM-QADF streaming and monitoring modules provide:

- 📡 **Data Streaming**: Kafka integration for real-time data consumption
- ⚙️ **Incremental Processing**: Process streaming data incrementally to update voxel grids
- ⏱️ **Buffer Management**: Temporal windows and buffer management for streaming data
- ⚡ **Low-Latency Processing**: Stream processing pipelines with quality checkpoints
- 🚨 **Alert System**: Multi-channel alert generation and management
- 📊 **Threshold Management**: Dynamic threshold checking (absolute, relative, rate-of-change, SPC-based)
- 💚 **Health Monitoring**: System and process health monitoring
- 📧 **Notifications**: Multi-channel notifications (Email, SMS, Dashboard via WebSocket)
- 📈 **SPC Integration**: Real-time SPC monitoring with control charts
- 🔄 **Live Dashboards**: Real-time quality dashboards with WebSocket updates
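
The threshold types named in the list above can be illustrated without the AM-QADF API. The following is a minimal sketch, not how `ThresholdManager` is implemented: the helper names (`check_absolute`, `check_relative`, `check_rate_of_change`) and their parameters are hypothetical, and the SPC-based variant is left out. The 800/1200 band matches the default temperature thresholds used later in this notebook.

```python
from typing import Optional


def check_absolute(value: float, lower: Optional[float], upper: Optional[float]) -> bool:
    """Return True when value lies outside the fixed [lower, upper] band."""
    if lower is not None and value < lower:
        return True
    if upper is not None and value > upper:
        return True
    return False


def check_relative(value: float, baseline: float, max_fraction: float) -> bool:
    """Return True when value deviates from baseline by more than max_fraction."""
    if baseline == 0:
        raise ValueError("baseline must be non-zero for a relative check")
    return abs(value - baseline) / abs(baseline) > max_fraction


def check_rate_of_change(previous: float, current: float,
                         dt_seconds: float, max_rate: float) -> bool:
    """Return True when the signal changes faster than max_rate units per second."""
    if dt_seconds <= 0:
        raise ValueError("dt_seconds must be positive")
    return abs(current - previous) / dt_seconds > max_rate


# Hypothetical readings, chosen only to exercise each check
print(check_absolute(1250.0, lower=800.0, upper=1200.0))                    # True
print(check_relative(1040.0, baseline=1000.0, max_fraction=0.05))           # False
print(check_rate_of_change(1000.0, 1030.0, dt_seconds=1.0, max_rate=20.0))  # True
```

An absolute check needs no history, a relative check needs a reference value, and a rate-of-change check needs the previous sample and its timestamp, which is one reason streaming data is usually buffered before thresholds are evaluated.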

The notebook features a unified interactive interface with:
- **Progress Tracking**: Visual progress bars showing completion percentage
- **Status Monitoring**: Real-time status updates with elapsed time
- **Detailed Logging**: Timestamped logs with success/warning/error indicators for all operations
- **Error Handling**: Comprehensive error messages and tracebacks in the logs

Use the interactive widgets below to set up and manage real-time monitoring; no coding is required. Track the progress of streaming and monitoring operations in real time using the status bar and logs section at the bottom.

## Interactive Real-time Monitoring Interface

Use the widgets below to configure real-time monitoring, set up streaming, create dashboards, configure alerts, integrate SPC, monitor health, and execute complete workflows. All real-time monitoring tasks are organized systematically in one unified interface!

In [1]:
# Setup: Import required libraries
import sys
from pathlib import Path
import warnings
warnings.filterwarnings('ignore')

# Add parent directory and src directory to path for imports
notebook_dir = Path().resolve()
project_root = notebook_dir.parent
src_dir = project_root / 'src'

# Add project root to path (for src.infrastructure imports)
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

# Add src directory to path (for am_qadf imports)
if str(src_dir) not in sys.path:
    sys.path.insert(0, str(src_dir))

# Core imports
import ipywidgets as widgets
from ipywidgets import (
    VBox, HBox, Accordion, Tab, Dropdown, RadioButtons, 
    Checkbox, Button, Output, Text, IntSlider, FloatSlider,
    Layout, Box, Label, FloatText, IntText, SelectMultiple,
    HTML as WidgetHTML, Textarea, FileUpload, Valid, Play, jslink
)
from IPython.display import display, Markdown, HTML, clear_output, Javascript
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import time
import json
import threading
from typing import Optional, Tuple, Dict, Any, List
import asyncio
from collections import deque

# Set style for plots
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")

# Load environment variables from development.env
import os
env_file = project_root / 'development.env'
if env_file.exists():
    with open(env_file, 'r') as f:
        for line in f:
            line = line.strip()
            if line and not line.startswith('#') and '=' in line:
                key, value = line.split('=', 1)
                # Strip whitespace first, then any surrounding quotes
                value = value.strip().strip('"\'')
                os.environ[key.strip()] = value
    print("✅ Environment variables loaded from development.env")

# Try to import streaming classes
STREAMING_AVAILABLE = False
streaming_client = None

try:
    from am_qadf.streaming import (
        StreamingClient, StreamingConfig, StreamingResult,
        IncrementalProcessor, BufferManager, StreamProcessor, StreamStorage
    )
    STREAMING_AVAILABLE = True
    print("✅ Streaming classes available")
except ImportError as e:
    print(f"⚠️ Streaming classes not available: {e} - using demo mode")

# Try to import monitoring classes
MONITORING_AVAILABLE = False
monitoring_client = None

try:
    from am_qadf.monitoring import (
        MonitoringClient, MonitoringConfig, AlertSystem, Alert,
        ThresholdManager, ThresholdConfig,
        HealthMonitor, HealthStatus,
        NotificationChannels, MonitoringStorage
    )
    MONITORING_AVAILABLE = True
    print("✅ Monitoring classes available")
except ImportError as e:
    print(f"⚠️ Monitoring classes not available: {e} - using demo mode")

# Try to import SPC classes
SPC_AVAILABLE = False
spc_client = None

try:
    from am_qadf.analytics.spc import SPCClient, SPCConfig, BaselineStatistics
    SPC_AVAILABLE = True
    print("✅ SPC classes available")
except ImportError as e:
    print(f"⚠️ SPC classes not available: {e}")

# Try to import quality assessment client
QUALITY_AVAILABLE = False
quality_client = None

try:
    from am_qadf.analytics.quality_assessment.client import QualityAssessmentClient
    quality_client = QualityAssessmentClient(enable_spc=SPC_AVAILABLE, mongo_client=None)
    QUALITY_AVAILABLE = True
    print("✅ Quality assessment client available")
except ImportError as e:
    print(f"⚠️ Quality assessment client not available: {e}")

# Try to import voxel grid
VOXEL_AVAILABLE = False
try:
    from am_qadf.voxelization.voxel_grid import VoxelGrid
    VOXEL_AVAILABLE = True
    print("✅ Voxel grid available")
except ImportError as e:
    print(f"⚠️ Voxel grid not available: {e}")

# MongoDB connection setup (optional, for persistence)
INFRASTRUCTURE_AVAILABLE = False
mongo_client = None

try:
    from src.infrastructure.config import MongoDBConfig
    from src.infrastructure.database import MongoDBClient
    
    config = MongoDBConfig.from_env()
    if not config.username:
        config.username = os.getenv('MONGO_ROOT_USERNAME', 'admin')
    if not config.password:
        config.password = os.getenv('MONGO_ROOT_PASSWORD', 'password')
    
    mongo_client = MongoDBClient(config=config)
    try:
        if mongo_client.is_connected():
            INFRASTRUCTURE_AVAILABLE = True
            print(f"✅ Connected to MongoDB: {config.database}")
        else:
            mongo_client = None
            print("⚠️ MongoDB connection failed - using demo mode")
    except Exception as conn_error:
        mongo_client = None
        print(f"⚠️ MongoDB connection check failed: {conn_error} - using demo mode")
except ImportError as e:
    print(f"⚠️ MongoDB infrastructure not available: {e} - using demo mode")
    mongo_client = None
except Exception as e:
    print(f"⚠️ MongoDB not available: {e} - using demo mode")
    mongo_client = None

print("✅ Setup complete!")

✅ Environment variables loaded from development.env
✅ Streaming classes available
✅ Monitoring classes available
✅ SPC classes available
✅ Quality assessment client available
✅ Voxel grid available
✅ Connected to MongoDB: am_qadf_data
✅ Setup complete!
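
The overview describes buffer management in terms of temporal windows, and the setup cell imports both `deque` and `BufferManager`. As a rough illustration of the idea, and not of `BufferManager` itself, the sketch below keeps only the points that fall inside a sliding time window, with a hard cap on size. The class name `TimeWindowBuffer` and its parameters are hypothetical, and it assumes timestamps arrive in non-decreasing order.

```python
from collections import deque
from datetime import datetime, timedelta


class TimeWindowBuffer:
    """Hypothetical buffer: keep points newer than window_seconds, capped at max_size."""

    def __init__(self, window_seconds: float, max_size: int) -> None:
        self.window = timedelta(seconds=window_seconds)
        # deque(maxlen=...) silently drops the oldest point when the cap is hit
        self._points = deque(maxlen=max_size)

    def add(self, timestamp: datetime, value: float) -> None:
        """Append a point, then evict everything older than the window."""
        self._points.append((timestamp, value))
        cutoff = timestamp - self.window
        while self._points and self._points[0][0] < cutoff:
            self._points.popleft()

    def values(self) -> list:
        """Return the buffered values, oldest first."""
        return [value for _, value in self._points]


buf = TimeWindowBuffer(window_seconds=5.0, max_size=1000)
start = datetime(2024, 1, 1, 12, 0, 0)
for i in range(10):
    buf.add(start + timedelta(seconds=i), float(i))

print(buf.values())  # [4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
```

The window bounds how stale the data can be, while `max_size` bounds memory if messages arrive faster than expected; the notebook's `buffer_size` setting plays the second role.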


In [2]:
# Create Interactive Real-time Monitoring Interface

# Global state
streaming_results = {}
monitoring_results = {}
alert_history = []
health_history = []
spc_streaming_results = {}
dashboard_data = {}
current_operation = None
operation_start_time = None
is_streaming_active = False
is_monitoring_active = False

# Initialize clients if available
if STREAMING_AVAILABLE:
    streaming_config = StreamingConfig(
        buffer_size=1000,
        processing_batch_size=100,
    )
    streaming_client = StreamingClient(config=streaming_config)
    print("✅ Streaming client initialized")
else:
    streaming_client = None
    print("⚠️ Streaming client not available - using demo mode")

if MONITORING_AVAILABLE:
    monitoring_config = MonitoringConfig(
        enable_alerts=True,
        alert_cooldown_seconds=300.0,
        enable_dashboard_notifications=True,
        enable_health_monitoring=True,
    )
    monitoring_client = MonitoringClient(config=monitoring_config)
    print("✅ Monitoring client initialized")
else:
    monitoring_client = None
    print("⚠️ Monitoring client not available - using demo mode")

if SPC_AVAILABLE:
    spc_config = SPCConfig(control_limit_sigma=3.0, subgroup_size=5)
    spc_client = SPCClient(config=spc_config, mongo_client=mongo_client if INFRASTRUCTURE_AVAILABLE else None)
    print("✅ SPC client initialized")
else:
    spc_client = None
    print("⚠️ SPC client not available")

# Initialize voxel grid if available
voxel_grid = None
if VOXEL_AVAILABLE:
    voxel_grid = VoxelGrid(
        bbox_min=(0.0, 0.0, 0.0),
        bbox_max=(100.0, 100.0, 100.0),
        resolution=1.0,
        aggregation='mean'
    )
    print("✅ Voxel grid initialized")
else:
    print("⚠️ Voxel grid not available - using demo mode")

# ============================================
# Helper Functions for Demo Data Generation
# ============================================

def generate_demo_streaming_data(n_points=100, signal_types=('temperature', 'power', 'velocity')):
    """Generate demo streaming data."""
    np.random.seed(42)
    
    data_points = []
    base_time = datetime.now() - timedelta(seconds=n_points)
    
    for i in range(n_points):
        point = {
            'timestamp': base_time + timedelta(seconds=i),
            'x': np.random.uniform(10.0, 90.0),
            'y': np.random.uniform(10.0, 90.0),
            'z': np.random.uniform(10.0, 90.0),
        }
        
        # Add signal values
        if 'temperature' in signal_types:
            point['temperature'] = 1000.0 + np.random.normal(0, 50.0)
        if 'power' in signal_types:
            point['power'] = 200.0 + np.random.normal(0, 10.0)
        if 'velocity' in signal_types:
            point['velocity'] = 100.0 + np.random.normal(0, 5.0)
        
        data_points.append(point)
    
    return data_points

def generate_demo_streaming_data_with_anomalies(n_points=100, anomaly_rate=0.1):
    """Generate demo streaming data with anomalies."""
    np.random.seed(42)
    
    data_points = []
    base_time = datetime.now() - timedelta(seconds=n_points)
    n_anomalies = int(n_points * anomaly_rate)
    anomaly_indices = np.random.choice(n_points, n_anomalies, replace=False)
    
    for i in range(n_points):
        point = {
            'timestamp': base_time + timedelta(seconds=i),
            'x': np.random.uniform(10.0, 90.0),
            'y': np.random.uniform(10.0, 90.0),
            'z': np.random.uniform(10.0, 90.0),
        }
        
        # Add signal values (with anomalies)
        if i in anomaly_indices:
            point['temperature'] = 1000.0 + np.random.uniform(150.0, 250.0)  # Hot anomaly
            point['power'] = 200.0 + np.random.uniform(30.0, 50.0)  # High power
            point['is_anomaly'] = True
        else:
            point['temperature'] = 1000.0 + np.random.normal(0, 50.0)
            point['power'] = 200.0 + np.random.normal(0, 10.0)
            point['is_anomaly'] = False
        
        point['velocity'] = 100.0 + np.random.normal(0, 5.0)
        data_points.append(point)
    
    return data_points

def generate_demo_health_metrics(component_name='system', healthy=True):
    """Generate demo health metrics."""
    np.random.seed(42)
    
    if healthy:
        metrics = {
            'cpu_percent': np.random.uniform(20.0, 60.0),
            'memory_percent': np.random.uniform(30.0, 70.0),
            'disk_percent': np.random.uniform(40.0, 80.0),
            'error_rate': np.random.uniform(0.0, 0.05),
        }
    else:
        metrics = {
            'cpu_percent': np.random.uniform(85.0, 95.0),
            'memory_percent': np.random.uniform(85.0, 95.0),
            'disk_percent': np.random.uniform(85.0, 95.0),
            'error_rate': np.random.uniform(0.2, 0.5),
        }
    
    return metrics

print("✅ Helper functions initialized")

✅ Streaming client initialized
✅ Monitoring client initialized
✅ SPC client initialized
✅ Voxel grid initialized
✅ Helper functions initialized
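
The monitoring client above is configured with `alert_cooldown_seconds=300.0`. One plausible reading of a cooldown is that repeated alerts for the same metric are suppressed until the window has elapsed. The sketch below illustrates that reading only; `CooldownGate` and `should_fire` are hypothetical names, and the actual behaviour of `AlertSystem` may differ.

```python
from typing import Dict


class CooldownGate:
    """Hypothetical helper: allow at most one alert per metric per cooldown window."""

    def __init__(self, cooldown_seconds: float) -> None:
        self.cooldown_seconds = cooldown_seconds
        self._last_fired: Dict[str, float] = {}

    def should_fire(self, metric_name: str, now: float) -> bool:
        """Return True, and record the time, if the metric is outside its cooldown."""
        last = self._last_fired.get(metric_name)
        if last is not None and now - last < self.cooldown_seconds:
            return False
        self._last_fired[metric_name] = now
        return True


gate = CooldownGate(cooldown_seconds=300.0)
print(gate.should_fire("temperature", now=0.0))    # True: first alert goes out
print(gate.should_fire("temperature", now=120.0))  # False: still inside the window
print(gate.should_fire("power", now=120.0))        # True: each metric has its own window
print(gate.should_fire("temperature", now=300.0))  # True: the window has elapsed
```

Passing `now` explicitly keeps the helper easy to test; in a live loop it would typically come from `time.monotonic()`.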


In [3]:
# ============================================
# Top Panel: Operation Type Selection and Actions
# ============================================

operation_type_label = WidgetHTML("<b>Operation Type:</b>")
operation_type = RadioButtons(
    options=[
        ('Real-time Setup', 'setup'),
        ('Stream Processing', 'streaming'),
        ('Live Dashboard', 'dashboard'),
        ('Alert Configuration', 'alerts'),
        ('Real-time SPC', 'spc'),
        ('Health Monitoring', 'health'),
        ('Complete Workflow', 'complete')
    ],
    value='setup',
    description='Type:',
    style={'description_width': 'initial'}
)

data_source_label = WidgetHTML("<b>Data Source:</b>")
data_source_mode = RadioButtons(
    options=[('Demo Data', 'demo'), ('MongoDB', 'mongodb'), ('Kafka', 'kafka')],
    value='demo',
    description='Source:',
    style={'description_width': 'initial'}
)

execute_button = Button(
    description='Execute Operation',
    button_style='success',
    icon='play',
    layout=Layout(width='200px', height='40px')
)

stop_button = Button(
    description='Stop Operation',
    button_style='danger',
    icon='stop',
    layout=Layout(width='180px', height='40px'),
    disabled=True
)

export_button = Button(
    description='Export Results',
    button_style='',
    icon='download',
    layout=Layout(width='150px', height='40px')
)

top_panel = VBox([
    HBox([operation_type_label, operation_type], layout=Layout(margin='10px')),
    HBox([data_source_label, data_source_mode, execute_button, stop_button, export_button], 
         layout=Layout(margin='10px'))
], layout=Layout(border='2px solid #0277bd', padding='10px', margin='5px'))

print("✅ Top panel created")

✅ Top panel created


In [4]:
# ============================================
# Left Panel: Configuration Accordion
# ============================================

# Kafka Configuration
kafka_bootstrap_servers = Text(
    value='localhost:9092',
    description='Bootstrap Servers:',
    layout=Layout(width='100%')
)
kafka_topic = Text(
    value='am_qadf_monitoring',
    description='Topic:',
    layout=Layout(width='100%')
)
kafka_consumer_group = Text(
    value='am_qadf_consumers',
    description='Consumer Group:',
    layout=Layout(width='100%')
)

kafka_config = VBox([
    WidgetHTML("<b>Kafka Configuration</b>"),
    kafka_bootstrap_servers,
    kafka_topic,
    kafka_consumer_group,
], layout=Layout(padding='10px'))

# Streaming Configuration
streaming_buffer_size = IntSlider(
    value=1000,
    min=100,
    max=10000,
    step=100,
    description='Buffer Size:',
    layout=Layout(width='100%')
)
streaming_batch_size = IntSlider(
    value=100,
    min=10,
    max=1000,
    step=10,
    description='Batch Size:',
    layout=Layout(width='100%')
)
enable_incremental = Checkbox(
    value=True,
    description='Enable Incremental Processing',
    layout=Layout(width='100%')
)
enable_buffer_management = Checkbox(
    value=True,
    description='Enable Buffer Management',
    layout=Layout(width='100%')
)

streaming_config = VBox([
    WidgetHTML("<b>Streaming Configuration</b>"),
    streaming_buffer_size,
    streaming_batch_size,
    enable_incremental,
    enable_buffer_management,
], layout=Layout(padding='10px'))

# Dashboard Configuration
dashboard_websocket_port = IntText(
    value=8765,
    description='WebSocket Port:',
    layout=Layout(width='100%')
)
dashboard_update_interval = FloatSlider(
    value=1.0,
    min=0.1,
    max=10.0,
    step=0.1,
    description='Update Interval (s):',
    layout=Layout(width='100%')
)
enable_live_updates = Checkbox(
    value=True,
    description='Enable Live Updates',
    layout=Layout(width='100%')
)

dashboard_config = VBox([
    WidgetHTML("<b>Dashboard Configuration</b>"),
    dashboard_websocket_port,
    dashboard_update_interval,
    enable_live_updates,
], layout=Layout(padding='10px'))

# Alert Configuration
alert_cooldown = FloatSlider(
    value=300.0,
    min=0.0,
    max=3600.0,
    step=60.0,
    description='Cooldown (s):',
    layout=Layout(width='100%')
)
enable_email = Checkbox(
    value=False,
    description='Enable Email Notifications',
    layout=Layout(width='100%')
)
enable_sms = Checkbox(
    value=False,
    description='Enable SMS Notifications',
    layout=Layout(width='100%')
)
enable_dashboard_alerts = Checkbox(
    value=True,
    description='Enable Dashboard Alerts',
    layout=Layout(width='100%')
)

alert_config = VBox([
    WidgetHTML("<b>Alert Configuration</b>"),
    alert_cooldown,
    enable_email,
    enable_sms,
    enable_dashboard_alerts,
], layout=Layout(padding='10px'))

# Threshold Configuration
threshold_metric_name = Text(
    value='temperature',
    description='Metric Name:',
    layout=Layout(width='100%')
)
threshold_type = Dropdown(
    options=['absolute', 'relative', 'rate_of_change', 'spc_limit'],
    value='absolute',
    description='Threshold Type:',
    layout=Layout(width='100%')
)
lower_threshold = FloatText(
    value=800.0,
    description='Lower Threshold:',
    layout=Layout(width='100%')
)
upper_threshold = FloatText(
    value=1200.0,
    description='Upper Threshold:',
    layout=Layout(width='100%')
)

threshold_config = VBox([
    WidgetHTML("<b>Threshold Configuration</b>"),
    threshold_metric_name,
    threshold_type,
    lower_threshold,
    upper_threshold,
], layout=Layout(padding='10px'))

# SPC Configuration
spc_sigma = FloatSlider(
    value=3.0,
    min=1.0,
    max=5.0,
    step=0.5,
    description='Control Limit (σ):',
    layout=Layout(width='100%')
)
spc_subgroup_size = IntSlider(
    value=5,
    min=2,
    max=20,
    step=1,
    description='Subgroup Size:',
    layout=Layout(width='100%')
)
enable_spc_monitoring = Checkbox(
    value=True,
    description='Enable SPC Monitoring',
    layout=Layout(width='100%')
)

spc_config = VBox([
    WidgetHTML("<b>SPC Configuration</b>"),
    spc_sigma,
    spc_subgroup_size,
    enable_spc_monitoring,
], layout=Layout(padding='10px'))

# Health Configuration
health_check_interval = FloatSlider(
    value=60.0,
    min=1.0,
    max=600.0,
    step=10.0,
    description='Check Interval (s):',
    layout=Layout(width='100%')
)
enable_system_health = Checkbox(
    value=True,
    description='Enable System Health',
    layout=Layout(width='100%')
)
enable_process_health = Checkbox(
    value=True,
    description='Enable Process Health',
    layout=Layout(width='100%')
)

health_config = VBox([
    WidgetHTML("<b>Health Configuration</b>"),
    health_check_interval,
    enable_system_health,
    enable_process_health,
], layout=Layout(padding='10px'))

# Configuration Accordion
config_accordion = Accordion(children=[
    kafka_config,
    streaming_config,
    dashboard_config,
    alert_config,
    threshold_config,
    spc_config,
    health_config,
], selected_index=None, layout=Layout(width='100%'))

config_accordion.set_title(0, 'Kafka')
config_accordion.set_title(1, 'Streaming')
config_accordion.set_title(2, 'Dashboard')
config_accordion.set_title(3, 'Alerts')
config_accordion.set_title(4, 'Thresholds')
config_accordion.set_title(5, 'SPC')
config_accordion.set_title(6, 'Health')

left_panel = VBox([
    WidgetHTML("<h3>Configuration</h3>"),
    config_accordion,
], layout=Layout(width='350px', border='2px solid #f57c00', padding='10px', margin='5px'))

print("✅ Configuration accordion created")

✅ Configuration accordion created
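
The SPC panel above exposes a control-limit multiplier (σ). To show what that multiplier does, here is a minimal sketch of an individuals chart, the chart type this notebook requests later with `chart_type='individual'`. It uses the textbook moving-range estimate of sigma (average moving range divided by d2 = 1.128) and is not taken from `SPCClient`; the function name and the sample readings are hypothetical.

```python
from statistics import mean


def individuals_chart_limits(values, sigma_multiplier=3.0):
    """Compute center line, control limits, and flagged indices for an individuals chart.

    Process sigma is estimated as the average moving range divided by
    d2 = 1.128, the standard constant for moving ranges of two points.
    """
    values = [float(v) for v in values]
    if len(values) < 2:
        raise ValueError("need at least two observations")
    center = mean(values)
    moving_ranges = [abs(b - a) for a, b in zip(values, values[1:])]
    sigma_hat = mean(moving_ranges) / 1.128
    ucl = center + sigma_multiplier * sigma_hat
    lcl = center - sigma_multiplier * sigma_hat
    flagged = [i for i, v in enumerate(values) if v > ucl or v < lcl]
    return center, lcl, ucl, flagged


# Hypothetical readings with one obvious excursion at the end
readings = [10, 12, 11, 13, 12, 11, 10, 12, 11, 30]
center, lcl, ucl, flagged = individuals_chart_limits(readings)
print(f"CL={center:.2f}  LCL={lcl:.2f}  UCL={ucl:.2f}  flagged={flagged}")
```

With the default multiplier of 3, only the final reading falls outside the limits. Lowering the multiplier tightens the limits and flags more points, at the cost of more false alarms.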


In [5]:
# ============================================
# Center Panel: Main Output (Plots, Dashboards, Logs)
# ============================================

center_output = Output(layout=Layout(
    width='100%',
    height='600px',
    border='2px solid #2e7d32',
    padding='10px',
    margin='5px',
    overflow='auto'
))

center_panel = VBox([
    WidgetHTML("<h3>Main Output</h3>"),
    center_output,
], layout=Layout(width='100%', min_height='650px'))

# ============================================
# Right Panel: Status, Metrics, Alerts, Health
# ============================================

# Status Display
status_output = Output(layout=Layout(width='100%', height='150px', border='1px solid #ccc', padding='5px', overflow='auto'))

# Metrics Display
metrics_output = Output(layout=Layout(width='100%', height='150px', border='1px solid #ccc', padding='5px', overflow='auto'))

# Alerts Display
alerts_output = Output(layout=Layout(width='100%', height='150px', border='1px solid #ccc', padding='5px', overflow='auto'))

# Health Status Display
health_output = Output(layout=Layout(width='100%', height='150px', border='1px solid #ccc', padding='5px', overflow='auto'))

right_panel = VBox([
    WidgetHTML("<h3>Status & Monitoring</h3>"),
    WidgetHTML("<b>Status:</b>"),
    status_output,
    WidgetHTML("<b>Metrics:</b>"),
    metrics_output,
    WidgetHTML("<b>Active Alerts:</b>"),
    alerts_output,
    WidgetHTML("<b>Health Status:</b>"),
    health_output,
], layout=Layout(width='350px', border='2px solid #7b1fa2', padding='10px', margin='5px', min_height='650px'))

# ============================================
# Bottom Panel: Logs, Progress, Warnings
# ============================================

# Progress Bar
progress_bar = widgets.IntProgress(
    value=0,
    min=0,
    max=100,
    description='Progress:',
    bar_style='info',
    layout=Layout(width='100%', height='30px')
)

# Status Text
status_text = WidgetHTML("<b>Status:</b> Ready")

# Logs Output
logs_output = Output(layout=Layout(
    width='100%',
    height='200px',
    border='1px solid #424242',
    padding='10px',
    margin='5px',
    overflow='auto'
))

bottom_panel = VBox([
    WidgetHTML("<h3>Logs & Progress</h3>"),
    progress_bar,
    status_text,
    logs_output,
], layout=Layout(width='100%', border='2px solid #424242', padding='10px', margin='5px', min_height='250px'))

print("✅ Output panels created")

✅ Output panels created


In [6]:
# ============================================
# Execution Handlers
# ============================================

def log_message(message, level='info'):
    """Log a message to the logs output."""
    timestamp = datetime.now().strftime('%H:%M:%S')
    icon = '✅' if level == 'info' else '⚠️' if level == 'warning' else '❌'
    with logs_output:
        print(f"[{timestamp}] {icon} {message}")

def update_status(message):
    """Update status text."""
    status_text.value = f"<b>Status:</b> {message}"

def update_progress(value, max_value=100):
    """Update progress bar."""
    progress_bar.value = int((value / max_value) * 100) if max_value > 0 else 0

def execute_operation(button):
    """Execute the selected operation."""
    global current_operation, operation_start_time, is_streaming_active, is_monitoring_active
    
    op_type = operation_type.value
    data_source = data_source_mode.value
    
    current_operation = op_type
    operation_start_time = time.time()
    execute_button.disabled = True
    stop_button.disabled = False
    
    log_message(f"Starting operation: {op_type} (Data source: {data_source})")
    update_status(f"Executing: {op_type}")
    update_progress(0)
    
    with center_output:
        clear_output(wait=True)
        print(f"Executing: {op_type}")
        print(f"Data source: {data_source}")
        print("-" * 80)
    
    try:
        if op_type == 'setup':
            execute_setup()
        elif op_type == 'streaming':
            execute_stream_processing(data_source)
        elif op_type == 'dashboard':
            execute_dashboard_setup()
        elif op_type == 'alerts':
            execute_alert_configuration()
        elif op_type == 'spc':
            execute_spc_integration(data_source)
        elif op_type == 'health':
            execute_health_monitoring()
        elif op_type == 'complete':
            execute_complete_workflow(data_source)
        
        update_progress(100)
        log_message(f"Operation '{op_type}' completed successfully")
        update_status("Ready")
        
    except Exception as e:
        log_message(f"Error during operation: {str(e)}", level='error')
        update_status(f"Error: {str(e)}")
        with center_output:
            import traceback
            traceback.print_exc()
    finally:
        execute_button.disabled = False
        stop_button.disabled = True

def stop_operation(button):
    """Stop the current operation."""
    global is_streaming_active, is_monitoring_active
    log_message("Stopping operation...")
    is_streaming_active = False
    is_monitoring_active = False
    
    if monitoring_client and MONITORING_AVAILABLE:
        try:
            monitoring_client.stop_monitoring()
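        except Exception as e:
            # Surface the failure in the logs instead of silently swallowing it
            log_message(f"Could not stop monitoring client: {e}", level='warning')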
    
    update_status("Stopped")
    execute_button.disabled = False
    stop_button.disabled = True

def execute_setup():
    """Execute real-time monitoring setup."""
    log_message("Setting up real-time monitoring...")
    update_progress(10)
    
    # Configure streaming
    if STREAMING_AVAILABLE and streaming_client:
        streaming_client.config.buffer_size = streaming_buffer_size.value
        streaming_client.config.processing_batch_size = streaming_batch_size.value
        log_message("Streaming configuration updated")
        update_progress(30)
    
    # Configure monitoring
    if MONITORING_AVAILABLE and monitoring_client:
        monitoring_client.config.alert_cooldown_seconds = alert_cooldown.value
        monitoring_client.config.enable_dashboard_notifications = enable_dashboard_alerts.value
        log_message("Monitoring configuration updated")
        update_progress(50)
    
    # Configure SPC
    if SPC_AVAILABLE and spc_client:
        spc_client.config.control_limit_sigma = spc_sigma.value
        spc_client.config.subgroup_size = spc_subgroup_size.value
        log_message("SPC configuration updated")
        update_progress(70)
    
    with center_output:
        print("✅ Real-time monitoring setup completed!")
        print("\nConfiguration Summary:")
        print(f"  - Streaming buffer size: {streaming_buffer_size.value}")
        print(f"  - Streaming batch size: {streaming_batch_size.value}")
        print(f"  - Alert cooldown: {alert_cooldown.value}s")
        print(f"  - SPC sigma: {spc_sigma.value}")
        print(f"  - SPC subgroup size: {spc_subgroup_size.value}")
    
    log_message("Setup completed successfully")
    update_progress(100)

def execute_stream_processing(data_source):
    """Execute stream processing operation."""
    global is_streaming_active
    
    log_message("Starting stream processing...")
    update_progress(10)
    is_streaming_active = True
    
    # Generate or load data
    if data_source == 'demo':
        data = generate_demo_streaming_data(n_points=100)
        log_message(f"Generated {len(data)} demo data points")
    else:
        # Loading from MongoDB or Kafka is not implemented in this notebook
        data = []
        log_message(f"Data source '{data_source}' is not wired up here - no data loaded", level='warning')
    
    update_progress(30)
    
    # Process data if streaming client available
    if STREAMING_AVAILABLE and streaming_client and data:
        batch_size = streaming_batch_size.value
        
        for i in range(0, len(data), batch_size):
            if not is_streaming_active:
                break
            
            batch = data[i:i+batch_size]
            result = streaming_client.process_stream_batch(batch)
            
            progress = min(30 + int((i / len(data)) * 60), 90)
            update_progress(progress)
            log_message(f"Processed batch {i//batch_size + 1}: {result.processed_count} messages")
        
        stats = streaming_client.get_stream_statistics()
        
        with center_output:
            print("✅ Stream processing completed!")
            print(f"\nStatistics:")
            print(f"  - Messages processed: {stats['messages_processed']}")
            print(f"  - Batches processed: {stats['batches_processed']}")
            print(f"  - Average latency: {stats['average_latency_ms']:.2f} ms")
            print(f"  - Throughput: {stats['throughput_messages_per_sec']:.2f} msg/s")
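    else:
        with center_output:
            if not data:
                # The client may be available; there is simply nothing to process
                print("⚠️ No data loaded from the selected source - nothing to process")
            else:
                print("⚠️ Streaming client not available - using demo mode")
                print(f"Would process {len(data)} data points")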
    
    log_message("Stream processing completed")
    update_progress(100)

def execute_dashboard_setup():
    """Execute dashboard setup."""
    log_message("Setting up live dashboard...")
    update_progress(20)
    
    with center_output:
        print("✅ Live dashboard setup!")
        print(f"\nDashboard Configuration:")
        print(f"  - WebSocket port: {dashboard_websocket_port.value}")
        print(f"  - Update interval: {dashboard_update_interval.value}s")
        print(f"  - Live updates: {enable_live_updates.value}")
        print("\n📊 Dashboard would be available at:")
        print(f"   ws://localhost:{dashboard_websocket_port.value}")
        print("\nNote: WebSocket server implementation would be started here in production")
    
    log_message("Dashboard setup completed")
    update_progress(100)

def execute_alert_configuration():
    """Execute alert configuration."""
    log_message("Configuring alerts...")
    update_progress(20)
    
    if MONITORING_AVAILABLE and monitoring_client:
        # Register metric with threshold.
        # Note: a value of 0 in either box is treated as "no limit on that side".
        threshold_config = ThresholdConfig(
            metric_name=threshold_metric_name.value,
            threshold_type=threshold_type.value,
            lower_threshold=lower_threshold.value if lower_threshold.value != 0 else None,
            upper_threshold=upper_threshold.value if upper_threshold.value != 0 else None,
        )
        monitoring_client.register_metric(threshold_metric_name.value, threshold_config)
        log_message(f"Registered metric: {threshold_metric_name.value}")
        update_progress(60)
        
        # Configure notification channels
        monitoring_client.config.enable_email_notifications = enable_email.value
        monitoring_client.config.enable_sms_notifications = enable_sms.value
        monitoring_client.config.enable_dashboard_notifications = enable_dashboard_alerts.value
        log_message("Notification channels configured")
        update_progress(80)
    
    with center_output:
        print("✅ Alert configuration completed!")
        print(f"\nAlert Configuration:")
        print(f"  - Metric: {threshold_metric_name.value}")
        print(f"  - Threshold type: {threshold_type.value}")
        print(f"  - Lower threshold: {lower_threshold.value}")
        print(f"  - Upper threshold: {upper_threshold.value}")
        print(f"  - Cooldown: {alert_cooldown.value}s")
        print(f"\nNotification Channels:")
        print(f"  - Email: {enable_email.value}")
        print(f"  - SMS: {enable_sms.value}")
        print(f"  - Dashboard: {enable_dashboard_alerts.value}")
    
    log_message("Alert configuration completed")
    update_progress(100)

def execute_spc_integration(data_source):
    """Execute SPC integration."""
    log_message("Integrating SPC for real-time monitoring...")
    update_progress(20)
    
    if SPC_AVAILABLE and spc_client:
        # Generate demo data for baseline
        historical_data = np.random.normal(100.0, 10.0, 100)
        baseline = spc_client.establish_baseline(historical_data)
        log_message("Baseline established")
        update_progress(50)
        
        # Process streaming data with SPC
        if data_source == 'demo':
            streaming_data = generate_demo_streaming_data(n_points=50)
            # Offset by 900 so demo temperatures (default 1000.0) line up with the synthetic baseline mean of 100
            values = [d.get('temperature', 1000.0) - 900.0 for d in streaming_data]
            values_array = np.array(values)
            
            chart_result = spc_client.create_control_chart(values_array, chart_type='individual')
            log_message("SPC control chart generated")
            update_progress(80)
            
            with center_output:
                print("✅ SPC integration completed!")
                print("\nSPC Results:")
                if hasattr(chart_result, 'center_line'):
                    print(f"  - Center line: {chart_result.center_line:.2f}")
                if hasattr(chart_result, 'upper_control_limit'):
                    print(f"  - UCL: {chart_result.upper_control_limit:.2f}")
                if hasattr(chart_result, 'lower_control_limit'):
                    print(f"  - LCL: {chart_result.lower_control_limit:.2f}")
                if hasattr(chart_result, 'out_of_control_points'):
                    print(f"  - Out-of-control points: {len(chart_result.out_of_control_points)}")
        else:
            with center_output:
                print("⚠️ SPC integration (demo mode)")
    else:
        with center_output:
            print("⚠️ SPC client not available - using demo mode")
    
    log_message("SPC integration completed")
    update_progress(100)

def execute_health_monitoring():
    """Execute health monitoring."""
    log_message("Starting health monitoring...")
    update_progress(20)
    
    if MONITORING_AVAILABLE and monitoring_client:
        if enable_system_health.value:
            monitoring_client.start_monitoring()
            log_message("Health monitoring started")
            try:
                time.sleep(2)  # Wait for initial checks

                health_statuses = monitoring_client.get_health_status()
                update_progress(60)

                with center_output:
                    print("✅ Health monitoring completed!")
                    print("\nHealth Status:")
                    for name, status in health_statuses.items():
                        print(f"  - {name}: {status.status} (score: {status.health_score:.2f})")
                        if hasattr(status, 'issues') and status.issues:
                            print(f"    Issues: {', '.join(status.issues)}")
            finally:
                # Always stop the monitor, even if reading the status raises
                monitoring_client.stop_monitoring()
                log_message("Health monitoring stopped")
        else:
            with center_output:
                print("⚠️ System health monitoring disabled")
    else:
        with center_output:
            print("⚠️ Monitoring client not available - using demo mode")
            print("\nDemo Health Status:")
            print("  - System: healthy (score: 0.85)")
            print("  - Process: healthy (score: 0.90)")
    
    log_message("Health monitoring completed")
    update_progress(100)

def execute_complete_workflow(data_source):
    """Execute complete end-to-end workflow."""
    log_message("Starting complete workflow...")
    update_progress(5)
    
    # Setup
    execute_setup()
    update_progress(20)
    
    # Stream processing
    execute_stream_processing(data_source)
    update_progress(50)
    
    # Alert configuration
    execute_alert_configuration()
    update_progress(70)
    
    # SPC integration
    if enable_spc_monitoring.value:
        execute_spc_integration(data_source)
    update_progress(85)
    
    # Health monitoring
    if enable_system_health.value:
        execute_health_monitoring()
    update_progress(95)
    
    with center_output:
        print("\n✅ Complete workflow executed successfully!")
        print("\nSummary:")
        print("  ✅ Setup completed")
        print("  ✅ Stream processing completed")
        print("  ✅ Alert configuration completed")
        if enable_spc_monitoring.value:
            print("  ✅ SPC integration completed")
        if enable_system_health.value:
            print("  ✅ Health monitoring completed")
    
    log_message("Complete workflow executed successfully")
    update_progress(100)

# Attach button handlers
execute_button.on_click(execute_operation)
stop_button.on_click(stop_operation)

print("✅ Execution handlers created")

✅ Execution handlers created
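
The `execute_spc_integration` handler above requests an `'individual'` chart from `spc_client`. As a rough illustration of what such a chart typically computes, the sketch below derives individuals-chart limits from a baseline using the standard moving-range estimate (sigma ≈ average moving range / 1.128) and flags points outside those limits. The helpers `limits_from_baseline` and `find_out_of_control` are hypothetical names introduced here for illustration; they are not part of AM-QADF, and the library's own calculation may differ.

```python
import random
from statistics import fmean

D2_FOR_N2 = 1.128  # standard constant for moving ranges of two consecutive points


def limits_from_baseline(baseline, sigma_multiplier=3.0):
    """Return (center_line, lcl, ucl) for an individuals chart (hypothetical helper)."""
    x = [float(v) for v in baseline]
    if len(x) < 2:
        raise ValueError("baseline needs at least two points")
    center = fmean(x)
    # Estimate sigma from the average moving range between consecutive points
    moving_ranges = [abs(b - a) for a, b in zip(x, x[1:])]
    sigma_hat = fmean(moving_ranges) / D2_FOR_N2
    return center, center - sigma_multiplier * sigma_hat, center + sigma_multiplier * sigma_hat


def find_out_of_control(values, lcl, ucl):
    """Return indices of points that fall outside the control limits."""
    return [i for i, v in enumerate(values) if v < lcl or v > ucl]


# Synthetic data only, mirroring the demo baseline (mean 100, std 10) used in the notebook
rng = random.Random(0)
baseline = [rng.gauss(100.0, 10.0) for _ in range(100)]
center, lcl, ucl = limits_from_baseline(baseline)
flagged = find_out_of_control([98.0, 250.0, 101.0], lcl, ucl)
print(f"CL={center:.2f} LCL={lcl:.2f} UCL={ucl:.2f} flagged={flagged}")
```

Separating the baseline step from the check step matches the handler's flow, where limits come from historical data and streaming points are judged against them.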


In [7]:
# ============================================
# Assemble and Display Complete Interface
# ============================================

# Create main layout
main_layout = VBox([
    top_panel,
    HBox([
        left_panel,
        VBox([center_panel], layout=Layout(width='100%', flex='1 1 auto')),
        right_panel,
    ], layout=Layout(width='100%', height='650px')),
    bottom_panel,
], layout=Layout(width='100%', padding='10px'))

# Initialize logs
with logs_output:
    print("=" * 80)
    print("Real-time Process Monitoring and Control Interface")
    print("=" * 80)
    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ✅ Interface initialized")
    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ℹ️ Select an operation type and configure settings, then click 'Execute Operation'")
    print("=" * 80)

# Initialize status panels
with status_output:
    print("Status: Ready")
    print("Operation: None")
    print("Elapsed: 0s")

with metrics_output:
    print("No metrics yet")
    print("Execute an operation to see metrics")

with alerts_output:
    print("No active alerts")
    print("Configure alerts to see activity")

with health_output:
    print("Health status: Unknown")
    print("Execute health monitoring to see status")

# Display the interface
display(main_layout)

print("✅ Complete interface assembled and displayed!")
print("\n" + "=" * 80)
print("Real-time Monitoring Interface Ready!")
print("=" * 80)
print("\nFeatures:")
print("  ✅ Top Panel: Operation type selection (Setup, Streaming, Dashboard, Alerts, SPC, Health, Complete Workflow)")
print("  ✅ Left Panel: Configuration accordion (Kafka, Streaming, Dashboard, Alerts, Thresholds, SPC, Health)")
print("  ✅ Center Panel: Main output (plots, logs, dashboards)")
print("  ✅ Right Panel: Status, metrics, alerts, health status")
print("  ✅ Bottom Panel: Logs, progress, warnings")
print("\n" + "=" * 80)


✅ Complete interface assembled and displayed!

Real-time Monitoring Interface Ready!

Features:
  ✅ Top Panel: Operation type selection (Setup, Streaming, Dashboard, Alerts, SPC, Health, Complete Workflow)
  ✅ Left Panel: Configuration accordion (Kafka, Streaming, Dashboard, Alerts, Thresholds, SPC, Health)
  ✅ Center Panel: Main output (plots, logs, dashboards)
  ✅ Right Panel: Status, metrics, alerts, health status



## Usage Instructions

1. **Select Operation Type**: Choose the operation you want to perform from the top panel
   - **Real-time Setup**: Configure streaming, monitoring, and SPC settings
   - **Stream Processing**: Process streaming data with incremental updates
   - **Live Dashboard**: Set up real-time quality dashboards
   - **Alert Configuration**: Configure alert thresholds and notification channels
   - **Real-time SPC**: Integrate SPC for real-time process monitoring
   - **Health Monitoring**: Monitor system and process health
   - **Complete Workflow**: Execute end-to-end real-time monitoring workflow

2. **Configure Settings**: Use the configuration accordion in the left panel to adjust settings:
   - **Kafka**: Configure Kafka connection settings
   - **Streaming**: Set buffer size, batch size, and processing options
   - **Dashboard**: Configure WebSocket port and update intervals
   - **Alerts**: Set alert cooldown and notification channels
   - **Thresholds**: Configure metric thresholds for alerts
   - **SPC**: Set SPC control limits and subgroup sizes
   - **Health**: Configure health check intervals and components

3. **Execute Operation**: Click "Execute Operation" to run the selected operation
   - Monitor progress in the progress bar
   - View detailed logs in the logs panel
   - See results in the center output panel
   - Check status, metrics, alerts, and health in the right panel

4. **Stop Operation**: Click "Stop Operation" to halt the current operation

5. **Export Results**: Click "Export Results" to save results to file
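
To make the Alerts and Thresholds settings from step 2 more concrete, the sketch below shows one way an absolute threshold check combined with an alert cooldown could behave: a reading outside the bounds raises an alert, and further violations are suppressed until the cooldown has elapsed. `ThresholdRule` and `CooldownAlerter` are hypothetical classes written for this illustration; they are not the AM-QADF `ThresholdConfig` or monitoring client. Unlike the notebook handler, which treats a widget value of 0 as "no bound", the sketch uses `None` so that 0 remains a valid limit.

```python
import time
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class ThresholdRule:
    """Hypothetical absolute threshold; a bound of None means 'not checked'."""
    metric_name: str
    lower: Optional[float] = None
    upper: Optional[float] = None

    def violated(self, value: float) -> bool:
        below = self.lower is not None and value < self.lower
        above = self.upper is not None and value > self.upper
        return below or above


@dataclass
class CooldownAlerter:
    """Hypothetical alerter that suppresses repeat alerts during a cooldown window."""
    rule: ThresholdRule
    cooldown_seconds: float
    clock: Callable[[], float] = time.monotonic  # injectable so the cooldown is testable
    _last_alert: Optional[float] = field(default=None, init=False)

    def check(self, value: float) -> bool:
        """Return True if an alert should fire for this reading."""
        if not self.rule.violated(value):
            return False
        now = self.clock()
        if self._last_alert is not None and now - self._last_alert < self.cooldown_seconds:
            return False  # still cooling down: suppress the duplicate alert
        self._last_alert = now
        return True


# Usage with a fake clock so the cooldown can be shown without waiting
fake_now = [0.0]
alerter = CooldownAlerter(
    ThresholdRule("temperature", upper=1200.0),
    cooldown_seconds=60.0,
    clock=lambda: fake_now[0],
)
for t, value in [(0.0, 1250.0), (10.0, 1260.0), (75.0, 1270.0)]:
    fake_now[0] = t
    print(f"t={t:>5.1f}s value={value} alert={alerter.check(value)}")
```

In this run the readings at t=0 and t=75 raise alerts, while the reading at t=10 is suppressed because it falls inside the 60-second cooldown.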

## Tips

- Start with "Real-time Setup" to configure all components
- Use "Complete Workflow" to run all operations in sequence
- Monitor logs for detailed execution information
- Check status panels for real-time updates
- Adjust configuration settings as needed for your use case