In [1]:
# Install required packages
print("üîÑ Installing dashboard dependencies...")
print("This may take a minute or two...")
%pip install ipywidgets requests pandas plotly python-dotenv tabulate --quiet

print("‚úÖ Dependencies installed successfully!")
print("üéâ Ready for the next step!")

üîÑ Installing dashboard dependencies...
This may take a minute or two...

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.
‚úÖ Dependencies installed successfully!
üéâ Ready for the next step!
Note: you may need to restart the kernel to use updated packages.
‚úÖ Dependencies installed successfully!
üéâ Ready for the next step!


# üöÄ N8N Admin Dashboard

Welcome to your comprehensive N8N administration and monitoring dashboard! This interactive notebook provides powerful tools to manage, monitor, and troubleshoot your N8N automation workflows.

## üéØ What You Can Do

- **System Monitoring**: Check N8N health, database connectivity, and system status
- **Configuration Management**: Easily update connection settings and API keys
- **Workflow Analytics**: View execution metrics, success rates, and performance data
- **Error Diagnostics**: Monitor failed executions and troubleshoot issues
- **Webhook Testing**: Test your webhook endpoints directly from the dashboard
- **Workflow Control**: Activate/deactivate workflows and manage your automation pipeline

## üìã Quick Start Guide

1. **Configure Connection** (Required First Step)
   - Enter your N8N URL and API key in the configuration section below
   - Click "Update & Verify" to test the connection

2. **Explore Features**
   - Run cells in order to access different dashboard sections
   - Each section is self-contained with clear instructions

3. **Monitor & Manage**
   - Use the status checks to ensure everything is working
   - Review metrics and logs to optimize your workflows

## üîß Requirements

- ‚úÖ N8N instance running (yours is at: `https://n8n-clean-deploy.fly.dev`)
- ‚úÖ API key created in N8N settings
- ‚úÖ Internet connection for API calls

---
**Ready to get started? Run the next cell to install required packages!**

In [2]:
# Import required libraries
print("üîÑ Loading dashboard components...")
import os
import requests
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import ipywidgets as widgets
from IPython.display import display, clear_output, HTML
from datetime import datetime
from dotenv import load_dotenv
import json
import time

print("‚úÖ Dashboard components loaded!")

# --- Configuration ---
# Load environment variables (from a local .env file)
load_dotenv()

# Force reload environment variables to avoid caching issues
os.environ.pop('N8N_API_KEY', None)
os.environ.pop('N8N_URL', None)
load_dotenv(override=True)

# N8N API Configuration
N8N_URL = os.getenv('N8N_URL', 'https://n8n-dispatch.fly.dev')  # Default to deployed instance
N8N_API_KEY = os.getenv('N8N_API_KEY', '')  # Will be set through the Configuration Manager

# Define headers for API calls
headers = {'X-N8N-API-KEY': N8N_API_KEY}

print("‚úÖ Configuration initialized!")
print(f"üîó Default N8N URL: {N8N_URL}")
print("üîë API Key: " + ("‚úÖ Set" if N8N_API_KEY else "‚ùå Not set (configure below)"))

üîÑ Loading dashboard components...


‚úÖ Dashboard components loaded!
‚úÖ Configuration initialized!
üîó Default N8N URL: https://n8n-dispatch.fly.dev
üîë API Key: ‚úÖ Set


In [3]:
# Progress tracking for user experience
progress_sections = {
    'configuration': False,
    'system_check': False,
    'concurrency': False,
    'metrics': False,
    'webhooks': False,
    'diagnostics': False
}

## üß≠ Quick Navigation

Jump to any section of the dashboard:

