# Dashboard Data Preparation

This notebook pulls data from Snowflake to be fed into a Streamlit dashboard

In [1]:
# Import required libraries
import os
import pandas as pd
from datetime import datetime
from snowflake_engine import SnowflakeEngine # custom SQLAlchemy Engine Wrapper
from dotenv import load_dotenv
import warnings
warnings.filterwarnings('ignore')

# Load environment variables
load_dotenv()

  from pandas.core import (


True

In [2]:
# Initialize Snowflake connection using custom engine
sf = SnowflakeEngine()
sf.test_connection()

Connected as: OCTOPYTH0N


True

In [3]:
# implementation with SnowflakeEngine
tables_df = sf.query("""SHOW TABLES IN SCHEMA SNOWFLAKE_SAMPLE_DATA.TPCH_SF1""")

print(f"\nAvailable tables ({len(tables_df)}):")
for table in tables_df['name'].values:
    print(f"- {table}")


Available tables (8):
- CUSTOMER
- LINEITEM
- NATION
- ORDERS
- PART
- PARTSUPP
- REGION
- SUPPLIER


## 1. Revenue Trend with Moving Average
Monthly revenue data with 3-month moving average for trend analysis

In [4]:
# Query 1: Revenue Trend with Moving Average
# Aggregating by month to reduce data volume
revenue_trend_query = """
WITH monthly_revenue AS (
    SELECT 
        DATE_TRUNC('month', O_ORDERDATE) as month,
        SUM(O_TOTALPRICE) as revenue,
        COUNT(*) as order_count,
        COUNT(DISTINCT O_CUSTKEY) as unique_customers
    FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS
    WHERE O_ORDERDATE >= '1995-01-01'  -- Limiting to last 3-4 years to reduce data
        AND O_ORDERDATE < '1998-08-01'  -- Exclude incomplete August 1998 data
    GROUP BY DATE_TRUNC('month', O_ORDERDATE)
)
SELECT 
    month,
    revenue,
    order_count,
    unique_customers,
    -- 3-month moving average
    AVG(revenue) OVER (
        ORDER BY month 
        ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
    ) as revenue_ma3,
    -- Month-over-month growth
    LAG(revenue, 1) OVER (ORDER BY month) as prev_month_revenue,
    ((revenue - LAG(revenue, 1) OVER (ORDER BY month)) / 
     NULLIF(LAG(revenue, 1) OVER (ORDER BY month), 0) * 100) as mom_growth_pct
FROM monthly_revenue
ORDER BY month
"""

df_revenue_trend = sf.query(revenue_trend_query)
# convert date to datetime object
df_revenue_trend['month'] = pd.to_datetime(df_revenue_trend['month']) 

print(f"Retrieved {len(df_revenue_trend)} months of revenue data")
print(f"Date range: {df_revenue_trend['month'].min()} to {df_revenue_trend['month'].max()}")
print("\nFirst 3 rows:")
print(df_revenue_trend.head(3))

Retrieved 43 months of revenue data
Date range: 1995-01-01 00:00:00 to 1998-07-01 00:00:00

First 3 rows:
       month       revenue  order_count  unique_customers   revenue_ma3  \
0 1995-01-01  2.946587e+09        19472             17525  2.946587e+09   
1 1995-02-01  2.689142e+09        17721             16116  2.817865e+09   
2 1995-03-01  2.910048e+09        19313             17333  2.848593e+09   

   prev_month_revenue  mom_growth_pct  
0                 NaN             NaN  
1        2.946587e+09       -8.737066  
2        2.689142e+09        8.214749  


## 2. Product Revenue by Country
Monthly revenue contribution by product (part) for each country and aggregated for the world

In [5]:
# Query 6: Product Revenue by Country - Time Series
# Getting monthly revenue contribution by part for each country + World aggregate
product_revenue_query = """
-- Country-level product revenue
WITH country_product_revenue AS (
    SELECT 
        DATE_TRUNC('month', o.O_ORDERDATE) as month,
        n.N_NAME as country,
        p.P_TYPE as product_type,
        p.P_PARTKEY as product_id,
        SUM(l.L_EXTENDEDPRICE) as revenue
    FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.LINEITEM l
    JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS o 
        ON l.L_ORDERKEY = o.O_ORDERKEY
    JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.PART p 
        ON l.L_PARTKEY = p.P_PARTKEY
    JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER c 
        ON o.O_CUSTKEY = c.C_CUSTKEY
    JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.NATION n 
        ON c.C_NATIONKEY = n.N_NATIONKEY
    WHERE o.O_ORDERDATE >= '1995-01-01'
        AND o.O_ORDERDATE < '1998-08-01'
    GROUP BY DATE_TRUNC('month', o.O_ORDERDATE), n.N_NAME, p.P_TYPE, p.P_PARTKEY
),
-- World aggregate product revenue
world_product_revenue AS (
    SELECT 
        DATE_TRUNC('month', o.O_ORDERDATE) as month,
        'World' as country,
        p.P_TYPE as product_type,
        p.P_PARTKEY as product_id,
        SUM(l.L_EXTENDEDPRICE) as revenue
    FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.LINEITEM l
    JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS o 
        ON l.L_ORDERKEY = o.O_ORDERKEY
    JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.PART p 
        ON l.L_PARTKEY = p.P_PARTKEY
    WHERE o.O_ORDERDATE >= '1995-01-01'
        AND o.O_ORDERDATE < '1998-08-01'
    GROUP BY DATE_TRUNC('month', o.O_ORDERDATE), p.P_TYPE, p.P_PARTKEY
)
-- Combine country and world data
SELECT * FROM country_product_revenue
UNION ALL
SELECT * FROM world_product_revenue
ORDER BY month, country, product_type
"""

df_product_revenue = sf.query(product_revenue_query)
df_product_revenue['month'] = pd.to_datetime(df_product_revenue['month'])

# Extract only the last word from product_type (e.g., "STANDARD POLISHED COPPER" -> "COPPER")
df_product_revenue['product_type'] = df_product_revenue['product_type'].str.split().str[-1]

print(f"\nRetrieved {len(df_product_revenue):,} rows of product revenue data")
print(f"Countries: {df_product_revenue['country'].nunique()} (including World)")
print(f"Unique product types: {df_product_revenue['product_type'].nunique()}")
print(f"Date range: {df_product_revenue['month'].min()} to {df_product_revenue['month'].max()}")
print("\nSample data:")
print(df_product_revenue.head(10))
print(f"\nUnique product types: {sorted(df_product_revenue['product_type'].unique())}")


Retrieved 5,965,025 rows of product revenue data
Countries: 26 (including World)
Unique product types: 5
Date range: 1995-01-01 00:00:00 to 1998-07-01 00:00:00

Sample data:
       month  country product_type  product_id   revenue
0 1995-01-01  ALGERIA        BRASS      167817  37696.20
1 1995-01-01  ALGERIA        BRASS      107282  39967.68
2 1995-01-01  ALGERIA        BRASS      180482  34374.56
3 1995-01-01  ALGERIA        BRASS      165483  44905.92
4 1995-01-01  ALGERIA        BRASS      181585  69996.36
5 1995-01-01  ALGERIA        BRASS      187896   5951.67
6 1995-01-01  ALGERIA        BRASS      197529  63434.28
7 1995-01-01  ALGERIA        BRASS      111767  53362.80
8 1995-01-01  ALGERIA        BRASS       73130  46331.46
9 1995-01-01  ALGERIA        BRASS      127509  41485.50

Unique product types: ['BRASS', 'COPPER', 'NICKEL', 'STEEL', 'TIN']


## 3. Geographic Anomaly Detection
Monthly metrics by country with statistical baselines for anomaly detection

In [6]:
# Query 7: Geographic Anomaly Detection
# Pull monthly metrics by country with statistical baselines
import numpy as np

geographic_anomaly_query = """
WITH monthly_country_metrics AS (
    SELECT 
        DATE_TRUNC('month', o.O_ORDERDATE) as month,
        n.N_NAME as country,
        n.N_NATIONKEY as country_key,
        r.R_NAME as region,
        
        -- Core metrics
        COUNT(DISTINCT o.O_ORDERKEY) as order_count,
        COUNT(DISTINCT c.C_CUSTKEY) as unique_customers,
        SUM(o.O_TOTALPRICE) as total_revenue,
        AVG(o.O_TOTALPRICE) as avg_order_value,
        
        -- Operational metrics
        SUM(CASE WHEN o.O_ORDERSTATUS = 'F' THEN 1 ELSE 0 END) as fulfilled_orders,
        SUM(CASE WHEN o.O_ORDERSTATUS = 'O' THEN 1 ELSE 0 END) as open_orders,
        
        -- Calculate orders per customer
        COUNT(DISTINCT o.O_ORDERKEY) / NULLIF(COUNT(DISTINCT c.C_CUSTKEY), 0) as orders_per_customer
        
    FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS o
    JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER c 
        ON o.O_CUSTKEY = c.C_CUSTKEY
    JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.NATION n 
        ON c.C_NATIONKEY = n.N_NATIONKEY
    JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.REGION r 
        ON n.N_REGIONKEY = r.R_REGIONKEY
    WHERE o.O_ORDERDATE >= '1995-01-01'
        AND o.O_ORDERDATE < '1998-08-01'
    GROUP BY 
        DATE_TRUNC('month', o.O_ORDERDATE),
        n.N_NAME,
        n.N_NATIONKEY,
        r.R_NAME
),

-- Calculate historical statistics for each country (for anomaly detection)
country_baselines AS (
    SELECT 
        country,
        country_key,
        region,
        
        -- Revenue statistics
        AVG(total_revenue) as avg_revenue,
        STDDEV(total_revenue) as stddev_revenue,
        PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY total_revenue) as q1_revenue,
        PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY total_revenue) as q3_revenue,
        
        -- Order count statistics
        AVG(order_count) as avg_orders,
        STDDEV(order_count) as stddev_orders,
        
        -- Customer statistics
        AVG(unique_customers) as avg_customers,
        STDDEV(unique_customers) as stddev_customers,
        
        -- Average order value statistics
        AVG(avg_order_value) as avg_aov,
        STDDEV(avg_order_value) as stddev_aov,
        
        -- Fulfillment rate statistics
        AVG(fulfilled_orders * 100.0 / NULLIF(order_count, 0)) as avg_fulfillment_rate,
        STDDEV(fulfilled_orders * 100.0 / NULLIF(order_count, 0)) as stddev_fulfillment_rate
        
    FROM monthly_country_metrics
    GROUP BY country, country_key, region
)

-- Join current metrics with baselines
SELECT 
    m.month,
    m.country,
    m.country_key,
    m.region,
    m.order_count,
    m.unique_customers,
    m.total_revenue,
    m.avg_order_value,
    m.fulfilled_orders,
    m.open_orders,
    m.orders_per_customer,
    
    -- Baseline statistics
    b.avg_revenue,
    b.stddev_revenue,
    b.q1_revenue,
    b.q3_revenue,
    b.avg_orders,
    b.stddev_orders,
    b.avg_customers,
    b.stddev_customers,
    b.avg_aov,
    b.stddev_aov,
    b.avg_fulfillment_rate,
    b.stddev_fulfillment_rate,
    
    -- Calculate Z-scores for anomaly detection
    CASE 
        WHEN b.stddev_revenue > 0 
        THEN (m.total_revenue - b.avg_revenue) / b.stddev_revenue
        ELSE 0
    END as revenue_zscore,
    
    CASE 
        WHEN b.stddev_orders > 0 
        THEN (m.order_count - b.avg_orders) / b.stddev_orders
        ELSE 0
    END as orders_zscore,
    
    CASE 
        WHEN b.stddev_customers > 0 
        THEN (m.unique_customers - b.avg_customers) / b.stddev_customers
        ELSE 0
    END as customers_zscore,
    
    CASE 
        WHEN b.stddev_aov > 0 
        THEN (m.avg_order_value - b.avg_aov) / b.stddev_aov
        ELSE 0
    END as aov_zscore,
    
    -- IQR-based anomaly detection for revenue
    b.q3_revenue + 1.5 * (b.q3_revenue - b.q1_revenue) as revenue_upper_fence,
    b.q1_revenue - 1.5 * (b.q3_revenue - b.q1_revenue) as revenue_lower_fence,
    
    -- Flag if revenue is outside IQR fences
    CASE 
        WHEN m.total_revenue > (b.q3_revenue + 1.5 * (b.q3_revenue - b.q1_revenue))
            OR m.total_revenue < (b.q1_revenue - 1.5 * (b.q3_revenue - b.q1_revenue))
        THEN 1
        ELSE 0
    END as revenue_iqr_outlier

FROM monthly_country_metrics m
JOIN country_baselines b ON m.country = b.country
ORDER BY m.month DESC, m.country
"""

# Execute query
df_geographic_anomalies = sf.query(geographic_anomaly_query)
df_geographic_anomalies['month'] = pd.to_datetime(df_geographic_anomalies['month'])

# Sort by country and month for MoM calculations
df_geographic_anomalies = df_geographic_anomalies.sort_values(['country', 'month'])

# 1. Month-over-month change detection
df_geographic_anomalies['revenue_mom_change'] = df_geographic_anomalies.groupby('country')['total_revenue'].pct_change() * 100
df_geographic_anomalies['orders_mom_change'] = df_geographic_anomalies.groupby('country')['order_count'].pct_change() * 100

# 2. Flag sudden changes (>30% swing)
df_geographic_anomalies['revenue_spike'] = df_geographic_anomalies['revenue_mom_change'].abs() > 30
df_geographic_anomalies['orders_spike'] = df_geographic_anomalies['orders_mom_change'].abs() > 30

# 3. Correlation breakdown detection
# Expected revenue based on order count and avg order value
df_geographic_anomalies['expected_revenue'] = df_geographic_anomalies['order_count'] * df_geographic_anomalies['avg_aov']
df_geographic_anomalies['revenue_deviation_pct'] = (
    (df_geographic_anomalies['total_revenue'] - df_geographic_anomalies['expected_revenue']) / 
    df_geographic_anomalies['expected_revenue'] * 100
)
df_geographic_anomalies['correlation_breakdown'] = df_geographic_anomalies['revenue_deviation_pct'].abs() > 20

# 4. Fulfillment rate anomalies
df_geographic_anomalies['fulfillment_rate'] = (
    df_geographic_anomalies['fulfilled_orders'] / df_geographic_anomalies['order_count'] * 100
)
df_geographic_anomalies['fulfillment_anomaly'] = (
    (df_geographic_anomalies['fulfillment_rate'] < 
     df_geographic_anomalies['avg_fulfillment_rate'] - 2 * df_geographic_anomalies['stddev_fulfillment_rate']) |
    (df_geographic_anomalies['fulfillment_rate'] > 
     df_geographic_anomalies['avg_fulfillment_rate'] + 2 * df_geographic_anomalies['stddev_fulfillment_rate'])
)

# 5. Z-score based anomalies (threshold = 2.0)
df_geographic_anomalies['revenue_anomaly'] = df_geographic_anomalies['revenue_zscore'].abs() > 2.0
df_geographic_anomalies['orders_anomaly'] = df_geographic_anomalies['orders_zscore'].abs() > 2.0
df_geographic_anomalies['customers_anomaly'] = df_geographic_anomalies['customers_zscore'].abs() > 2.0
df_geographic_anomalies['aov_anomaly'] = df_geographic_anomalies['aov_zscore'].abs() > 2.0

# 6. Calculate composite anomaly score (0-100)
# Weight different anomaly types
weights = {
    'revenue_zscore': 0.30,
    'orders_zscore': 0.20,
    'customers_zscore': 0.15,
    'aov_zscore': 0.10,
    'revenue_mom_change': 0.15,
    'revenue_deviation_pct': 0.10
}

# Normalize each component to 0-100 scale
df_geographic_anomalies['revenue_component'] = np.clip(df_geographic_anomalies['revenue_zscore'].abs() * 20, 0, 100) * weights['revenue_zscore']
df_geographic_anomalies['orders_component'] = np.clip(df_geographic_anomalies['orders_zscore'].abs() * 20, 0, 100) * weights['orders_zscore']
df_geographic_anomalies['customers_component'] = np.clip(df_geographic_anomalies['customers_zscore'].abs() * 20, 0, 100) * weights['customers_zscore']
df_geographic_anomalies['aov_component'] = np.clip(df_geographic_anomalies['aov_zscore'].abs() * 20, 0, 100) * weights['aov_zscore']
df_geographic_anomalies['mom_component'] = np.clip(df_geographic_anomalies['revenue_mom_change'].abs() / 3, 0, 100) * weights['revenue_mom_change']
df_geographic_anomalies['deviation_component'] = np.clip(df_geographic_anomalies['revenue_deviation_pct'].abs() / 2, 0, 100) * weights['revenue_deviation_pct']

# Sum weighted components
df_geographic_anomalies['anomaly_score'] = (
    df_geographic_anomalies['revenue_component'] +
    df_geographic_anomalies['orders_component'] +
    df_geographic_anomalies['customers_component'] +
    df_geographic_anomalies['aov_component'] +
    df_geographic_anomalies['mom_component'] +
    df_geographic_anomalies['deviation_component']
)

# 7. Categorize severity - QUARTILE-BASED RANGES
df_geographic_anomalies['anomaly_severity'] = pd.cut(
    df_geographic_anomalies['anomaly_score'],
    bins=[-np.inf, 25, 50, 75, np.inf],
    labels=['Normal', 'Minor', 'Moderate', 'Severe']
)

# 8. List specific anomaly types detected
def get_anomaly_types(row):
    anomalies = []
    if row['revenue_anomaly']:
        direction = 'spike' if row['revenue_zscore'] > 0 else 'drop'
        anomalies.append(f"Revenue {direction} (Z={row['revenue_zscore']:.1f})")
    if row['orders_anomaly']:
        direction = 'spike' if row['orders_zscore'] > 0 else 'drop'
        anomalies.append(f"Order volume {direction} (Z={row['orders_zscore']:.1f})")
    if row['customers_anomaly']:
        direction = 'increase' if row['customers_zscore'] > 0 else 'decrease'
        anomalies.append(f"Customer count {direction}")
    if row['aov_anomaly']:
        anomalies.append("AOV shift")
    if row['revenue_spike'] and not pd.isna(row['revenue_mom_change']):
        anomalies.append(f"Revenue MoM: {row['revenue_mom_change']:.1f}%")
    if row['correlation_breakdown']:
        anomalies.append("Rev-orders correlation breakdown")
    if row['fulfillment_anomaly']:
        anomalies.append("Fulfillment rate anomaly")
    
    return ', '.join(anomalies) if anomalies else 'None'

df_geographic_anomalies['anomaly_types'] = df_geographic_anomalies.apply(get_anomaly_types, axis=1)

# Country name to ISO-3 code mapping
COUNTRY_CODE_MAP = {
    'ALGERIA': 'DZA',
    'ARGENTINA': 'ARG',
    'BRAZIL': 'BRA',
    'CANADA': 'CAN',
    'EGYPT': 'EGY',
    'ETHIOPIA': 'ETH',
    'FRANCE': 'FRA',
    'GERMANY': 'DEU',
    'INDIA': 'IND',
    'INDONESIA': 'IDN',
    'IRAN': 'IRN',
    'IRAQ': 'IRQ',
    'JAPAN': 'JPN',
    'JORDAN': 'JOR',
    'KENYA': 'KEN',
    'MOROCCO': 'MAR',
    'MOZAMBIQUE': 'MOZ',
    'PERU': 'PER',
    'CHINA': 'CHN',
    'ROMANIA': 'ROU',
    'SAUDI ARABIA': 'SAU',
    'VIETNAM': 'VNM',
    'RUSSIA': 'RUS',
    'UNITED KINGDOM': 'GBR',
    'UNITED STATES': 'USA'
}

# Add ISO-3 country codes for mapping
df_geographic_anomalies['country_code'] = df_geographic_anomalies['country'].map(COUNTRY_CODE_MAP)

# Check for missing country codes
missing = df_geographic_anomalies[df_geographic_anomalies['country_code'].isna()]['country'].unique()
if len(missing) > 0:
    print(f"\nWarning: Missing country codes for: {missing}")

### How the Anomaly Score is Calculated

The **Anomaly Score** (0-100) is a composite metric that identifies unusual patterns in country-level performance. It combines multiple detection methods with different weights:

#### Score Calculation:

Each component is normalized to a 0-100 scale and multiplied by its weight:
- **Revenue z-score** : 30% weight
- **Orders z-score** : 20% weight
- **Customer count z-score** : 15% weight
- **Average Order Value (AOV) z-score** : 10% weight
- **MoM Revenue Change** : 15% weight
- **Difference between actual and expected revenue based on order count and average AOV** : 10% weight

**Final Score** = Sum of all weighted components (0-100)

#### Severity Categories:
- **Normal** (0-25)
- **Minor** (25-50)
- **Moderate** (50-75)
- **Severe** (75-100)

## Export Data to CSV Files
Save all datasets to the tables folder for use in Streamlit dashboard

In [7]:
## Create tables directory if it doesn't exist
#os.makedirs('tables', exist_ok=True)

# Export all dataframes to CSV
# 1. Revenue Trend
df_revenue_trend.to_csv('tables/revenue_trend.csv', index=False)

# 2. Product revenue by country
df_product_revenue.to_csv('tables/product_revenue_by_country.csv', index=False)

# 3. Geographic Anomalies
df_geographic_anomalies.to_csv('tables/geographic_anomalies.csv', index=False)

In [8]:
# Close the Snowflake connection
sf.dispose()

Connection closed
