# Solana Webhook Transactions - Bronze Layer Explorer

This notebook provides insights into the webhook transaction data being captured from Helius.

## Architecture
- **Bronze Layer**: Raw webhook payloads from Helius
- **Storage**: PostgreSQL `bronze.bronze_webhook_transactions` table
- **Processing**: Direct FastAPI → PostgreSQL writes

## Data Flow
```
Helius Webhook → FastAPI Listener → PostgreSQL Bronze Layer
```

In [None]:
import pandas as pd
import numpy as np
from sqlalchemy import create_engine, text
from datetime import datetime, timedelta
import json
import warnings
warnings.filterwarnings('ignore')

# Database connection with multiple fallback options
def get_database_connection(connection_options=None, connect_timeout=5):
    """Return a SQLAlchemy engine for the first database URL that answers ``SELECT 1``.

    Candidates are tried in order: the ``DATABASE_URL`` environment variable
    (when set), then ``connection_options`` (defaulting to the built-in list of
    Docker-network and localhost URLs below). Each candidate is verified with
    ``SELECT 1`` before it is returned.

    Args:
        connection_options: Optional list of database URLs to try instead of
            the built-in defaults.
        connect_timeout: Seconds to wait per PostgreSQL host before giving up,
            so unreachable fallback hosts fail fast instead of hanging. Only
            applied to ``postgresql`` URLs; pass 0/None to disable.

    Returns:
        A verified ``sqlalchemy`` Engine.

    Raises:
        RuntimeError: If none of the candidate URLs can be connected to. The
            last connection error is chained as the cause.
    """
    import os
    import re

    def _redact(url):
        # Mask the password so credentials are not echoed into notebook output.
        return re.sub(r"(://[^:/@]+):[^@]*@", r"\1:***@", url)

    if connection_options is None:
        connection_options = [
            "postgresql://trader:trader_password@solana_postgres:5432/solana-smart-traders",
            "postgresql://trader:trader_password@postgres:5432/solana-smart-traders",
            "postgresql://trader:trader_password@host.docker.internal:5432/solana-smart-traders",
            "postgresql://trader:trader_password@172.17.0.1:5432/solana-smart-traders",
            "postgresql://trader:trader_password@localhost:5432/solana-smart-traders",
            "postgresql://trader:trader_password@localhost:5433/solana-smart-traders",
        ]

    candidates = list(connection_options)
    env_url = os.environ.get("DATABASE_URL")
    if env_url:
        # An explicitly configured URL takes precedence over the guesses.
        candidates.insert(0, env_url)

    last_error = None
    for db_url in candidates:
        engine = None
        try:
            connect_args = {}
            if connect_timeout and db_url.startswith("postgresql"):
                connect_args["connect_timeout"] = connect_timeout
            engine = create_engine(db_url, connect_args=connect_args)
            # Test the connection
            with engine.connect() as conn:
                conn.execute(text("SELECT 1"))
        except Exception as e:
            last_error = e
            print(f"❌ Failed to connect with {_redact(db_url)}: {e}")
            if engine is not None:
                # Release any pool resources held by the unusable engine.
                engine.dispose()
            continue
        print(f"✅ Database connection successful using: {_redact(db_url)}")
        return engine

    raise RuntimeError(
        "Could not establish database connection with any of the available options"
    ) from last_error

# Establish the module-level database engine used by query_table() below.
# get_database_connection() raises if no candidate URL works, which stops the
# notebook here rather than failing later in every query cell.
engine = get_database_connection()

def query_table(query, description=""):
    """Run *query* against the module-level ``engine`` and return a DataFrame.

    When *description* is non-empty it is printed as an underlined heading,
    followed by the row count. Any error raised while executing the query is
    printed and an empty DataFrame is returned instead, so notebook cells keep
    running.
    """
    try:
        result = pd.read_sql_query(query, engine)
        if description:
            underline = "=" * len(description)
            print(f"\n{description}")
            print(underline)
        print(f"Rows: {len(result)}")
    except Exception as err:
        print(f"Error executing query: {err}")
        return pd.DataFrame()
    return result

def show_table_info(schema, table_name):
    """Return the column layout of ``schema.table_name`` as a DataFrame.

    Reads ``information_schema.columns`` and returns one row per column
    (name, data type, nullability, default) in ordinal order. Execution goes
    through ``query_table``, which prints a heading and the row count; a
    failed query yields an empty DataFrame.

    The schema and table names are sent as bound parameters rather than
    interpolated into the SQL text, so names containing quotes cannot break
    or alter the query.
    """
    query = text("""
    SELECT 
        column_name,
        data_type,
        is_nullable,
        column_default
    FROM information_schema.columns 
    WHERE table_schema = :schema AND table_name = :table_name
    ORDER BY ordinal_position;
    """).bindparams(schema=schema, table_name=table_name)
    return query_table(query, f"Schema for {schema}.{table_name}")