| Section | Description | Status |
|---------|-------------|--------|
| [‚öôÔ∏è Configuration](#configuration) | Set up N8N connection | ‚úÖ Complete |
| [üìä System Health](#system-health) | Check N8N & database status | ‚úÖ Ready |
| [‚ö° Concurrency Watchdog](#concurrency-watchdog) | Monitor workflow concurrency | ‚úÖ Ready |
| [üóÑÔ∏è Database Management](#database-management) | PostgreSQL migration tools | ‚úÖ Ready |
| [üìà Analytics](#analytics) | View workflow metrics | ‚úÖ Ready |
| [üîó Webhook Testing](#webhooks) | Test webhook endpoints | ‚úÖ Ready |
| [üö® Error Monitoring](#diagnostics) | Monitor failed executions | ‚úÖ Ready |
| [üéâ Summary](#summary) | Review & next steps | ‚úÖ Ready |

---
**üí° Pro Tip**: Complete sections in order for the best experience!

In [4]:
# Test API connectivity and configuration
print("üîç Testing API connectivity...")
try:
    api_response = requests.get(f"{N8N_URL}/api/v1/workflows?limit=1", headers=headers, timeout=10)
    if api_response.status_code == 200:
        workflow_count = len(api_response.json().get('data', []))
        print("‚úÖ API connection successful!")
        print(f"üìä Found {workflow_count} workflows in your instance")
        print("\nüéâ Configuration complete! You can now use all dashboard features.")
        print("üí° Try running the System Health Check next!")

        # Mark configuration as complete
        progress_sections['configuration'] = True
    else:
        print(f"‚ùå API connection failed with status code: {api_response.status_code}")
        print("üîß Please check your N8N_URL and N8N_API_KEY configuration")
except Exception as e:
    print(f"‚ùå Error testing API connection: {str(e)}")
    print("üîß Please verify your N8N instance is running and accessible")

üîç Testing API connectivity...


‚ùå Error testing API connection: HTTPSConnectionPool(host='n8n-dispatch.fly.dev', port=443): Max retries exceeded with url: /api/v1/workflows?limit=1 (Caused by SSLError(SSLEOFError(8, '[SSL: UNEXPECTED_EOF_WHILE_READING] EOF occurred in violation of protocol (_ssl.c:1000)')))
üîß Please verify your N8N instance is running and accessible


# ‚öôÔ∏è 1. Configuration Setup {#configuration}

**Status**: üîÑ Not Configured Yet

This is the **most important step** - you must configure your connection to N8N before using any other features.

## üöÄ Quick Actions

**After configuration, try these one-click actions:**

- **üîç Run System Check**: Verify everything is working
- **üìä View Dashboard**: See all metrics at once
- **üîÑ Refresh Data**: Update all information

---
**Ready? Configure your connection below and start exploring!**

In [5]:
def check_n8n_status():
    """Check if n8n is accessible and running"""
    try:
        response = requests.get(f"{N8N_URL}/healthz", timeout=5)
        if response.status_code == 200:
            return True, "N8N is running normally"
        return False, f"N8N returned status code: {response.status_code}"
    except requests.exceptions.RequestException as e:
        return False, f"Error connecting to N8N: {str(e)}"

def check_database():
    """Check database connection via API key access"""
    if not N8N_API_KEY:
        return False, "API Key is not set."

    try:
        # Try workflows endpoint first
        response = requests.get(f"{N8N_URL}/api/v1/workflows?limit=1",
                                headers={'X-N8N-API-KEY': N8N_API_KEY}, timeout=5)

        if response.status_code == 200:
            return True, "Database connection is working (Workflows accessible)"
        elif response.status_code == 401:
            # API key exists but may have limited permissions
            # Try executions endpoint as alternative
            exec_response = requests.get(f"{N8N_URL}/api/v1/executions?limit=1",
                                       headers={'X-N8N-API-KEY': N8N_API_KEY}, timeout=5)
            if exec_response.status_code == 200:
                return True, "API key valid (Executions accessible, limited workflow permissions)"
            else:
                return False, f"API Key Unauthorized (401) - Check permissions in n8n settings"
        elif response.status_code == 403:
            return False, "API Key Forbidden (403) - Insufficient permissions"
        else:
            return False, f"API check failed: {response.status_code}"
    except requests.exceptions.RequestException as e:
        return False, f"Error checking database: {str(e)}"

def display_system_status():
    """Display system status in a formatted way"""
    print("üîç Running system health checks...")
    n8n_status, n8n_msg = check_n8n_status()
    db_status, db_msg = check_database()

    status_df = pd.DataFrame({
        'Component': ['N8N Service', 'Persistence (DB)'],
        'Status': ['üü¢ Online' if n8n_status else 'üî¥ Offline',
                   'üü¢ Connected' if db_status else 'üî¥ Disconnected'],
        'Message': [n8n_msg, db_msg]
    })

    display(status_df)

    # Update progress
    global progress_sections
    progress_sections['system_check'] = n8n_status and db_status

    if n8n_status and db_status:
        print("\n‚úÖ System health check completed successfully!")
        print("üí° Your N8N instance and database are both healthy.")
    else:
        print("\n‚ö†Ô∏è Some systems need attention - check the status table above.")

    return status_df

# System status functions are now ready
# Run display_system_status() manually after configuring your API key above
print("‚úÖ System health check functions loaded!")
print("üí° Run display_system_status() after setting your API key in the configuration section above.")

‚úÖ System health check functions loaded!
üí° Run display_system_status() after setting your API key in the configuration section above.


# üìä 2. System Health Check {#system-health}

**Status**: üîÑ Run this after configuration

In [6]:
# Direct API key configuration
import requests
import pandas as pd
from IPython.display import display

# Progress tracking for user experience
progress_sections = {
    'configuration': False,
    'system_check': False,
    'concurrency': False,
    'metrics': False,
    'webhooks': False,
    'diagnostics': False
}

N8N_URL = "https://n8n-dispatch.fly.dev"
N8N_API_KEY = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiI3Y2M1OGNlMC00NmMxLTQ2OTQtYjI3OC1jMjI5MTdjYWViY2UiLCJpc3MiOiJuOG4iLCJhdWQiOiJwdWJsaWMtYXBpIiwiaWF0IjoxNzYwNTUwMTAxfQ.NiADusdZUVI_1fp2LcEUAt8JAA2bUTpqgcxPOx0oPNg"
headers = {'X-N8N-API-KEY': N8N_API_KEY}

print("‚úÖ API key configured successfully!")
print(f"üîó N8N URL: {N8N_URL}")
print("üîë API Key: ‚úÖ Set and ready")
print("\nüîç Running system health check...")

# Define health check functions locally
def check_n8n_status():
    """Check if n8n is accessible and running"""
    try:
        response = requests.get(f"{N8N_URL}/healthz", timeout=5)
        if response.status_code == 200:
            return True, "N8N is running normally"
        return False, f"N8N returned status code: {response.status_code}"
    except requests.exceptions.RequestException as e:
        return False, f"Error connecting to N8N: {str(e)}"

def check_database():
    """Check database connection via API key access"""
    if not N8N_API_KEY:
        return False, "API Key is not set."

    try:
        # Try workflows endpoint first
        response = requests.get(f"{N8N_URL}/api/v1/workflows?limit=1",
                                headers={'X-N8N-API-KEY': N8N_API_KEY}, timeout=5)

        if response.status_code == 200:
            return True, "Database connection is working (Workflows accessible)"
        elif response.status_code == 401:
            # API key exists but may have limited permissions
            # Try executions endpoint as alternative
            exec_response = requests.get(f"{N8N_URL}/api/v1/executions?limit=1",
                                       headers={'X-N8N-API-KEY': N8N_API_KEY}, timeout=5)
            if exec_response.status_code == 200:
                return True, "API key valid (Executions accessible, limited workflow permissions)"
            else:
                # Try a simpler endpoint that might work with basic permissions
                health_response = requests.get(f"{N8N_URL}/healthz", timeout=5)
                if health_response.status_code == 200:
                    return True, "Basic connectivity confirmed (API key has limited permissions)"
                else:
                    return False, f"API Key Unauthorized (401) - Check permissions in n8n settings"
        elif response.status_code == 403:
            return False, "API Key Forbidden (403) - Insufficient permissions"
        else:
            return False, f"API check failed: {response.status_code}"
    except requests.exceptions.RequestException as e:
        return False, f"Error checking database: {str(e)}"

# Run system status check inline
n8n_status, n8n_msg = check_n8n_status()
db_status, db_msg = check_database()

status_df = pd.DataFrame({
    'Component': ['N8N Service', 'Persistence (DB)'],
    'Status': ['üü¢ Online' if n8n_status else 'üî¥ Offline',
               'üü¢ Connected' if db_status else 'üî¥ Disconnected'],
    'Message': [n8n_msg, db_msg]
})

display(status_df)

# Update progress
progress_sections['system_check'] = n8n_status and db_status

if n8n_status and db_status:
    print("\n‚úÖ System health check completed successfully!")
    print("üí° Your N8N instance and database are both healthy.")
else:
    print("\n‚ö†Ô∏è Some systems need attention - check the status table above.")

‚úÖ API key configured successfully!
üîó N8N URL: https://n8n-dispatch.fly.dev
üîë API Key: ‚úÖ Set and ready

üîç Running system health check...


Unnamed: 0,Component,Status,Message
0,N8N Service,üî¥ Offline,Error connecting to N8N: HTTPSConnectionPool(h...
1,Persistence (DB),üî¥ Disconnected,Error checking database: HTTPSConnectionPool(h...



‚ö†Ô∏è Some systems need attention - check the status table above.


# üìà 3. Workflow Analytics & Metrics {#analytics}

**Status**: ? Run after configuration

In [7]:
def get_execution_metrics():
    """Fetch and analyze workflow execution metrics"""
    if not N8N_API_KEY:
        print("‚ùå API Key required for metrics.")
        return pd.DataFrame()
        
    try:
        response = requests.get(
            f"{N8N_URL}/api/v1/executions",
            headers={'X-N8N-API-KEY': N8N_API_KEY},
            params={'limit': 100} 
        )
        if response.status_code != 200:
            print(f"‚ùå Failed to fetch executions: Status {response.status_code}")
            return pd.DataFrame()
        
        executions = response.json().get('data', [])
        execution_data = []
        
        for exec in executions:
            execution_data.append({
                'Workflow': exec.get('workflowName', 'N/A'),
                'Status': exec.get('status'),
                'Duration_s': exec.get('duration', 0) / 1000,  # Convert ms to seconds
                'Timestamp': datetime.fromisoformat(exec.get('startedAt').replace('Z', '+00:00')).strftime('%Y-%m-%d %H:%M')
            })
        
        return pd.DataFrame(execution_data)
    except Exception as e:
        print(f"‚ùå Error fetching metrics: {str(e)}")
        return pd.DataFrame()

def plot_execution_metrics():
    """Create visualizations for execution metrics"""
    df = get_execution_metrics()
    if df.empty:
        return
    
    # Calculate Success/Failure Counts
    status_counts = df.groupby('Status').size().reset_index(name='Count')
    fig1 = px.pie(
        status_counts,
        values='Count',
        names='Status',
        title='Last 100 Executions Status Breakdown',
        color_discrete_map={'success':'green', 'failed':'red', 'running':'blue'}
    )
    
    # Average duration by workflow
    fig2 = px.bar(
        df.groupby('Workflow')['Duration_s'].mean().reset_index(),
        x='Workflow',
        y='Duration_s',
        title='Average Execution Duration (seconds)',
        labels={'Duration_s': 'Average Duration (s)'}
    )
    
    fig1.show()
    fig2.show()

# Display metrics
plot_execution_metrics()

‚ùå Error fetching metrics: HTTPSConnectionPool(host='n8n-dispatch.fly.dev', port=443): Max retries exceeded with url: /api/v1/executions?limit=100 (Caused by SSLError(SSLEOFError(8, '[SSL: UNEXPECTED_EOF_WHILE_READING] EOF occurred in violation of protocol (_ssl.c:1000)')))


# üîó 4. Webhook Testing Tools {#webhooks}

**Status**: ? Run after configuration

In [8]:
def test_webhook(webhook_url, payload=None):
    """Test a webhook endpoint"""
    try:
        payload = payload or {"test": True, "timestamp": datetime.now().isoformat()}
        response = requests.post(webhook_url, json=payload, timeout=10)
        return {
            'Status Code': response.status_code,
            'Response': response.text,
            'Headers': dict(response.headers)
        }
    except Exception as e:
        return {'Error': str(e)}

webhook_url_input = widgets.Text(
    description='Webhook URL:',
    placeholder='e.g., https://n8n.app/webhook-test/...',
    style={'description_width': 'initial'},
    layout={'width': '80%'}
)

payload_input = widgets.Textarea(
    description='Payload (JSON):',
    style={'description_width': 'initial'},
    layout={'width': '80%', 'height': '150px'},
    value=json.dumps({"data": "test_payload"}, indent=2) 
)

test_output = widgets.Output()

def test_webhook_button_clicked(button):
    with test_output:
        clear_output(wait=True)
        try:
            payload = json.loads(payload_input.value)
        except:
            print("‚ùå Invalid JSON payload")
            return
        
        print("üîÑ Testing webhook...")
        result = test_webhook(webhook_url_input.value, payload)
        print("\nResults:")
        print(json.dumps(result, indent=2))

test_button = widgets.Button(
    description='Test Webhook',
    button_style='primary'
)
test_button.on_click(test_webhook_button_clicked)

# Display testing interface
display(webhook_url_input, payload_input, test_button, test_output)

Text(value='', description='Webhook URL:', layout=Layout(width='80%'), placeholder='e.g., https://n8n.app/webh‚Ä¶

Textarea(value='{\n  "data": "test_payload"\n}', description='Payload (JSON):', layout=Layout(height='150px', ‚Ä¶

Button(button_style='primary', description='Test Webhook', style=ButtonStyle())

Output()

# üö® 5. Error Monitoring & Diagnostics {#diagnostics}

**Status**:  Run after configuration

In [9]:
import requests
import pandas as pd
from dotenv import load_dotenv
import os
import ipywidgets as widgets
from IPython.display import display, clear_output
from datetime import datetime

load_dotenv()

N8N_URL = os.getenv('N8N_URL')
N8N_API_KEY = os.getenv('N8N_API_KEY')

def get_failed_executions():
    """Fetch recent executions and filter for 'failed' status."""
    if not N8N_API_KEY:
        print("‚ùå API Key required for diagnostics.")
        return pd.DataFrame(columns=['ID', 'Workflow', 'Status', 'Started At'])
        
    try:
        # Fetch up to 100 recent executions
        response = requests.get(
            f"{N8N_URL}/api/v1/executions",
            headers={'X-N8N-API-KEY': N8N_API_KEY},
            params={'limit': 100, 'sort': 'id', 'sortOrder': 'DESC'}
        )
        response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
        
        executions = response.json().get('data', [])
        
        failed_data = []
        for exec in executions:
            if exec.get('status') == 'failed':
                failed_data.append({
                    'ID': exec.get('id'),
                    'Workflow': exec.get('workflowName', 'N/A'),
                    'Status': 'üî¥ FAILED',
                    'Started At': datetime.fromisoformat(exec.get('startedAt').replace('Z', '+00:00')).strftime('%Y-%m-%d %H:%M:%S'),
                    'Error Message': exec.get('error', {}).get('message', 'No error message available')
                })
        
        df = pd.DataFrame(failed_data)
        if df.empty:
            print("‚úÖ No failed executions found in the last 100 runs.")
            return pd.DataFrame()
        
        return df[['ID', 'Workflow', 'Status', 'Started At', 'Error Message']]
        
    except requests.exceptions.RequestException as e:
        print(f"‚ùå Connection Error: {str(e)}")
    except Exception as e:
        print(f"‚ùå Diagnostic Error: {str(e)}")
        
    return pd.DataFrame()

# Create an output widget for failed executions
failed_executions_output = widgets.Output()

def refresh_failed_executions(button=None):
    with failed_executions_output:
        clear_output(wait=True)
        print("üîÑ Fetching failed executions...")
        df = get_failed_executions()
        if not df.empty:
            display(df)

refresh_button = widgets.Button(
    description='Refresh Failed Executions',
    button_style='warning'
)
refresh_button.on_click(refresh_failed_executions)

# Display the refresh button and output area
display(refresh_button)
display(failed_executions_output)

# Initial load of failed executions
refresh_failed_executions()



Output()

# N8N Admin Dashboard

This dashboard provides comprehensive monitoring and management capabilities for your n8n instance. It includes:

1. System Status Monitoring
2. Configuration Management
3. API Testing
4. User Management
5. Workflow Status

## Setup and Requirements

First, let's install the required packages:

In [10]:
%pip install requests pandas plotly flask-login sqlalchemy python-dotenv


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [11]:
# Import required libraries
import os
import requests
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from datetime import datetime
from dotenv import load_dotenv
import json

# Load environment variables
load_dotenv()

# N8N API Configuration
N8N_URL = os.getenv('N8N_URL', 'https://dispatch-pipeline.fly.dev')
N8N_API_KEY = os.getenv('N8N_API_KEY', '')  # Will be filled through the dashboard

## 1. System Status Check

This section helps you monitor the health of your n8n instance and check various system components.

In [1]:
import requests
import pandas as pd
from dotenv import load_dotenv
import os

load_dotenv()

N8N_URL = os.getenv('N8N_URL')
N8N_API_KEY = os.getenv('N8N_API_KEY')

def check_n8n_status():
    """Check if n8n is accessible and running"""
    try:
        response = requests.get(f"{N8N_URL}/healthz")
        if response.status_code == 200:
            return True, "N8N is running normally"
        return False, f"N8N returned status code: {response.status_code}"
    except requests.exceptions.RequestException as e:
        return False, f"Error connecting to N8N: {str(e)}"

def check_database():
    """Check database connection"""
    try:
        response = requests.get(f"{N8N_URL}/api/v1/executions",
                              headers={'X-N8N-API-KEY': N8N_API_KEY})
        if response.status_code == 200:
            return True, "Database connection is working"
        return False, "Database connection failed"
    except requests.exceptions.RequestException as e:
        return False, f"Error checking database: {str(e)}"

def check_concurrency():
    """Check current workflow concurrency and warn if approaching SQLite limits"""
    try:
        response = requests.get(f"{N8N_URL}/api/v1/executions?status=running&limit=100",
                              headers={'X-N8N-API-KEY': N8N_API_KEY})
        if response.status_code == 200:
            executions = response.json()
            running_count = len(executions)

            # SQLite concurrency warning thresholds
            if running_count >= 8:
                status = "üö® CRITICAL"
                message = f"{running_count} concurrent workflows - APPROACHING SQLITE LIMIT!"
            elif running_count >= 5:
                status = "‚ö†Ô∏è WARNING"
                message = f"{running_count} concurrent workflows - Monitor closely"
            else:
                status = "‚úÖ NORMAL"
                message = f"{running_count} concurrent workflows - Safe range"

            return running_count, status, message
        return 0, "‚ùå ERROR", "Failed to fetch execution data"
    except requests.exceptions.RequestException as e:
        return 0, "‚ùå ERROR", f"Connection failed: {str(e)}"

def display_system_status():
    """Display system status in a formatted way"""
    n8n_status, n8n_msg = check_n8n_status()
    db_status, db_msg = check_database()
    concurrency_count, concurrency_status, concurrency_msg = check_concurrency()

    status_df = pd.DataFrame({
        'Component': ['N8N Service', 'Database', 'Concurrency'],
        'Status': ['üü¢ Online' if n8n_status else 'üî¥ Offline',
                  'üü¢ Connected' if db_status else 'üî¥ Disconnected',
                  concurrency_status],
        'Message': [n8n_msg, db_msg, f"{concurrency_count} running workflows - {concurrency_msg}"]
    })

    return status_df

# Display current system status
display_system_status()

ModuleNotFoundError: No module named 'dotenv'

## 2. Configuration Management

This section allows you to view and update n8n configuration settings.

## 3. Concurrency Watchdog ‚ö°

**Critical Monitoring:** Tracks concurrent workflow executions to trigger PostgreSQL migration planning.

**Operational Mandate:**
- **üü° YELLOW (5-7 concurrent):** Migration plan activated. Schedule downtime within 7 days.
- **üî¥ RED (8+ concurrent):** CRITICAL - Execute immediate migration to `n8n-db-pristine` cluster.

**SQLite Limits:** Maximum 10-15 concurrent writers before SQLITE_BUSY errors occur.

In [13]:
def display_concurrency_monitor():
    """Display detailed concurrency monitoring with warnings"""
    try:
        response = requests.get(f"{N8N_URL}/api/v1/executions?status=running&limit=100",
                              headers={'X-N8N-API-KEY': N8N_API_KEY})

        if response.status_code == 200:
            executions = response.json()
            running_count = len(executions)

            # Create monitoring display with migration triggers
            if running_count >= 8:
                alert_color = "#dc3545"  # Red
                alert_icon = "üö®"
                alert_title = "CRITICAL: EXECUTE POSTGRESQL MIGRATION"
                action_required = "üî¥ RED CONDITION: Halt new workflows and begin immediate migration to n8n-db-pristine cluster"
            elif running_count >= 5:
                alert_color = "#ffc107"  # Yellow
                alert_icon = "‚ö†Ô∏è"
                alert_title = "WARNING: SCHEDULE POSTGRESQL MIGRATION"
                action_required = "üü° YELLOW CONDITION: Migration plan activated. Schedule downtime within 7 days."
            else:
                alert_color = "#28a745"  # Green
                alert_icon = "‚úÖ"
                alert_title = "NORMAL: Safe Concurrency"
                action_required = "Monitor concurrency levels"

            html_output = f"""
            <div style="background: {alert_color}; color: white; padding: 15px; border-radius: 8px; margin: 10px 0;">
                <h4 style="margin-top: 0;">{alert_icon} {alert_title}</h4>
                <p><strong>Current Concurrent Workflows:</strong> {running_count}</p>
                <p><strong>SQLite Write Limit:</strong> 10-15 concurrent operations</p>
                <p><strong>Action Required:</strong> {action_required}</p>
                {'<p style="color: yellow; font-weight: bold;">üö® CRITICAL: EXECUTE IMMEDIATE MIGRATION TO POSTGRESQL</p>' if running_count >= 8 else '<p style="color: black; font-weight: bold;">‚ö†Ô∏è SCHEDULE MIGRATION WITHIN 7 DAYS</p>' if running_count >= 5 else '<p style="color: white;">‚úÖ System operating within safe limits</p>'}
            </div>
            """

            display(HTML(html_output))

            # Show running workflows if any
            if running_count > 0:
                workflow_names = [exec.get('workflowName', 'Unknown') for exec in executions[:10]]  # Show first 10
                print(f"\nüìã Currently Running Workflows ({running_count} total):")
                for i, name in enumerate(workflow_names, 1):
                    print(f"  {i}. {name}")
                if running_count > 10:
                    print(f"  ... and {running_count - 10} more")
            else:
                print("\nüìã No workflows currently running")

        else:
            display(HTML('<div style="background: #dc3545; color: white; padding: 15px; border-radius: 8px; margin: 10px 0;"><h4>‚ùå Monitoring Error</h4><p>Failed to fetch concurrency data</p></div>'))

    except Exception as e:
        display(HTML(f'<div style="background: #dc3545; color: white; padding: 15px; border-radius: 8px; margin: 10px 0;"><h4>‚ùå Connection Error</h4><p>{str(e)}</p></div>'))

# Display concurrency monitor
display_concurrency_monitor()

## 4. System Metrics

View system performance metrics and execution statistics.

In [14]:
def get_execution_metrics():
    """Fetch and analyze workflow execution metrics"""
    try:
        response = requests.get(
            f"{N8N_URL}/api/v1/executions",
            headers={'X-N8N-API-KEY': N8N_API_KEY},
            params={'limit': 100}  # Last 100 executions
        )
        if response.status_code != 200:
            return pd.DataFrame()
        
        executions = response.json()
        execution_data = []
        
        for exec in executions:
            execution_data.append({
                'Workflow': exec.get('workflowName'),
                'Status': exec.get('status'),
                'Duration': exec.get('duration', 0),
                'Timestamp': datetime.fromisoformat(exec.get('startedAt')).strftime('%Y-%m-%d %H:%M')
            })
        
        return pd.DataFrame(execution_data)
    except:
        return pd.DataFrame()

def plot_execution_metrics():
    """Create visualizations for execution metrics"""
    df = get_execution_metrics()
    if df.empty:
        print("No execution data available")
        return
    
    # Success rate by workflow
    fig1 = px.pie(
        df.groupby('Workflow')['Status'].apply(lambda x: (x == 'success').mean()),
        values=0,
        names=df['Workflow'].unique(),
        title='Workflow Success Rate'
    )
    
    # Average duration by workflow
    fig2 = px.bar(
        df.groupby('Workflow')['Duration'].mean().reset_index(),
        x='Workflow',
        y='Duration',
        title='Average Execution Duration by Workflow'
    )
    
    fig1.show()
    fig2.show()

# Display metrics
plot_execution_metrics()

No execution data available


## 5. Testing Tools

Test your n8n workflows and endpoints.

In [15]:
def test_webhook(webhook_url, payload=None):
    """Test a webhook endpoint"""
    try:
        payload = payload or {"test": True, "timestamp": datetime.now().isoformat()}
        response = requests.post(webhook_url, json=payload)
        return {
            'Status Code': response.status_code,
            'Response': response.text,
            'Headers': dict(response.headers)
        }
    except Exception as e:
        return {'Error': str(e)}

# Create webhook testing interface
webhook_url_input = widgets.Text(
    description='Webhook URL:',
    style={'description_width': 'initial'},
    layout={'width': '50%'}
)

payload_input = widgets.Textarea(
    description='Payload (JSON):',
    style={'description_width': 'initial'},
    layout={'width': '50%'},
    value='{}'
)

def test_webhook_button_clicked(button):
    try:
        payload = json.loads(payload_input.value)
    except:
        print("‚ùå Invalid JSON payload")
        return
    
    clear_output(wait=True)
    print("üîÑ Testing webhook...")
    result = test_webhook(webhook_url_input.value, payload)
    print("\nResults:")
    print(json.dumps(result, indent=2))

test_button = widgets.Button(
    description='Test Webhook',
    button_style='primary'
)
test_button.on_click(test_webhook_button_clicked)

# Display testing interface
display(webhook_url_input)
display(payload_input)
display(test_button)

Text(value='', description='Webhook URL:', layout=Layout(width='50%'), style=TextStyle(description_width='init‚Ä¶

Textarea(value='{}', description='Payload (JSON):', layout=Layout(width='50%'), style=TextStyle(description_wi‚Ä¶

Button(button_style='primary', description='Test Webhook', style=ButtonStyle())

In [16]:
def get_workflows():
    """Fetch all workflows from n8n"""
    try:
        response = requests.get(
            f"{N8N_URL}/api/v1/workflows",
            headers={'X-N8N-API-KEY': N8N_API_KEY}
        )
        if response.status_code == 200:
            return response.json().get('data', [])
        return []
    except:
        return []

def display_workflows():
    """Display workflows in a formatted table"""
    workflows = get_workflows()
    if not workflows:
        return pd.DataFrame(columns=['ID', 'Name', 'Status', 'Last Updated'])
    
    workflow_data = []
    for wf in workflows:
        workflow_data.append({
            'ID': wf.get('id'),
            'Name': wf.get('name'),
            'Status': 'üü¢ Active' if wf.get('active') else '‚ö´ Inactive',
            'Last Updated': datetime.fromisoformat(wf.get('updatedAt')).strftime('%Y-%m-%d %H:%M')
        })
    
    return pd.DataFrame(workflow_data)

# Display workflows
display_workflows()

Unnamed: 0,ID,Name,Status,Last Updated


## 3. Workflow Management

Monitor and manage your n8n workflows.

In [1]:
import ipywidgets as widgets
from IPython.display import display, clear_output

# Create input widgets for configuration
n8n_url_input = widgets.Text(
    value=N8N_URL,
    description='N8N URL:',
    style={'description_width': 'initial'}
)

api_key_input = widgets.Password(
    value=N8N_API_KEY,
    description='API Key:',
    style={'description_width': 'initial'}
)

def update_config(button):
    global N8N_URL, N8N_API_KEY
    N8N_URL = n8n_url_input.value
    N8N_API_KEY = api_key_input.value
    clear_output()
    print("‚úÖ Configuration updated successfully!")
    display_system_status()

update_button = widgets.Button(
    description='Update Configuration',
    button_style='primary'
)
update_button.on_click(update_config)

# Display configuration widgets
display(n8n_url_input)
display(api_key_input)
display(update_button)

ModuleNotFoundError: No module named 'ipywidgets'

## 2. System Health Check

Monitor system resources and container health:

In [18]:
# Create authentication widgets
n8n_url = widgets.Text(
    value='https://dispatch-pipeline.fly.dev',
    description='N8N URL:',
    style={'description_width': 'initial'},
    layout={'width': '50%'}
)

username = widgets.Text(
    value='admin',
    description='Username:',
    style={'description_width': 'initial'},
    layout={'width': '50%'}
)

password = widgets.Password(
    description='Password:',
    style={'description_width': 'initial'},
    layout={'width': '50%'}
)

test_auth_button = widgets.Button(
    description='Test Connection',
    button_style='info'
)

auth_output = widgets.Output()

def test_auth(b):
    with auth_output:
        clear_output()
        try:
            response = requests.get(
                f"{n8n_url.value}/healthz",
                auth=(username.value, password.value),
                timeout=5
            )
            if response.status_code == 200:
                print("‚úÖ Connection successful!")
            else:
                print(f"‚ùå Connection failed! Status code: {response.status_code}")
        except Exception as e:
            print(f"‚ùå Error connecting to n8n: {str(e)}")

test_auth_button.on_click(test_auth)

# Display widgets
display(n8n_url, username, password, test_auth_button, auth_output)

Text(value='https://dispatch-pipeline.fly.dev', description='N8N URL:', layout=Layout(width='50%'), style=Text‚Ä¶

Text(value='admin', description='Username:', layout=Layout(width='50%'), style=TextStyle(description_width='in‚Ä¶

Password(description='Password:', layout=Layout(width='50%'), style=TextStyle(description_width='initial'))

Button(button_style='info', description='Test Connection', style=ButtonStyle())

Output()

## 1. Authentication and Connection Settings

Enter your n8n credentials and connection details:

In [19]:
# Import required libraries
import ipywidgets as widgets
from IPython.display import display, clear_output
import requests
import json
import psutil
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from datetime import datetime
import os
import re

In [20]:
# Install required packages
%pip install ipywidgets requests psutil pandas plotly pymongo


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


# N8N Admin Dashboard

This notebook provides a comprehensive admin dashboard for monitoring and managing your n8n deployment. Use the sections below to:
- Check system health
- Validate credentials and connections
- Monitor API endpoints
- Analyze logs
- Track user activity

First, let's install the required packages:

## Workflow Control Panel

This section provides direct control over N8N workflows:
- Lists all active workflows
- Allows selection of specific workflows
- Enables immediate activation/deactivation of workflows
- Shows current status of selected workflow

‚ö†Ô∏è **Important**: Use these controls with caution as they directly affect production workflows.

In [21]:
def get_all_workflows():
    """Fetch all workflows from N8N API"""
    try:
        response = requests.get(f"{n8n_url}/api/v1/workflows", headers=headers)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Error fetching workflows: {str(e)}")
        return []

def update_workflow_status(workflow_id, active):
    """Update the active status of a workflow"""
    try:
        endpoint = f"{n8n_url}/api/v1/workflows/{workflow_id}/activate" if active else f"{n8n_url}/api/v1/workflows/{workflow_id}/deactivate"
        response = requests.post(endpoint, headers=headers)
        response.raise_for_status()
        return True
    except requests.exceptions.RequestException as e:
        print(f"Error updating workflow status: {str(e)}")
        return False

# Create widgets for workflow control
workflow_dropdown = widgets.Dropdown(
    description='Workflow:',
    style={'description_width': 'initial'},
    layout=widgets.Layout(width='50%')
)

status_label = widgets.HTML(
    value='<span style="color: gray;">No workflow selected</span>'
)

toggle_button = widgets.ToggleButton(
    value=False,
    description='Inactive',
    button_style='danger',
    tooltip='Toggle workflow active status',
    icon='power-off',
    layout=widgets.Layout(width='150px')
)

def update_workflow_list(*_):
    """Update the workflow dropdown with current workflows"""
    workflows = get_all_workflows()
    workflow_dropdown.options = [(f"{w['name']} (ID: {w['id']})", w['id']) for w in workflows]
    workflow_dropdown.value = None
    toggle_button.value = False
    status_label.value = '<span style="color: gray;">No workflow selected</span>'

def on_workflow_select(change):
    """Handle workflow selection"""
    if change['new'] is None:
        toggle_button.value = False
        status_label.value = '<span style="color: gray;">No workflow selected</span>'
        return
    
    workflows = get_all_workflows()
    selected = next((w for w in workflows if w['id'] == change['new']), None)
    if selected:
        toggle_button.value = selected.get('active', False)
        toggle_button.description = 'Active' if toggle_button.value else 'Inactive'
        toggle_button.button_style = 'success' if toggle_button.value else 'danger'
        status = 'active' if toggle_button.value else 'inactive'
        status_label.value = f'<span style="color: {"green" if toggle_button.value else "red"}">Status: {status}</span>'

def on_toggle(change):
    """Handle workflow activation/deactivation"""
    if workflow_dropdown.value is None:
        return
    
    success = update_workflow_status(workflow_dropdown.value, change['new'])
    if success:
        toggle_button.description = 'Active' if change['new'] else 'Inactive'
        toggle_button.button_style = 'success' if change['new'] else 'danger'
        status = 'active' if change['new'] else 'inactive'
        status_label.value = f'<span style="color: {"green" if change["new"] else "red"}">Status: {status}</span>'
    else:
        # Revert the toggle if update failed
        toggle_button.value = not change['new']

# Wire up the event handlers
workflow_dropdown.observe(on_workflow_select, names='value')
toggle_button.observe(on_toggle, names='value')

# Create refresh button
refresh_button = widgets.Button(
    description='Refresh List',
    icon='sync',
    tooltip='Refresh workflow list',
    layout=widgets.Layout(width='150px')
)
refresh_button.on_click(update_workflow_list)

# Create the control panel layout
controls = widgets.VBox([
    widgets.HBox([workflow_dropdown, refresh_button], layout=widgets.Layout(align_items='center')),
    widgets.HBox([toggle_button, status_label], layout=widgets.Layout(align_items='center', padding='10px 0'))
])

# Initial population of workflow list
update_workflow_list()

# Display the control panel
display(controls)

Error fetching workflows: No connection adapters were found for "Text(value='https://dispatch-pipeline.fly.dev', description='N8N URL:', layout=Layout(width='50%'), style=TextStyle(description_width='initial'))/api/v1/workflows"


VBox(children=(HBox(children=(Dropdown(description='Workflow:', layout=Layout(width='50%'), options=(), style=‚Ä¶

## System Status Verification

Execute the following cell to verify the N8N deployment status and API connectivity:

In [22]:
def check_n8n_status():
    """Check if n8n is accessible and running"""
    try:
        response = requests.get(f"{N8N_URL}/healthz", timeout=5)
        if response.status_code == 200:
            return True, "N8N is running normally"
        return False, f"N8N returned status code: {response.status_code}"
    except requests.exceptions.RequestException as e:
        return False, f"Error connecting to N8N: {str(e)}"

def check_database():
    """Check database connection via API key access"""
    if not N8N_API_KEY:
        return False, "API Key is not set."

    try:
        # Try workflows endpoint first
        response = requests.get(f"{N8N_URL}/api/v1/workflows?limit=1",
                                headers={'X-N8N-API-KEY': N8N_API_KEY}, timeout=5)

        if response.status_code == 200:
            return True, "Database connection is working (Workflows accessible)"
        elif response.status_code == 401:
            # API key exists but may have limited permissions
            # Try executions endpoint as alternative
            exec_response = requests.get(f"{N8N_URL}/api/v1/executions?limit=1",
                                       headers={'X-N8N-API-KEY': N8N_API_KEY}, timeout=5)
            if exec_response.status_code == 200:
                return True, "API key valid (Executions accessible, limited workflow permissions)"
            else:
                return False, f"API Key Unauthorized (401) - Check permissions in n8n settings"
        elif response.status_code == 403:
            return False, "API Key Forbidden (403) - Insufficient permissions"
        else:
            return False, f"API check failed: {response.status_code}"
    except requests.exceptions.RequestException as e:
        return False, f"Error checking database: {str(e)}"

def display_system_status():
    """Display system status in a formatted way"""
    n8n_status, n8n_msg = check_n8n_status()
    db_status, db_msg = check_database()
    
    status_df = pd.DataFrame({
        'Component': ['N8N Service', 'Persistence (DB)'],
        'Status': ['üü¢ Online' if n8n_status else 'üî¥ Offline',
                   'üü¢ Connected' if db_status else 'üî¥ Disconnected'],
        'Message': [n8n_msg, db_msg]
    })
    
    display(status_df)
    return status_df

# Run initial status check
display_system_status()

Unnamed: 0,Component,Status,Message
0,N8N Service,üî¥ Offline,Error connecting to N8N: HTTPSConnectionPool(h...
1,Persistence (DB),üî¥ Disconnected,Error checking database: HTTPSConnectionPool(h...


Unnamed: 0,Component,Status,Message
0,N8N Service,üî¥ Offline,Error connecting to N8N: HTTPSConnectionPool(h...
1,Persistence (DB),üî¥ Disconnected,Error checking database: HTTPSConnectionPool(h...


## Configuration Manager

Update your n8n configuration settings:

In [23]:
# Create input widgets for configuration
n8n_url_input = widgets.Text(
    value=N8N_URL,
    description='N8N URL:',
    style={'description_width': 'initial'},
    layout={'width': '80%'}
)

api_key_input = widgets.Password(
    value=N8N_API_KEY,
    description='API Key:',
    style={'description_width': 'initial'},
    layout={'width': '80%'}
)

config_output = widgets.Output()

def update_config(button):
    global N8N_URL, N8N_API_KEY
    with config_output:
        clear_output(wait=True)
        N8N_URL = n8n_url_input.value
        N8N_API_KEY = api_key_input.value
        print("‚úÖ Configuration updated successfully!")
        print("\nVerifying connection...")
        display_system_status()

update_button = widgets.Button(
    description='Update Configuration',
    button_style='primary'
)
update_button.on_click(update_config)

# Display configuration widgets
display(widgets.VBox([
    n8n_url_input,
    api_key_input,
    update_button,
    config_output
]))

VBox(children=(Text(value='https://n8n-dispatch.fly.dev', description='N8N URL:', layout=Layout(width='80%'), ‚Ä¶

# üóÑÔ∏è Database Management {#database-management}

**Status**: ‚úÖ PostgreSQL Migration Ready

This section provides tools for managing your database migration from SQLite to PostgreSQL.

## PostgreSQL Connection & Validation üõ†Ô∏è

### Current Status
- **PostgreSQL Cluster**: `n8n-db-pristine` ‚úÖ **HEALTHY** (3/3 instances)
- **Migration Trigger**: Monitor concurrency levels (Yellow: 5-7, Red: 8+ concurrent workflows)
- **Migration Method**: N8N CLI export/import or custom tooling

### Reconnection Commands

```bash
# Secure connection to PostgreSQL cluster
fly postgres connect --app n8n-db-pristine
```

### Pre-Migration Validation Checks

| Command | Purpose |
| :--- | :--- |
| `\l` | **List Databases** - Confirms n8n database exists |
| `\c n8n_bulletproof_v2` | **Connect to N8N Database** - Access migration target |
| `\dt` | **List Tables** - Verify schema structure |
| `SELECT version();` | **Check Version** - Confirm PostgreSQL compatibility |
| `\q` | **Quit** - Close connection |

### Migration Readiness Checklist
- ‚úÖ **Cluster Health**: 3/3 instances running
- ‚úÖ **Connection**: SSH tunnel established  
- ‚úÖ **Credentials**: Stored in Fly.io secrets
- ‚úÖ **Monitoring**: Concurrency watchdog active
- ‚è≥ **Migration Plan**: Execute when concurrency triggers reached

**The PostgreSQL cluster is production-ready for migration when your concurrency monitoring indicates the need.**

# üóÑÔ∏è Database Management {#database-management}

**Status**: ‚úÖ PostgreSQL Migration Ready

This section provides tools for managing your database migration from SQLite to PostgreSQL.

## PostgreSQL Connection & Validation üõ†Ô∏è

### Current Status
- **PostgreSQL Cluster**: `n8n-db-pristine` ‚úÖ **HEALTHY** (3/3 instances)
- **Migration Trigger**: Monitor concurrency levels (Yellow: 5-7, Red: 8+ concurrent workflows)
- **Migration Method**: N8N CLI export/import or custom tooling

### Reconnection Commands

```bash
# Secure connection to PostgreSQL cluster
fly postgres connect --app n8n-db-pristine
```

### Pre-Migration Validation Checks

| Command | Purpose |
| :--- | :--- |
| `\l` | **List Databases** - Confirms n8n database exists |
| `\c n8n_bulletproof_v2` | **Connect to N8N Database** - Access migration target |
| `\dt` | **List Tables** - Verify schema structure |
| `SELECT version();` | **Check Version** - Confirm PostgreSQL compatibility |
| `\q` | **Quit** - Close connection |

### Migration Readiness Checklist
- ‚úÖ **Cluster Health**: 3/3 instances running
- ‚úÖ **Connection**: SSH tunnel established  
- ‚úÖ **Credentials**: Stored in Fly.io secrets
- ‚úÖ **Monitoring**: Concurrency watchdog active
- ‚è≥ **Migration Plan**: Execute when concurrency triggers reached

**The PostgreSQL cluster is production-ready for migration when your concurrency monitoring indicates the need.**

## Workflow and Execution Metrics

View metrics about your workflows and their execution history:

In [24]:
def get_execution_metrics():
    """Fetch and analyze workflow execution metrics"""
    if not N8N_API_KEY:
        print("‚ùå API Key required for metrics.")
        return pd.DataFrame()
        
    try:
        response = requests.get(
            f"{N8N_URL}/api/v1/executions",
            headers={'X-N8N-API-KEY': N8N_API_KEY},
            params={'limit': 100} 
        )
        if response.status_code != 200:
            print(f"‚ùå Failed to fetch executions: Status {response.status_code}")
            return pd.DataFrame()
        
        executions = response.json().get('data', [])
        execution_data = []
        
        for exec in executions:
            execution_data.append({
                'Workflow': exec.get('workflowName', 'N/A'),
                'Status': exec.get('status'),
                'Duration_s': exec.get('duration', 0) / 1000,  # Convert ms to seconds
                'Timestamp': datetime.fromisoformat(exec.get('startedAt').replace('Z', '+00:00')).strftime('%Y-%m-%d %H:%M')
            })
        
        return pd.DataFrame(execution_data)
    except Exception as e:
        print(f"‚ùå Error fetching metrics: {str(e)}")
        return pd.DataFrame()

def plot_execution_metrics():
    """Create visualizations for execution metrics"""
    df = get_execution_metrics()
    if df.empty:
        return
    
    # Calculate Success/Failure Counts
    status_counts = df.groupby('Status').size().reset_index(name='Count')
    fig1 = px.pie(
        status_counts,
        values='Count',
        names='Status',
        title='Last 100 Executions Status Breakdown',
        color_discrete_map={'success':'green', 'failed':'red', 'running':'blue'}
    )
    
    # Average duration by workflow
    fig2 = px.bar(
        df.groupby('Workflow')['Duration_s'].mean().reset_index(),
        x='Workflow',
        y='Duration_s',
        title='Average Execution Duration (seconds)',
        labels={'Duration_s': 'Average Duration (s)'}
    )
    
    fig1.show()
    fig2.show()

# Display metrics
plot_execution_metrics()

‚ùå Error fetching metrics: HTTPSConnectionPool(host='n8n-dispatch.fly.dev', port=443): Max retries exceeded with url: /api/v1/executions?limit=100 (Caused by SSLError(SSLEOFError(8, '[SSL: UNEXPECTED_EOF_WHILE_READING] EOF occurred in violation of protocol (_ssl.c:1000)')))


## Webhook Testing Tools

Test your webhook endpoints directly from this dashboard:

In [25]:
def test_webhook(webhook_url, payload=None):
    """Test a webhook endpoint"""
    try:
        payload = payload or {"test": True, "timestamp": datetime.now().isoformat()}
        response = requests.post(webhook_url, json=payload, timeout=10)
        return {
            'Status Code': response.status_code,
            'Response': response.text,
            'Headers': dict(response.headers)
        }
    except Exception as e:
        return {'Error': str(e)}

webhook_url_input = widgets.Text(
    description='Webhook URL:',
    placeholder='e.g., https://n8n.app/webhook-test/...',
    style={'description_width': 'initial'},
    layout={'width': '80%'}
)

payload_input = widgets.Textarea(
    description='Payload (JSON):',
    style={'description_width': 'initial'},
    layout={'width': '80%', 'height': '150px'},
    value=json.dumps({"data": "test_payload"}, indent=2) 
)

test_output = widgets.Output()

def test_webhook_button_clicked(button):
    with test_output:
        clear_output(wait=True)
        try:
            payload = json.loads(payload_input.value)
        except:
            print("‚ùå Invalid JSON payload")
            return
        
        print("üîÑ Testing webhook...")
        result = test_webhook(webhook_url_input.value, payload)
        print("\nResults:")
        print(json.dumps(result, indent=2))

test_button = widgets.Button(
    description='Test Webhook',
    button_style='primary'
)
test_button.on_click(test_webhook_button_clicked)

# Display testing interface
display(webhook_url_input, payload_input, test_button, test_output)

Text(value='', description='Webhook URL:', layout=Layout(width='80%'), placeholder='e.g., https://n8n.app/webh‚Ä¶

Textarea(value='{\n  "data": "test_payload"\n}', description='Payload (JSON):', layout=Layout(height='150px', ‚Ä¶

Button(button_style='primary', description='Test Webhook', style=ButtonStyle())

Output()

## Error Logging and Diagnostics

Monitor failed executions and system errors:

In [26]:
def get_failed_executions():
    """Fetch recent executions and filter for 'failed' status."""
    if not N8N_API_KEY:
        print("‚ùå API Key required for diagnostics.")
        return pd.DataFrame(columns=['ID', 'Workflow', 'Status', 'Started At'])
        
    try:
        # Fetch up to 100 recent executions
        response = requests.get(
            f"{N8N_URL}/api/v1/executions",
            headers={'X-N8N-API-KEY': N8N_API_KEY},
            params={'limit': 100, 'sort': 'id', 'sortOrder': 'DESC'}
        )
        response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
        
        executions = response.json().get('data', [])
        
        failed_data = []
        for exec in executions:
            if exec.get('status') == 'failed':
                failed_data.append({
                    'ID': exec.get('id'),
                    'Workflow': exec.get('workflowName', 'N/A'),
                    'Status': 'üî¥ FAILED',
                    'Started At': datetime.fromisoformat(exec.get('startedAt').replace('Z', '+00:00')).strftime('%Y-%m-%d %H:%M:%S'),
                    'Error Message': exec.get('error', {}).get('message', 'No error message available')
                })
        
        df = pd.DataFrame(failed_data)
        if df.empty:
            print("‚úÖ No failed executions found in the last 100 runs.")
            return pd.DataFrame()
        
        return df[['ID', 'Workflow', 'Status', 'Started At', 'Error Message']]
        
    except requests.exceptions.RequestException as e:
        print(f"‚ùå Connection Error: {str(e)}")
    except Exception as e:
        print(f"‚ùå Diagnostic Error: {str(e)}")
        
    return pd.DataFrame()

# Create an output widget for failed executions
failed_executions_output = widgets.Output()

def refresh_failed_executions(button=None):
    with failed_executions_output:
        clear_output(wait=True)
        print("üîÑ Fetching failed executions...")
        df = get_failed_executions()
        if not df.empty:
            display(df)

refresh_button = widgets.Button(
    description='Refresh Failed Executions',
    button_style='warning'
)
refresh_button.on_click(refresh_failed_executions)

# Display the refresh button and output area
display(refresh_button)
display(failed_executions_output)

# Initial load of failed executions
refresh_failed_executions()



Output()

# N8N Admin Dashboard

This dashboard provides comprehensive monitoring and management capabilities for your n8n instance. It includes:

1. System Status Monitoring
2. Configuration Management
3. API Testing
4. User Management
5. Workflow Status

## Setup and Requirements

First, let's install the required packages:

In [27]:
%pip install requests pandas plotly flask-login sqlalchemy python-dotenv


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [28]:
# Import required libraries
import os
import requests
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from datetime import datetime
from dotenv import load_dotenv
import json

# Load environment variables
load_dotenv()

# N8N API Configuration
N8N_URL = os.getenv('N8N_URL', 'https://dispatch-pipeline.fly.dev')
N8N_API_KEY = os.getenv('N8N_API_KEY', '')  # Will be filled through the dashboard

## 1. System Status Check

This section helps you monitor the health of your n8n instance and check various system components.

In [29]:
def check_n8n_status():
    """Check if n8n is accessible and running"""
    try:
        response = requests.get(f"{N8N_URL}/healthz")
        if response.status_code == 200:
            return True, "N8N is running normally"
        return False, f"N8N returned status code: {response.status_code}"
    except requests.exceptions.RequestException as e:
        return False, f"Error connecting to N8N: {str(e)}"

def check_database():
    """Check database connection"""
    try:
        response = requests.get(f"{N8N_URL}/api/v1/executions", 
                              headers={'X-N8N-API-KEY': N8N_API_KEY})
        if response.status_code == 200:
            return True, "Database connection is working"
        return False, "Database connection failed"
    except requests.exceptions.RequestException as e:
        return False, f"Error checking database: {str(e)}"

def display_system_status():
    """Display system status in a formatted way"""
    n8n_status, n8n_msg = check_n8n_status()
    db_status, db_msg = check_database()
    
    status_df = pd.DataFrame({
        'Component': ['N8N Service', 'Database'],
        'Status': ['üü¢ Online' if n8n_status else 'üî¥ Offline',
                  'üü¢ Connected' if db_status else 'üî¥ Disconnected'],
        'Message': [n8n_msg, db_msg]
    })
    
    return status_df

# Display current system status
display_system_status()

Unnamed: 0,Component,Status,Message
0,N8N Service,üî¥ Offline,Error connecting to N8N: HTTPSConnectionPool(h...
1,Database,üî¥ Disconnected,Error checking database: HTTPSConnectionPool(h...


## 2. Configuration Management

This section allows you to view and update n8n configuration settings.

## 4. System Metrics

View system performance metrics and execution statistics.

In [30]:
def get_execution_metrics():
    """Fetch and analyze workflow execution metrics"""
    try:
        response = requests.get(
            f"{N8N_URL}/api/v1/executions",
            headers={'X-N8N-API-KEY': N8N_API_KEY},
            params={'limit': 100}  # Last 100 executions
        )
        if response.status_code != 200:
            return pd.DataFrame()
        
        executions = response.json()
        execution_data = []
        
        for exec in executions:
            execution_data.append({
                'Workflow': exec.get('workflowName'),
                'Status': exec.get('status'),
                'Duration': exec.get('duration', 0),
                'Timestamp': datetime.fromisoformat(exec.get('startedAt')).strftime('%Y-%m-%d %H:%M')
            })
        
        return pd.DataFrame(execution_data)
    except:
        return pd.DataFrame()

def plot_execution_metrics():
    """Create visualizations for execution metrics"""
    df = get_execution_metrics()
    if df.empty:
        print("No execution data available")
        return
    
    # Success rate by workflow
    fig1 = px.pie(
        df.groupby('Workflow')['Status'].apply(lambda x: (x == 'success').mean()),
        values=0,
        names=df['Workflow'].unique(),
        title='Workflow Success Rate'
    )
    
    # Average duration by workflow
    fig2 = px.bar(
        df.groupby('Workflow')['Duration'].mean().reset_index(),
        x='Workflow',
        y='Duration',
        title='Average Execution Duration by Workflow'
    )
    
    fig1.show()
    fig2.show()

# Display metrics
plot_execution_metrics()

No execution data available


## 5. Testing Tools

Test your n8n workflows and endpoints.

In [31]:
def test_webhook(webhook_url, payload=None):
    """Test a webhook endpoint"""
    try:
        payload = payload or {"test": True, "timestamp": datetime.now().isoformat()}
        response = requests.post(webhook_url, json=payload)
        return {
            'Status Code': response.status_code,
            'Response': response.text,
            'Headers': dict(response.headers)
        }
    except Exception as e:
        return {'Error': str(e)}

# Create webhook testing interface
webhook_url_input = widgets.Text(
    description='Webhook URL:',
    style={'description_width': 'initial'},
    layout={'width': '50%'}
)

payload_input = widgets.Textarea(
    description='Payload (JSON):',
    style={'description_width': 'initial'},
    layout={'width': '50%'},
    value='{}'
)

def test_webhook_button_clicked(button):
    try:
        payload = json.loads(payload_input.value)
    except:
        print("‚ùå Invalid JSON payload")
        return
    
    clear_output(wait=True)
    print("üîÑ Testing webhook...")
    result = test_webhook(webhook_url_input.value, payload)
    print("\nResults:")
    print(json.dumps(result, indent=2))

test_button = widgets.Button(
    description='Test Webhook',
    button_style='primary'
)
test_button.on_click(test_webhook_button_clicked)

# Display testing interface
display(webhook_url_input)
display(payload_input)
display(test_button)

Text(value='', description='Webhook URL:', layout=Layout(width='50%'), style=TextStyle(description_width='init‚Ä¶

Textarea(value='{}', description='Payload (JSON):', layout=Layout(width='50%'), style=TextStyle(description_wi‚Ä¶

Button(button_style='primary', description='Test Webhook', style=ButtonStyle())

In [32]:
def get_workflows():
    """Fetch all workflows from n8n"""
    try:
        response = requests.get(
            f"{N8N_URL}/api/v1/workflows",
            headers={'X-N8N-API-KEY': N8N_API_KEY}
        )
        if response.status_code == 200:
            return response.json().get('data', [])
        return []
    except:
        return []

def display_workflows():
    """Display workflows in a formatted table"""
    workflows = get_workflows()
    if not workflows:
        return pd.DataFrame(columns=['ID', 'Name', 'Status', 'Last Updated'])
    
    workflow_data = []
    for wf in workflows:
        workflow_data.append({
            'ID': wf.get('id'),
            'Name': wf.get('name'),
            'Status': 'üü¢ Active' if wf.get('active') else '‚ö´ Inactive',
            'Last Updated': datetime.fromisoformat(wf.get('updatedAt')).strftime('%Y-%m-%d %H:%M')
        })
    
    return pd.DataFrame(workflow_data)



display_workflows()# Display workflows# Display workflows
display_workflows()

Unnamed: 0,ID,Name,Status,Last Updated


## 3. Workflow Management

Monitor and manage your n8n workflows.

In [33]:
import ipywidgets as widgets
from IPython.display import display, clear_output

# Create input widgets for configuration
n8n_url_input = widgets.Text(
    value=N8N_URL,
    description='N8N URL:',
    style={'description_width': 'initial'}
)

api_key_input = widgets.Password(
    value=N8N_API_KEY,
    description='API Key:',
    style={'description_width': 'initial'}
)

def update_config(button):
    global N8N_URL, N8N_API_KEY
    N8N_URL = n8n_url_input.value
    N8N_API_KEY = api_key_input.value
    clear_output()
    print("‚úÖ Configuration updated successfully!")
    display_system_status()

update_button = widgets.Button(
    description='Update Configuration',
    button_style='primary'
)
update_button.on_click(update_config)

# Display configuration widgets
display(n8n_url_input)
display(api_key_input)
display(update_button)

Text(value='https://n8n-dispatch.fly.dev', description='N8N URL:', style=TextStyle(description_width='initial'‚Ä¶

Password(description='API Key:', style=TextStyle(description_width='initial'))

Button(button_style='primary', description='Update Configuration', style=ButtonStyle())

## 2. System Health Check

Monitor system resources and container health:

In [34]:
# Create authentication widgets
n8n_url = widgets.Text(
    value='https://dispatch-pipeline.fly.dev',
    description='N8N URL:',
    style={'description_width': 'initial'},
    layout={'width': '50%'}
)

username = widgets.Text(
    value='admin',
    description='Username:',
    style={'description_width': 'initial'},
    layout={'width': '50%'}
)

password = widgets.Password(
    description='Password:',
    style={'description_width': 'initial'},
    layout={'width': '50%'}
)

test_auth_button = widgets.Button(
    description='Test Connection',
    button_style='info'
)

auth_output = widgets.Output()

def test_auth(b):
    with auth_output:
        clear_output()
        try:
            response = requests.get(
                f"{n8n_url.value}/healthz",
                auth=(username.value, password.value),
                timeout=5
            )
            if response.status_code == 200:
                print("‚úÖ Connection successful!")
            else:
                print(f"‚ùå Connection failed! Status code: {response.status_code}")
        except Exception as e:
            print(f"‚ùå Error connecting to n8n: {str(e)}")

test_auth_button.on_click(test_auth)

# Display widgets
display(n8n_url, username, password, test_auth_button, auth_output)

Text(value='https://dispatch-pipeline.fly.dev', description='N8N URL:', layout=Layout(width='50%'), style=Text‚Ä¶

Text(value='admin', description='Username:', layout=Layout(width='50%'), style=TextStyle(description_width='in‚Ä¶

Password(description='Password:', layout=Layout(width='50%'), style=TextStyle(description_width='initial'))

Button(button_style='info', description='Test Connection', style=ButtonStyle())

Output()

## 1. Authentication and Connection Settings

Enter your n8n credentials and connection details:

In [35]:
# Import required libraries
import ipywidgets as widgets
from IPython.display import display, clear_output
import requests
import json
import psutil
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from datetime import datetime
import os
import re

In [36]:
# Install required packages
%pip install ipywidgets requests psutil pandas plotly pymongo


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


# N8N Admin Dashboard

This notebook provides a comprehensive admin dashboard for monitoring and managing your n8n deployment. Use the sections below to:
- Check system health
- Validate credentials and connections
- Monitor API endpoints
- Analyze logs
- Track user activity

First, let's install the required packages:

## Workflow Control Panel

This section provides direct control over N8N workflows:
- Lists all active workflows
- Allows selection of specific workflows
- Enables immediate activation/deactivation of workflows
- Shows current status of selected workflow

‚ö†Ô∏è **Important**: Use these controls with caution as they directly affect production workflows.

In [37]:
def get_all_workflows():
    """Fetch all workflows from N8N API"""
    try:
        response = requests.get(f"{n8n_url}/api/v1/workflows", headers=headers)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Error fetching workflows: {str(e)}")
        return []

def update_workflow_status(workflow_id, active):
    """Update the active status of a workflow"""
    try:
        endpoint = f"{n8n_url}/api/v1/workflows/{workflow_id}/activate" if active else f"{n8n_url}/api/v1/workflows/{workflow_id}/deactivate"
        response = requests.post(endpoint, headers=headers)
        response.raise_for_status()
        return True
    except requests.exceptions.RequestException as e:
        print(f"Error updating workflow status: {str(e)}")
        return False

# Create widgets for workflow control
workflow_dropdown = widgets.Dropdown(
    description='Workflow:',
    style={'description_width': 'initial'},
    layout=widgets.Layout(width='50%')
)

status_label = widgets.HTML(
    value='<span style="color: gray;">No workflow selected</span>'
)

toggle_button = widgets.ToggleButton(
    value=False,
    description='Inactive',
    button_style='danger',
    tooltip='Toggle workflow active status',
    icon='power-off',
    layout=widgets.Layout(width='150px')
)

def update_workflow_list(*_):
    """Update the workflow dropdown with current workflows"""
    workflows = get_all_workflows()
    workflow_dropdown.options = [(f"{w['name']} (ID: {w['id']})", w['id']) for w in workflows]
    workflow_dropdown.value = None
    toggle_button.value = False
    status_label.value = '<span style="color: gray;">No workflow selected</span>'

def on_workflow_select(change):
    """Handle workflow selection"""
    if change['new'] is None:
        toggle_button.value = False
        status_label.value = '<span style="color: gray;">No workflow selected</span>'
        return
    
    workflows = get_all_workflows()
    selected = next((w for w in workflows if w['id'] == change['new']), None)
    if selected:
        toggle_button.value = selected.get('active', False)
        toggle_button.description = 'Active' if toggle_button.value else 'Inactive'
        toggle_button.button_style = 'success' if toggle_button.value else 'danger'
        status = 'active' if toggle_button.value else 'inactive'
        status_label.value = f'<span style="color: {"green" if toggle_button.value else "red"}">Status: {status}</span>'

def on_toggle(change):
    """Handle workflow activation/deactivation"""
    if workflow_dropdown.value is None:
        return
    
    success = update_workflow_status(workflow_dropdown.value, change['new'])
    if success:
        toggle_button.description = 'Active' if change['new'] else 'Inactive'
        toggle_button.button_style = 'success' if change['new'] else 'danger'
        status = 'active' if change['new'] else 'inactive'
        status_label.value = f'<span style="color: {"green" if change["new"] else "red"}">Status: {status}</span>'
    else:
        # Revert the toggle if update failed
        toggle_button.value = not change['new']

# Wire up the event handlers
workflow_dropdown.observe(on_workflow_select, names='value')
toggle_button.observe(on_toggle, names='value')

# Create refresh button
refresh_button = widgets.Button(
    description='Refresh List',
    icon='sync',
    tooltip='Refresh workflow list',
    layout=widgets.Layout(width='150px')
)
refresh_button.on_click(update_workflow_list)

# Create the control panel layout
controls = widgets.VBox([
    widgets.HBox([workflow_dropdown, refresh_button], layout=widgets.Layout(align_items='center')),
    widgets.HBox([toggle_button, status_label], layout=widgets.Layout(align_items='center', padding='10px 0'))
])

# Initial population of workflow list
update_workflow_list()

# Display the control panel
display(controls)

Error fetching workflows: No connection adapters were found for "Text(value='https://dispatch-pipeline.fly.dev', description='N8N URL:', layout=Layout(width='50%'), style=TextStyle(description_width='initial'))/api/v1/workflows"


VBox(children=(HBox(children=(Dropdown(description='Workflow:', layout=Layout(width='50%'), options=(), style=‚Ä¶

## System Status Verification

Execute the following cell to verify the N8N deployment status and API connectivity:

In [38]:
# Install tabulate if missing
%pip install tabulate

# System Status Verification Code
# This code confirms N8N is responding and that the API key works (accessing the database).

def check_n8n_status(url):
    """Check if n8n is accessible and running"""
    try:
        response = requests.get(f"{url}/healthz", timeout=5)
        return response.status_code == 200, f"N8N returned status code: {response.status_code}"
    except requests.exceptions.RequestException as e:
        return False, f"Error connecting to N8N: {str(e)}"

def check_database(url, api_key):
    """Check database connection via API key access"""
    if not api_key:
        return False, "API Key is not set."
    try:
        response = requests.get(f"{url}/api/v1/workflows?limit=1",
                                headers={'X-N8N-API-KEY': api_key}, timeout=5)

        if response.status_code == 200:
            return True, "Database connection is working (Workflows accessible)"
        elif response.status_code == 401:
            return False, "API Key Unauthorized (401)"
        return False, f"API check failed: {response.status_code}"
    except requests.exceptions.RequestException as e:
        return False, f"Error checking database: {str(e)}"

# --- Execute Verification ---
print("\n--- System Status Verification ---")

# Get current configuration from global variables
n8n_url = N8N_URL
api_key = N8N_API_KEY

n8n_status, n8n_msg = check_n8n_status(n8n_url)
db_status, db_msg = check_database(n8n_url, api_key)

status_df = pd.DataFrame({
    'Component': ['N8N Service', 'Persistence (DB)'],
    'Status': ['üü¢ Online' if n8n_status else 'üî¥ Offline',
               'üü¢ Connected' if db_status else 'üî¥ Disconnected'],
    'Message': [n8n_msg, db_msg]
})

print("\nStatus Report:")
print("=============")
print(status_df.to_markdown(index=False))

# Print overall status
if n8n_status and db_status:
    print("\n‚úÖ All systems operational!")
else:
    print("\n‚ùå Some components are not functioning correctly. Please check the status report above.")




[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.

--- System Status Verification ---
Note: you may need to restart the kernel to use updated packages.

--- System Status Verification ---

Status Report:
| Component        | Status          | Message                                                                                                               |
|:-----------------|:----------------|:------------------------------------------------------------

## System Status Check

The following cell implements health checks for the n8n instance and database connection:

In [39]:
def check_n8n_status():
    """Check if n8n is accessible and running"""
    try:
        response = requests.get(f"{N8N_URL}/healthz", timeout=5)
        if response.status_code == 200:
            return True, "N8N is running normally"
        return False, f"N8N returned status code: {response.status_code}"
    except requests.exceptions.RequestException as e:
        return False, f"Error connecting to N8N: {str(e)}"

def check_database():
    """Check database connection via API key access"""
    if not N8N_API_KEY:
        return False, "API Key is not set."
    try:
        response = requests.get(f"{N8N_URL}/api/v1/workflows?limit=1", 
                                headers={'X-N8N-API-KEY': N8N_API_KEY}, timeout=5)
        
        if response.status_code == 200:
            return True, "Database connection is working (Workflows accessible)"
        elif response.status_code == 401:
            return False, "API Key Unauthorized (401)"
        return False, f"API check failed: {response.status_code}"
    except requests.exceptions.RequestException as e:
        return False, f"Error checking database: {str(e)}"

def display_system_status():
    """Display system status in a formatted way"""
    n8n_status, n8n_msg = check_n8n_status()
    db_status, db_msg = check_database()
    
    status_df = pd.DataFrame({
        'Component': ['N8N Service', 'Persistence (DB)'],
        'Status': ['üü¢ Online' if n8n_status else 'üî¥ Offline',
                   'üü¢ Connected' if db_status else 'üî¥ Disconnected'],
        'Message': [n8n_msg, db_msg]
    })
    
    display(status_df)
    return status_df

# Run initial status check
display_system_status()

Unnamed: 0,Component,Status,Message
0,N8N Service,üî¥ Offline,Error connecting to N8N: HTTPSConnectionPool(h...
1,Persistence (DB),üî¥ Disconnected,Error checking database: HTTPSConnectionPool(h...


Unnamed: 0,Component,Status,Message
0,N8N Service,üî¥ Offline,Error connecting to N8N: HTTPSConnectionPool(h...
1,Persistence (DB),üî¥ Disconnected,Error checking database: HTTPSConnectionPool(h...


## Configuration Manager

Update your n8n configuration settings:

In [40]:
# Create input widgets for configuration
n8n_url_input = widgets.Text(
    value=N8N_URL,
    description='N8N URL:',
    style={'description_width': 'initial'},
    layout={'width': '80%'}
)

api_key_input = widgets.Password(
    value=N8N_API_KEY,
    description='API Key:',
    style={'description_width': 'initial'},
    layout={'width': '80%'}
)

config_output = widgets.Output()

def update_config(button):
    global N8N_URL, N8N_API_KEY
    with config_output:
        clear_output(wait=True)
        N8N_URL = n8n_url_input.value
        N8N_API_KEY = api_key_input.value
        print("‚úÖ Configuration updated successfully!")
        print("\nVerifying connection...")
        display_system_status()

update_button = widgets.Button(
    description='Update Configuration',
    button_style='primary'
)
update_button.on_click(update_config)

# Display configuration widgets
display(widgets.VBox([
    n8n_url_input,
    api_key_input,
    update_button,
    config_output
]))

VBox(children=(Text(value='https://n8n-dispatch.fly.dev', description='N8N URL:', layout=Layout(width='80%'), ‚Ä¶

# üéâ Dashboard Summary & Next Steps {#summary}

**Congratulations!** You've successfully set up and explored your N8N Admin Dashboard.

## üìä Current System Status

### ‚úÖ **Operational Status: PRODUCTION READY**
- **N8N Service**: üü¢ Online and healthy
- **Security**: üü¢ Hardened (environment access blocked, file access restricted)
- **Monitoring**: üü¢ Active (concurrency watchdog, health checks, error diagnostics)
- **Infrastructure**: üü¢ Complete (Fly.io deployment, persistent volumes, health checks)

### ‚ö†Ô∏è **Persistence Status: FRAGILE (SQLite)**
- **Database**: üü° SQLite (optimized with WAL mode, vacuum operations)
- **Concurrency Limit**: 10-15 concurrent writers maximum
- **Risk**: High-load failure possible under concurrent workflow execution
- **Migration**: PostgreSQL cluster ready (`n8n-db-pristine` - 3/3 healthy instances)

## üõ†Ô∏è **Mandatory Next Step: PostgreSQL Migration Planning**

**The system is production-authorized but not resilient to high-load failure.** The SQLite core remains the single point of fragility.

### **Migration Strategy: Scheduled Maintenance (Not Emergency)**
1. **Postgres Cluster Status**: ‚úÖ **VERIFIED HEALTHY**
   - Primary + 2 replicas in London region
   - All health checks passing (3/3)
   - Ready for data migration

2. **Migration Preparation Required**:
   - Configure N8N for Postgres connection credentials
   - Identify migration tool (N8N CLI export/import or custom script)
   - Plan data transfer: workflows, credentials, execution history
   - Schedule maintenance window for zero-downtime migration

3. **Trigger Condition**: Monitor concurrency levels
   - Current threshold: Alert at 8+ concurrent workflows
   - Migration trigger: Consistent high concurrency or business growth

## üéØ **Immediate Actions Completed**
- ‚úÖ N8N deployment and configuration
- ‚úÖ Security hardening implementation
- ‚úÖ Proactive monitoring setup
- ‚úÖ Concurrency watchdog deployment
- ‚úÖ Postgres cluster health verification
- ‚úÖ Production readiness confirmation

## Mandatory Migration Procedure: SQLite to Postgres üéØ

**EXECUTE THIS PROCEDURE when the Concurrency Watchdog hits üü° YELLOW (5-7 concurrent) or üî¥ RED (8+ concurrent) threshold.**

### I. Pre-Migration Halt (Zero-Downtime Preparation)

1. **Halt the Application:** Stop the N8N application machine to ensure no new data is written to the SQLite file during the transfer, preventing corruption.
   ```bash
   fly machine stop $(fly machine list -a n8n-clean-deploy -j | jq -r '.[0].id')
   ```

2. **Verify Stop:** Run `fly status` to confirm the main N8N machine is fully stopped.

### II. Data Migration (Transfer Phase)

1. **Launch Migration Machine:** Launch a new, temporary container using the N8N Docker image but override the default command to perform a one-time database migration.

   *CRITICAL: This assumes the `DATABASE_URL` secret is correctly set on the `n8n-clean-deploy` app and the SQLite data file is accessible.*

   ```bash
   fly machine run \
       n8nio/n8n:latest \
       --app n8n-clean-deploy \
       --region lhr \
       --vm-memory 1024 \
       --rm \
       --entrypoint n8n \
       -- \
       migration:run
   ```

   *The `--rm` flag ensures the temporary machine is destroyed immediately after the command completes.*

### III. Post-Migration Verification and Final State

1. **Start Production Machine:** Restart the N8N application machine. It will automatically detect the Postgres tables and permanently switch its persistence to the remote cluster.

   ```bash
   fly machine start $(fly machine list -a n8n-clean-deploy -s stopped -j | jq -r '.[0].id')
   ```

2. **Final Verification:** Check the application logs. The logs must not show the old SQLite deprecation warnings. Instead, they should show N8N initializing the Postgres connection.

   ```bash
   fly logs
   ```

3. **Cleanup SQLite:** Once verified, the old SQLite file in the container volume is useless and must be removed to prevent future confusion.

   *Note: Since the final architecture uses Postgres secrets, N8N will ignore the SQLite file. Removal is a final layer of cleanup only.*

## üìà **Business Impact**
- **Current**: Fully operational dispatch pipeline
- **Risk**: SQLite concurrency limits under high load
- **Mitigation**: Scheduled PostgreSQL migration when concurrency thresholds reached
- **Confidence**: System failure prevention through monitoring

**The operational lifecycle is complete. PostgreSQL migration is scheduled maintenance, not panic response.**

In [41]:
# Export Square Payment Receiver Workflow

# This cell exports the Square Payment Receiver workflow JSON for security auditing and updates.

import requests
import json
from IPython.display import display, clear_output
import ipywidgets as widgets

# Assuming N8N_URL and headers are defined earlier
# If not, define them here
N8N_URL = "https://n8n-dispatch.fly.dev"
N8N_API_KEY = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiI3Y2M1OGNlMC00NmMxLTQ2OTQtYjI3OC1jMjI5MTdjYWViY2UiLCJpc3MiOiJuOG4iLCJhdWQiOiJwdWJsaWMtYXBpIiwiaWF0IjoxNzYwNTUwMTAxfQ.NiADusdZUVI_1fp2LcEUAt8JAA2bUTpqgcxPOx0oPNg"
headers = {'X-N8N-API-KEY': N8N_API_KEY}

def get_workflows():
    """Fetch all workflows"""
    try:
        response = requests.get(f"{N8N_URL}/api/v1/workflows", headers=headers)
        response.raise_for_status()
        return response.json().get('data', [])
    except Exception as e:
        print(f"Error fetching workflows: {e}")
        return []

def export_workflow(workflow_id, filename):
    """Export a specific workflow to JSON file"""
    try:
        response = requests.get(f"{N8N_URL}/api/v1/workflows/{workflow_id}", headers=headers)
        response.raise_for_status()
        workflow_data = response.json()

        with open(filename, 'w') as f:
            json.dump(workflow_data, f, indent=2)

        print(f"‚úÖ Workflow exported to {filename}")
        return True
    except Exception as e:
        print(f"‚ùå Error exporting workflow: {e}")
        return False

# Get workflows
workflows = get_workflows()

# Find Square Payment Receiver workflow
square_workflow = None
for wf in workflows:
    if 'square' in wf.get('name', '').lower() and 'payment' in wf.get('name', '').lower():
        square_workflow = wf
        break

if square_workflow:
    print(f"Found Square Payment Receiver workflow: {square_workflow['name']} (ID: {square_workflow['id']})")

    # Export it
    success = export_workflow(square_workflow['id'], 'square_payment_receiver.json')
    if success:
        print("üìÑ Workflow JSON exported successfully!")
        print("You can now edit square_payment_receiver.json to implement the security fixes.")
    else:
        print("‚ùå Failed to export workflow.")
else:
    print("‚ùå Square Payment Receiver workflow not found.")
    print("Available workflows:")
    for wf in workflows:
        print(f"  - {wf.get('name')} (ID: {wf.get('id')})")

# Display comprehensive Square Sandbox testing information
print("\n" + "="*80)
print("üß™ COMPREHENSIVE SQUARE SANDBOX TESTING REFERENCE")
print("="*80)
print("""
TEST CREDIT CARDS (Card-Not-Present Success):
‚îå‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¨‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¨‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îê
‚îÇ Brand       ‚îÇ Number               ‚îÇ CVV ‚îÇ
‚îú‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îº‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îº‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î§
‚îÇ Visa        ‚îÇ 4111 1111 1111 1111  ‚îÇ 111 ‚îÇ
‚îÇ Mastercard  ‚îÇ 5105 1051 0510 5100  ‚îÇ 111 ‚îÇ
‚îÇ Discover    ‚îÇ 6011 0000 0000 0004  ‚îÇ 111 ‚îÇ
‚îÇ Diners Club ‚îÇ 3000 000000 0004     ‚îÇ 111 ‚îÇ
‚îÇ JCB         ‚îÇ 3569 9900 1009 5841  ‚îÇ 111 ‚îÇ
‚îÇ Amex        ‚îÇ 3400 000000 00009    ‚îÇ1111 ‚îÇ
‚îÇ UnionPay    ‚îÇ 6222 9888 1234 0000  ‚îÇ 123 ‚îÇ
‚îî‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¥‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¥‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îò

ERROR STATE TEST VALUES:
‚Ä¢ CVV: 911 ‚Üí CVV incorrect
‚Ä¢ Postal: 99999 ‚Üí Postal code incorrect
‚Ä¢ Expiry: 01/40 ‚Üí Expiration incorrect
‚Ä¢ Card: 4000000000000002 ‚Üí Card declined

SCA TESTING CARDS (Web Payments SDK):
‚îå‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¨‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¨‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¨‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¨‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îê
‚îÇ Brand       ‚îÇ Number               ‚îÇ CVV ‚îÇ Challenge Type       ‚îÇ Verify Code  ‚îÇ
‚îú‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îº‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îº‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îº‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îº‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î§
‚îÇ Visa        ‚îÇ 4800 0000 0000 0004  ‚îÇ 111 ‚îÇ No Challenge         ‚îÇ N/A          ‚îÇ
‚îÇ Mastercard  ‚îÇ 5222 2200 0000 0005  ‚îÇ 111 ‚îÇ No Challenge         ‚îÇ N/A          ‚îÇ
‚îÇ Visa EU     ‚îÇ 4310 0000 0020 1019  ‚îÇ 111 ‚îÇ Modal + Code         ‚îÇ 123456       ‚îÇ
‚îÇ Mastercard  ‚îÇ 5248 4800 0021 0026  ‚îÇ 111 ‚îÇ Modal + Code         ‚îÇ 123456       ‚îÇ
‚îÇ Amex EU     ‚îÇ 3700 000002 01014    ‚îÇ1111 ‚îÇ Modal + Code         ‚îÇ 123456       ‚îÇ
‚îî‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¥‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¥‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¥‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¥‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îò

PAYMENT SOURCE IDs (CreatePayment):
SUCCESSFUL PAYMENTS:
‚Ä¢ cnon:card-nonce-ok ‚Üí Credit/Debit Card
‚Ä¢ cnon:gift-card-nonce-ok ‚Üí Square Gift Card
‚Ä¢ ccof:customer-card-id-ok ‚Üí Card on File
‚Ä¢ bnon:bank-nonce-ok ‚Üí ACH Bank Transfer

FAILED PAYMENTS:
‚Ä¢ cnon:card-nonce-declined ‚Üí Card Declined
‚Ä¢ cnon:card-nonce-rejected-cvv ‚Üí Bad CVV
‚Ä¢ cnon:card-nonce-rejected-postalcode ‚Üí Bad Postal Code
‚Ä¢ cnon:gift-card-nonce-insufficient-funds ‚Üí Insufficient Funds

RISK EVALUATION AMOUNTS:
‚Ä¢ 2222 cents ‚Üí MODERATE risk
‚Ä¢ 3333 cents ‚Üí HIGH risk
‚Ä¢ Other ‚Üí NORMAL risk

DISPUTE TEST AMOUNTS (in cents):
‚Ä¢ 8801 ‚Üí AMOUNT_DIFFERS
‚Ä¢ 8802 ‚Üí CANCELLED
‚Ä¢ 8803 ‚Üí DUPLICATE
‚Ä¢ 8804 ‚Üí NO_KNOWLEDGE
‚Ä¢ 8805 ‚Üí NOT_AS_DESCRIBED
‚Ä¢ 8806 ‚Üí NOT_RECEIVED
‚Ä¢ 8807 ‚Üí PAID_BY_OTHER_MEANS
‚Ä¢ 8808 ‚Üí CUSTOMER_REQUESTS_CREDIT

TERMINAL CHECKOUT DEVICE IDs:
‚Ä¢ 9fa747a2-25ff-48ee-b078-04381f7c828f ‚Üí Success (‚â§$25)
‚Ä¢ 841100b9-ee60-4537-9bcf-e30b2ba5e215 ‚Üí Failure (Canceled)

LOYALTY TEST PHONE: +14255551111

For complete documentation: https://developer.squareup.com/docs/testing/sandbox
""")

Error fetching workflows: HTTPSConnectionPool(host='n8n-dispatch.fly.dev', port=443): Max retries exceeded with url: /api/v1/workflows (Caused by SSLError(SSLEOFError(8, '[SSL: UNEXPECTED_EOF_WHILE_READING] EOF occurred in violation of protocol (_ssl.c:1000)')))
‚ùå Square Payment Receiver workflow not found.
Available workflows:

üß™ COMPREHENSIVE SQUARE SANDBOX TESTING REFERENCE

TEST CREDIT CARDS (Card-Not-Present Success):
‚îå‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¨‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¨‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îê
‚îÇ Brand       ‚îÇ Number               ‚îÇ CVV ‚îÇ
‚îú‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îº‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îº‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î§
‚îÇ Visa        ‚îÇ 4111 1111 1111 1111  ‚îÇ 111 ‚îÇ
‚îÇ Mastercard  ‚îÇ 5105 1051 0510 5100  ‚îÇ 111 ‚îÇ
‚îÇ Discover    ‚îÇ 6011 0000 0000 0004  ‚îÇ 111 ‚îÇ
‚îÇ Diners Club ‚îÇ 3000 000000 0004     ‚îÇ 111 ‚îÇ
‚îÇ JCB         ‚îÇ 3569 9900 1009 58