# Confirm setup finished; reaching this line means the engine was created.
print("Database connection established!\nReady to explore webhook data!")

## Database Schema
Let's first check whether the webhook table exists and examine its structure.

In [None]:
# Check if bronze_webhook_transactions table exists
table_exists = query_table("""
SELECT EXISTS (
    SELECT FROM information_schema.tables 
    WHERE table_schema = 'bronze' 
    AND table_name = 'bronze_webhook_transactions'
) as table_exists;
""", "Table Existence Check")

# query_table() returns an empty DataFrame when the query itself fails (for
# example, the database became unreachable). Check for that before reading
# row 0, instead of raising IndexError.
if table_exists.empty:
    print("⚠️ Could not check whether bronze_webhook_transactions exists (query failed).")
elif table_exists.iloc[0]['table_exists']:
    print("✅ bronze_webhook_transactions table exists!")

    # Show table schema. Jupyter only auto-renders a cell's final expression,
    # not a value produced inside an if-block, so display it explicitly.
    display(show_table_info('bronze', 'bronze_webhook_transactions'))
else:
    print("❌ bronze_webhook_transactions table does not exist yet.")
    print("Run the Alembic migration first: alembic upgrade head")

## 1. Webhook Transaction Overview
Overall statistics about webhook transactions received

In [None]:
# Section 1: a one-row summary of volume, distinct keys, and time range.
# query_table() already catches query errors and returns an empty DataFrame,
# so the except branch only fires for failures outside the query itself.
overview_sql = """
    SELECT 
        COUNT(*) as total_webhooks,
        COUNT(DISTINCT webhook_id) as unique_webhooks,
        COUNT(DISTINCT transaction_signature) as unique_transactions,
        COUNT(DISTINCT account_address) as unique_accounts,
        COUNT(DISTINCT token_address) as unique_tokens,
        COUNT(DISTINCT webhook_type) as unique_webhook_types,
        MIN(webhook_timestamp) as first_webhook,
        MAX(webhook_timestamp) as last_webhook,
        MIN(ingested_at) as first_ingested,
        MAX(ingested_at) as last_ingested
    FROM bronze.bronze_webhook_transactions;
    """
try:
    overview = query_table(overview_sql, description="Webhook Transactions - Overview")
    display(overview)
except Exception as err:
    print(f"Table might not exist yet or is empty: {err}")

## 2. Processing Status
Track the processing status of webhook transactions

In [None]:
# Section 2: row count per processing_status and its share of all rows.
processing_status_sql = """
    SELECT 
        processing_status,
        COUNT(*) as count,
        ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM bronze.bronze_webhook_transactions), 2) as percentage
    FROM bronze.bronze_webhook_transactions
    GROUP BY processing_status
    ORDER BY count DESC;
    """
try:
    processing_status = query_table(processing_status_sql, description="Processing Status Distribution")
    display(processing_status)
except Exception as err:
    print(f"Table might not exist yet or is empty: {err}")

## 3. Webhook Types Analysis
Breakdown of different types of webhook events received

In [None]:
# Section 3: counts and first/last sighting per non-NULL webhook_type.
# The percentage denominator is the whole table (the subquery has no WHERE),
# so rows with a NULL webhook_type are included in the total.
webhook_types_sql = """
    SELECT 
        webhook_type,
        COUNT(*) as count,
        ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM bronze.bronze_webhook_transactions), 2) as percentage,
        MIN(webhook_timestamp) as first_seen,
        MAX(webhook_timestamp) as last_seen
    FROM bronze.bronze_webhook_transactions
    WHERE webhook_type IS NOT NULL
    GROUP BY webhook_type
    ORDER BY count DESC;
    """
try:
    webhook_types = query_table(webhook_types_sql, description="Webhook Types Distribution")
    display(webhook_types)
except Exception as err:
    print(f"Table might not exist yet or is empty: {err}")

## 4. Account Activity
Accounts that appear most often in webhook events

In [None]:
# Section 4: the 20 account_address values with the most rows.
account_activity_sql = """
    SELECT 
        account_address,
        COUNT(*) as webhook_count,
        COUNT(DISTINCT webhook_type) as unique_types,
        COUNT(DISTINCT token_address) as unique_tokens,
        MIN(webhook_timestamp) as first_activity,
        MAX(webhook_timestamp) as last_activity
    FROM bronze.bronze_webhook_transactions
    WHERE account_address IS NOT NULL
    GROUP BY account_address
    ORDER BY webhook_count DESC
    LIMIT 20;
    """
try:
    account_activity = query_table(account_activity_sql, description="Top 20 Most Active Accounts")
    display(account_activity)
except Exception as err:
    print(f"Table might not exist yet or is empty: {err}")

## 5. Token Activity
Most frequently seen tokens in webhook events

In [None]:
# Section 5: the 20 token_address values with the most rows.
token_activity_sql = """
    SELECT 
        token_address,
        COUNT(*) as webhook_count,
        COUNT(DISTINCT account_address) as unique_accounts,
        COUNT(DISTINCT webhook_type) as unique_types,
        MIN(webhook_timestamp) as first_seen,
        MAX(webhook_timestamp) as last_seen
    FROM bronze.bronze_webhook_transactions
    WHERE token_address IS NOT NULL
    GROUP BY token_address
    ORDER BY webhook_count DESC
    LIMIT 20;
    """
try:
    token_activity = query_table(token_activity_sql, description="Top 20 Most Active Tokens")
    display(token_activity)
except Exception as err:
    print(f"Table might not exist yet or is empty: {err}")

## 6. Temporal Analysis
Webhook activity over time

In [None]:
try:
    # Daily activity for today and the 13 preceding calendar days. The window
    # is filtered by date rather than taking "LIMIT 14" rows, so days with no
    # webhooks don't pull older days into a "Last 14 Days" report.
    temporal_analysis = query_table("""
    SELECT 
        DATE(webhook_timestamp) as date,
        COUNT(*) as webhook_count,
        COUNT(DISTINCT account_address) as unique_accounts,
        COUNT(DISTINCT token_address) as unique_tokens,
        COUNT(DISTINCT webhook_type) as unique_types
    FROM bronze.bronze_webhook_transactions
    WHERE webhook_timestamp IS NOT NULL
      AND webhook_timestamp >= CURRENT_DATE - INTERVAL '13 days'
    GROUP BY DATE(webhook_timestamp)
    ORDER BY date DESC;
    """, "Daily Webhook Activity (Last 14 Days)")

    display(temporal_analysis)
except Exception as e:
    print(f"Table might not exist yet or is empty: {e}")

## 7. Hourly Activity Pattern
Webhook activity by hour of day

In [None]:
# Section 7: rows per hour-of-day bucket. avg_per_hour is a window AVG over
# the grouped rows, i.e. the mean across the hour buckets that have data;
# hours with no rows are absent, not counted as zero.
hourly_pattern_sql = """
    SELECT 
        EXTRACT(HOUR FROM webhook_timestamp) as hour,
        COUNT(*) as webhook_count,
        COUNT(DISTINCT account_address) as unique_accounts,
        AVG(COUNT(*)) OVER() as avg_per_hour
    FROM bronze.bronze_webhook_transactions
    WHERE webhook_timestamp IS NOT NULL
    GROUP BY EXTRACT(HOUR FROM webhook_timestamp)
    ORDER BY hour;
    """
try:
    hourly_pattern = query_table(hourly_pattern_sql, description="Hourly Activity Pattern")
    display(hourly_pattern)
except Exception as err:
    print(f"Table might not exist yet or is empty: {err}")

## 8. Recent Activity
Latest webhook transactions for monitoring

In [None]:
# Section 8: the ten most recently ingested rows, newest first.
recent_activity_sql = """
    SELECT 
        webhook_id,
        webhook_type,
        transaction_signature,
        account_address,
        token_address,
        webhook_timestamp,
        ingested_at,
        processing_status
    FROM bronze.bronze_webhook_transactions
    ORDER BY ingested_at DESC
    LIMIT 10;
    """
try:
    recent_activity = query_table(recent_activity_sql, description="10 Most Recent Webhook Transactions")
    display(recent_activity)
except Exception as err:
    print(f"Table might not exist yet or is empty: {err}")

## 9. Data Quality Checks
Identify potential data quality issues

In [None]:
try:
    # One row per quality metric. Each percentage divides by the total row
    # count. An ungrouped aggregate still yields a row on an empty table, so
    # NULLIF turns a zero total into NULL instead of raising "division by zero".
    # COALESCE then reports such percentages as 'N/A', like the first row.
    # NOTE(review): raw_payload = '{}' assumes a jsonb or text column; a plain
    # json column has no equality operator — confirm against the migration.
    data_quality = query_table("""
    SELECT 
        'Total Records' as metric,
        COUNT(*) as count,
        'N/A' as percentage
    FROM bronze.bronze_webhook_transactions
    
    UNION ALL
    
    SELECT 
        'Missing Transaction Signature' as metric,
        COUNT(*) as count,
        COALESCE(ROUND(COUNT(*) * 100.0 / NULLIF((SELECT COUNT(*) FROM bronze.bronze_webhook_transactions), 0), 2) || '%', 'N/A') as percentage
    FROM bronze.bronze_webhook_transactions
    WHERE transaction_signature IS NULL
    
    UNION ALL
    
    SELECT 
        'Missing Account Address' as metric,
        COUNT(*) as count,
        COALESCE(ROUND(COUNT(*) * 100.0 / NULLIF((SELECT COUNT(*) FROM bronze.bronze_webhook_transactions), 0), 2) || '%', 'N/A') as percentage
    FROM bronze.bronze_webhook_transactions
    WHERE account_address IS NULL
    
    UNION ALL
    
    SELECT 
        'Missing Token Address' as metric,
        COUNT(*) as count,
        COALESCE(ROUND(COUNT(*) * 100.0 / NULLIF((SELECT COUNT(*) FROM bronze.bronze_webhook_transactions), 0), 2) || '%', 'N/A') as percentage
    FROM bronze.bronze_webhook_transactions
    WHERE token_address IS NULL
    
    UNION ALL
    
    SELECT 
        'Empty Raw Payload' as metric,
        COUNT(*) as count,
        COALESCE(ROUND(COUNT(*) * 100.0 / NULLIF((SELECT COUNT(*) FROM bronze.bronze_webhook_transactions), 0), 2) || '%', 'N/A') as percentage
    FROM bronze.bronze_webhook_transactions
    WHERE raw_payload = '{}' OR raw_payload IS NULL;
    """, "Data Quality Assessment")

    display(data_quality)
except Exception as e:
    print(f"Table might not exist yet or is empty: {e}")

## 10. Sample Raw Payload
Examine a sample webhook payload to understand the data structure

In [None]:
try:
    # Fetch the most recently ingested row that has a non-empty raw_payload.
    # NOTE(review): raw_payload != '{}' assumes a jsonb or text column — confirm.
    sample_payload = query_table("""
    SELECT 
        webhook_id,
        webhook_type,
        transaction_signature,
        account_address,
        token_address,
        raw_payload
    FROM bronze.bronze_webhook_transactions
    WHERE raw_payload IS NOT NULL AND raw_payload != '{}'
    ORDER BY ingested_at DESC
    LIMIT 1;
    """, "Sample Webhook Payload")

    if not sample_payload.empty:
        display(sample_payload)

        # Pretty print the JSON payload
        if 'raw_payload' in sample_payload.columns:
            try:
                payload_json = sample_payload.iloc[0]['raw_payload']
                # Presumably the driver decodes json/jsonb into Python objects,
                # but a text column arrives as a str. Parse it so the output is
                # indented JSON rather than one escaped string literal.
                if isinstance(payload_json, str):
                    payload_json = json.loads(payload_json)
                if payload_json:
                    print("\nFormatted Raw Payload:")
                    print("=" * 50)
                    print(json.dumps(payload_json, indent=2))
            except Exception as e:
                print(f"Error formatting JSON: {e}")
    else:
        print("No webhook data with payloads found yet.")

except Exception as e:
    print(f"Table might not exist yet or is empty: {e}")

## 11. System Health
Monitor the health of the webhook ingestion system

In [None]:
# Section 11: ingestion-health metrics as (metric, value) rows.
# NOTE(review): 'failed' and 'pending' are assumed to be the processing_status
# values the ingestion service writes — confirm against the listener code.
system_health_sql = """
    SELECT 
        'Webhooks in last hour' as metric,
        COUNT(*) as value
    FROM bronze.bronze_webhook_transactions
    WHERE ingested_at >= NOW() - INTERVAL '1 hour'
    
    UNION ALL
    
    SELECT 
        'Webhooks in last 24 hours' as metric,
        COUNT(*) as value
    FROM bronze.bronze_webhook_transactions
    WHERE ingested_at >= NOW() - INTERVAL '24 hours'
    
    UNION ALL
    
    SELECT 
        'Failed processing count' as metric,
        COUNT(*) as value
    FROM bronze.bronze_webhook_transactions
    WHERE processing_status = 'failed'
    
    UNION ALL
    
    SELECT 
        'Pending processing count' as metric,
        COUNT(*) as value
    FROM bronze.bronze_webhook_transactions
    WHERE processing_status = 'pending'
    
    UNION ALL
    
    SELECT 
        'Average ingestion delay (seconds)' as metric,
        ROUND(AVG(EXTRACT(EPOCH FROM (ingested_at - webhook_timestamp))), 2) as value
    FROM bronze.bronze_webhook_transactions
    WHERE webhook_timestamp IS NOT NULL 
    AND ingested_at >= NOW() - INTERVAL '24 hours';
    """
try:
    system_health = query_table(system_health_sql, description="System Health Metrics")
    display(system_health)
except Exception as err:
    print(f"Table might not exist yet or is empty: {err}")