# Chapter 8: Pipeline and Stats Engine - Code Verification

This notebook verifies all code examples from Chapter 8, including:
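- SQL model logic (dbt models), simulated in pandas against mock warehouse tables
- Python statistical engine implementation
- Airflow orchestration logic (experiment lifecycle: running vs. completed within the cool-down period)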

## Test Data Setup

We'll create mock DataFrames that simulate the warehouse tables to test the SQL logic.

In [None]:
# Import required libraries
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from statsmodels.stats.proportion import proportions_ztest
from statsmodels.stats.weightstats import ttest_ind
import warnings
warnings.filterwarnings('ignore')

# Confirm the environment by echoing the key library versions, one per line.
print(
    "Libraries imported successfully",
    f"pandas version: {pd.__version__}",
    f"numpy version: {np.__version__}",
    sep="\n",
)

## 1. Mock Data Generation

Simulate warehouse tables: `experiment_metadata`, `assignments`, `actions`

In [None]:
# Mock experiment_metadata table: one row per experiment
# (two still running, one already completed).
experiment_metadata = pd.DataFrame(
    [
        ('exp-new-checkout-flow-v2', datetime(2025, 11, 1), datetime(2025, 11, 15), 'running'),
        ('exp-promo-banner-color', datetime(2025, 11, 5), datetime(2025, 11, 12), 'running'),
        ('exp-old-feature-test', datetime(2025, 10, 10), datetime(2025, 10, 25), 'completed'),
    ],
    columns=['experiment_id', 'start_date', 'end_date', 'status'],
)

print("Experiment Metadata:")
print(experiment_metadata)
print(f"\nShape: {experiment_metadata.shape}")

In [None]:
# Mock assignments table (10,000 users)
np.random.seed(42)
n_users = 10000

# Columns are drawn in the same order as the DataFrame layout below, so the seeded
# global stream yields the same values: assignment hours, then experiment, then variant.
assignment_epoch = datetime(2025, 11, 1)
user_ids = [f'user_{idx:05d}' for idx in range(n_users)]
assigned_at = [assignment_epoch + timedelta(hours=np.random.randint(0, 240)) for _ in range(n_users)]
experiment_ids = np.random.choice(['exp-new-checkout-flow-v2', 'exp-promo-banner-color'], n_users)
variant_ids = np.random.choice(['control', 'treatment'], n_users)

assignments = pd.DataFrame({
    'user_id': user_ids,
    'timestamp': assigned_at,
    'experiment_id': experiment_ids,
    'variant_id': variant_ids,
})
# Drop the column temporaries so only the table itself stays in the namespace.
del assignment_epoch, user_ids, assigned_at, experiment_ids, variant_ids

print("Sample Assignments:")
print(assignments.head(10))
print(f"\nTotal assignments: {len(assignments)}")
print(f"Variant distribution:\n{assignments['variant_id'].value_counts()}")

In [None]:
# Mock actions table (simulate purchase events)
# Generate 20,000 action events across users.
# Draws continue the global NumPy random stream seeded in the assignments cell,
# so run the cells in order for reproducible data.
n_actions = 20000

action_user_ids = np.random.choice(assignments['user_id'].to_numpy(), n_actions)
action_hours = np.random.randint(1, 250, n_actions)  # 1..249 hours after 2025-11-01
action_types = np.random.choice(['page_view', 'add_to_cart', 'purchase'], n_actions, p=[0.6, 0.3, 0.1])
product_ids = np.random.randint(1, 100, n_actions)  # product_1 .. product_99
page_names = np.random.choice(['home', 'product', 'checkout', 'confirmation'], n_actions)
purchase_amounts = np.random.uniform(10, 200, n_actions)

actions = pd.DataFrame({
    'user_id': action_user_ids,
    'timestamp': pd.Timestamp(2025, 11, 1) + pd.to_timedelta(action_hours, unit='h'),
    'action_type': action_types,
    'target_name': [f'product_{pid}' for pid in product_ids],
    'page_name': page_names,
    # Only purchases carry monetary value; every other action type is $0.
    # (Previously value was drawn independently of action_type, so most purchases
    # were $0 while page views / add-to-cart events carried revenue.)
    'value': np.where(action_types == 'purchase', purchase_amounts, 0.0),
})
# Drop the column temporaries so only the table itself stays in the namespace.
del action_user_ids, action_hours, action_types, product_ids, page_names, purchase_amounts

print("Sample Actions:")
print(actions.head(10))
print(f"\nTotal actions: {len(actions)}")
print(f"Action type distribution:\n{actions['action_type'].value_counts()}")

## 2. SQL Logic Verification - Core Join

Test the `int_experiment_actions` logic in Python (pandas equivalent of the SQL)

In [None]:
# Simulate int_experiment_actions.sql logic

# Only experiments currently marked 'running' take part in the join.
active_experiments = experiment_metadata.loc[experiment_metadata['status'].eq('running')].copy()

# Assignments for active experiments that happened on/after the experiment start.
assignments_filtered = (
    assignments
    .merge(active_experiments[['experiment_id', 'start_date', 'end_date']], on='experiment_id', how='inner')
    .loc[lambda frame: frame['timestamp'] >= frame['start_date']]
    .rename(columns={'timestamp': 'assignment_timestamp'})
)

# Prune actions that predate every active experiment (cheap pre-filter before the join).
min_start_date = active_experiments['start_date'].min()
actions_filtered = (
    actions
    .loc[actions['timestamp'].ge(min_start_date)]
    .rename(columns={'timestamp': 'action_timestamp'})
)

# Core Join on user_id, keeping only actions that occur AFTER assignment
# (the critical temporal filter), then projecting to the model's output columns.
int_experiment_actions = (
    assignments_filtered
    .merge(actions_filtered, on='user_id', how='inner')
    .loc[
        lambda joined: joined['action_timestamp'] >= joined['assignment_timestamp'],
        ['user_id', 'assignment_timestamp', 'experiment_id', 'variant_id',
         'action_timestamp', 'action_type', 'target_name', 'page_name', 'value'],
    ]
)

print("✓ Core Join (int_experiment_actions) executed successfully")
print(f"\nRows after join: {len(int_experiment_actions)}")
print("\nSample joined data:")
print(int_experiment_actions.head(10))

# Validation checks
assert len(int_experiment_actions) > 0, "Join produced no results"
assert int_experiment_actions['action_timestamp'].ge(int_experiment_actions['assignment_timestamp']).all(), \
    "Some actions occurred before assignment (temporal integrity violation)"
print("\n✓ All temporal integrity checks passed")

## 3. SQL Logic Verification - Aggregation Layer

Test the `mart_experiment_user_metrics` logic

In [None]:
# Simulate mart_experiment_user_metrics.sql logic
#
# Per-action indicator columns are derived first, so every metric is a plain vectorized
# aggregation. There is no per-group Python lambda reaching back into the source frame
# by index.
# NOTE(review): int_experiment_actions is an inner join, so users who were assigned but
# had no post-assignment action are absent here. Conversion rates are therefore
# conditional on activity rather than per assigned user; confirm this matches the
# Chapter 8 dbt model.
mart_experiment_user_metrics = (
    int_experiment_actions
    .assign(
        # Metric 1 input: 1 for a purchase action, else 0 (max per user -> converted flag)
        is_purchase=lambda d: d['action_type'].eq('purchase').astype(int),
        # Metric 2 input: value counted only on purchase actions
        purchase_value=lambda d: d['value'].where(d['action_type'].eq('purchase'), 0),
        # Metric 3 input: 1 for an add_to_cart action, else 0
        is_add_to_cart=lambda d: d['action_type'].eq('add_to_cart').astype(int),
    )
    .groupby(['user_id', 'experiment_id', 'variant_id'], as_index=False)
    .agg(
        converted=('is_purchase', 'max'),          # did the user make any purchase? (0/1)
        total_revenue=('purchase_value', 'sum'),   # total purchase revenue
        items_in_cart=('is_add_to_cart', 'sum'),   # number of add_to_cart actions
    )
)

print("✓ Aggregation Layer (mart_experiment_user_metrics) executed successfully")
print(f"\nTotal users in analysis: {len(mart_experiment_user_metrics)}")
print("\nSample aggregated metrics:")
print(mart_experiment_user_metrics.head(10))

# Summary statistics
print("\n=== Metric Summary by Experiment and Variant ===")
summary = mart_experiment_user_metrics.groupby(['experiment_id', 'variant_id']).agg({
    'user_id': 'count',
    'converted': ['sum', 'mean'],
    'total_revenue': ['sum', 'mean'],
    'items_in_cart': 'mean'
}).round(4)
print(summary)

# Validation checks
assert len(mart_experiment_user_metrics) > 0, "Aggregation produced no results"
assert mart_experiment_user_metrics['converted'].isin([0, 1]).all(), "Converted metric must be binary (0 or 1)"
assert (mart_experiment_user_metrics['total_revenue'] >= 0).all(), "Revenue cannot be negative"
assert (mart_experiment_user_metrics['items_in_cart'] >= 0).all(), "Item count cannot be negative"
# A duplicate (user, experiment) would mean one user landed in several variants.
assert not mart_experiment_user_metrics.duplicated(['user_id', 'experiment_id']).any(), \
    "Each user must appear at most once per experiment"
print("\n✓ All aggregation validation checks passed")

## 4. Python Statistical Engine Verification

Test the complete statistical analysis engine from Chapter 8

In [None]:
# Define the statistical analysis functions from Chapter 8

def _relative_lift(treatment_value: float, control_value: float) -> float:
    """Return the relative lift (treatment - control) / control, guarding a zero baseline.

    With a zero baseline, a non-zero difference yields +inf or -inf (matching its sign).
    When both values are zero the lift is undefined and NaN is returned.
    """
    if control_value != 0:
        return (treatment_value - control_value) / control_value
    if treatment_value == control_value:
        return float('nan')
    return float('inf') if treatment_value > control_value else float('-inf')


def analyze_experiment_results(
    df: pd.DataFrame,
    metadata: pd.DataFrame,
    run_date: datetime,
    cool_down_period: timedelta = timedelta(days=3),
    alpha: float = 0.05,
) -> pd.DataFrame:
    """
    Analyzes results for all relevant experiments in the DataFrame.

    Based on the Chapter 8 statistical engine. The cool-down period (3 days) and the
    significance level (0.05) were hard-coded there; they are keyword arguments here
    with the same defaults.

    Parameters
    ----------
    df : pd.DataFrame
        User-level metrics with ``experiment_id``, ``variant_id`` ('control' /
        'treatment') and one column per analyzed metric (``converted``,
        ``total_revenue``). Presumably one row per user per experiment; not enforced here.
    metadata : pd.DataFrame
        Experiment metadata with a unique ``experiment_id`` plus ``status`` and
        ``end_date`` columns.
    run_date : datetime
        Date the analysis runs for. It decides whether a completed experiment is
        still inside its cool-down window.
    cool_down_period : timedelta, optional
        How long after ``end_date`` a completed experiment keeps being analyzed,
        to absorb late-arriving data.
    alpha : float, optional
        Significance threshold applied to every p-value.

    Returns
    -------
    pd.DataFrame
        One row per analyzed (experiment, metric) with ``experiment_id``, ``metric``,
        ``analysis_type``, ``p_value``, ``lift`` and ``significant``. The result is an
        empty DataFrame (no columns) when nothing qualifies. Metrics that fail to
        analyze are reported via print and omitted.
    """
    # Convert metadata to dict for easier access
    metadata_dict = metadata.set_index('experiment_id').to_dict('index')

    # In a real platform, this would come from the metric repository
    metrics_to_analyze = [
        {'name': 'converted', 'type': 'binomial'},
        {'name': 'total_revenue', 'type': 'continuous'}
    ]

    results = []

    for exp_id in df['experiment_id'].unique():
        exp_meta = metadata_dict.get(exp_id, {})
        if not exp_meta:
            print(f"Skipping analysis for '{exp_id}': No metadata found.")
            continue

        exp_data = df[df['experiment_id'] == exp_id]

        # A missing end_date falls back to a date far before run_date, so the
        # experiment can never count as being in cool-down.
        end_date = exp_meta.get('end_date', run_date - timedelta(days=99))
        is_running = exp_meta.get('status') == 'running'
        is_in_cooldown = (exp_meta.get('status') == 'completed') and \
                         (run_date <= end_date + cool_down_period)

        if not (is_running or is_in_cooldown):
            print(f"Skipping analysis for '{exp_id}': Completed and past cool-down period.")
            continue

        analysis_type = "Interim" if is_running else "Final (in cool-down)"
        print(f"\n{'='*60}")
        print(f"{analysis_type} Analysis for Experiment: {exp_id}")
        print(f"{'='*60}")

        control = exp_data[exp_data['variant_id'] == 'control']
        treatment = exp_data[exp_data['variant_id'] == 'treatment']

        if len(control) == 0 or len(treatment) == 0:
            print("⚠ Skipping analysis: control or treatment group has zero users.")
            continue

        print(f"Sample sizes - Control: {len(control)}, Treatment: {len(treatment)}")

        for metric in metrics_to_analyze:
            metric_name = metric['name']
            metric_type = metric['type']

            try:
                print(f"\n--- Metric: {metric_name} ({metric_type}) ---")
                if metric_type == 'binomial':
                    control_conversions = control[metric_name].sum()
                    treatment_conversions = treatment[metric_name].sum()
                    n_control = len(control)
                    n_treatment = len(treatment)

                    stat, p_value = proportions_ztest(
                        [treatment_conversions, control_conversions],
                        [n_treatment, n_control]
                    )

                    control_rate = control_conversions / n_control
                    treatment_rate = treatment_conversions / n_treatment
                    lift = _relative_lift(treatment_rate, control_rate)

                    print(f"  Control Rate: {control_rate:.4f} ({control_conversions}/{n_control})")
                    print(f"  Treatment Rate: {treatment_rate:.4f} ({treatment_conversions}/{n_treatment})")
                    print(f"  Lift: {lift:+.2%}")

                elif metric_type == 'continuous':
                    # Welch's t-test (no equal-variance assumption)
                    stat, p_value, _ = ttest_ind(
                        treatment[metric_name],
                        control[metric_name],
                        usevar='unequal'
                    )

                    control_mean = control[metric_name].mean()
                    treatment_mean = treatment[metric_name].mean()
                    lift = _relative_lift(treatment_mean, control_mean)

                    print(f"  Control Mean: ${control_mean:.2f}")
                    print(f"  Treatment Mean: ${treatment_mean:.2f}")
                    print(f"  Lift: {lift:+.2%}")

                else:
                    # Without this, p_value/lift left over from the previous metric
                    # would be silently reported for this one.
                    raise ValueError(f"unsupported metric type '{metric_type}'")

                is_significant = bool(p_value < alpha)
                print(f"  P-value: {p_value:.5f}")
                significance = "✓ Statistically Significant" if is_significant else "✗ Not Statistically Significant"
                print(f"  Result: {significance}")

                results.append({
                    'experiment_id': exp_id,
                    'metric': metric_name,
                    'analysis_type': analysis_type,
                    'p_value': p_value,
                    'lift': lift,
                    'significant': is_significant
                })

            except Exception as e:
                # Best-effort per metric: one failing metric must not abort the others.
                print(f"  ⚠ Error analyzing metric '{metric_name}': {e}")

    return pd.DataFrame(results)

print("✓ Statistical engine functions defined successfully")

In [None]:
# Run the statistical analysis
RUN_DATE = datetime(2025, 11, 10)  # Simulate running on this date

print(f"Running analysis for date: {RUN_DATE.strftime('%Y-%m-%d')}\n")

results_df = analyze_experiment_results(mart_experiment_user_metrics, experiment_metadata, RUN_DATE)

# Summary banner and raw results table
print("\n" + "=" * 60)
print("ANALYSIS SUMMARY")
print("=" * 60)
print(results_df)

# Validation: at least one result, expected columns present, p-values inside [0, 1]
assert len(results_df) > 0, "Statistical analysis produced no results"
assert 'p_value' in results_df.columns, "Missing p_value in results"
assert 'lift' in results_df.columns, "Missing lift in results"
assert results_df['p_value'].between(0, 1).all(), "P-values must be between 0 and 1"
print("\n✓ All statistical engine validation checks passed")

## 5. Test Individual Statistical Functions

Verify statsmodels integration with simple test cases

In [None]:
# Test binomial proportion z-test (two-sample, pooled proportion)
print("Testing Binomial Proportion Z-Test:")
control_conv = 100
treatment_conv = 120
n_control = 1000
n_treatment = 1000

stat, p_value = proportions_ztest(
    [treatment_conv, control_conv],
    [n_treatment, n_control]
)

# Hand-computed reference: difference in proportions over the standard error under
# the pooled proportion, which statsmodels uses for two samples by default.
pooled_rate = (treatment_conv + control_conv) / (n_treatment + n_control)
pooled_se = np.sqrt(pooled_rate * (1 - pooled_rate) * (1 / n_treatment + 1 / n_control))
expected_z = (treatment_conv / n_treatment - control_conv / n_control) / pooled_se

print(f"  Control: {control_conv}/{n_control} = {control_conv/n_control:.1%}")
print(f"  Treatment: {treatment_conv}/{n_treatment} = {treatment_conv/n_treatment:.1%}")
print(f"  Z-statistic: {stat:.4f}")
print(f"  Expected Z (by hand): {expected_z:.4f}")
print(f"  P-value: {p_value:.4f}")
print(f"  Result: {'Significant' if p_value < 0.05 else 'Not significant'} at α=0.05")

# np.floating included so a NumPy scalar return type is also accepted.
assert isinstance(stat, (int, float, np.floating)), "Z-statistic should be numeric"
assert np.isclose(stat, expected_z), "Z-statistic does not match the hand-computed pooled z"
assert 0 <= p_value <= 1, "P-value must be between 0 and 1"
print("\n✓ Binomial test validated")

In [None]:
# Test continuous metric t-test (Welch's t-test, unequal variances)
print("\nTesting Continuous Metric T-Test:")
np.random.seed(42)
control_revenue = np.random.normal(50, 20, 1000)  # mean=$50, std=$20
treatment_revenue = np.random.normal(55, 20, 1000)  # mean=$55, std=$20

stat, p_value, _ = ttest_ind(treatment_revenue, control_revenue, usevar='unequal')

# Hand-computed reference: Welch statistic = difference in means over the unpooled
# standard error built from sample variances (ddof=1).
welch_se = np.sqrt(
    treatment_revenue.var(ddof=1) / treatment_revenue.size
    + control_revenue.var(ddof=1) / control_revenue.size
)
expected_t = (treatment_revenue.mean() - control_revenue.mean()) / welch_se

print(f"  Control mean: ${control_revenue.mean():.2f}")
print(f"  Treatment mean: ${treatment_revenue.mean():.2f}")
print(f"  T-statistic: {stat:.4f}")
print(f"  Expected T (by hand): {expected_t:.4f}")
print(f"  P-value: {p_value:.4f}")
print(f"  Result: {'Significant' if p_value < 0.05 else 'Not significant'} at α=0.05")

# np.floating included so a NumPy scalar return type is also accepted.
assert isinstance(stat, (int, float, np.floating)), "T-statistic should be numeric"
assert np.isclose(stat, expected_t), "T-statistic does not match the hand-computed Welch statistic"
assert 0 <= p_value <= 1, "P-value must be between 0 and 1"
print("\n✓ T-test validated")

## 6. Orchestration Logic Validation

Verify the experiment lifecycle logic (running vs. completed with cool-down)

In [None]:
# Test lifecycle logic
print("Testing Experiment Lifecycle Logic:\n")

cool_down_period = timedelta(days=3)
test_date = datetime(2025, 11, 10)

# Expected decision per mock experiment on test_date (2025-11-10). Both running
# experiments get interim results. exp-old-feature-test ended 2025-10-25, so its
# cool-down ended 2025-10-28 and it must be skipped.
expected_should_analyze = {
    'exp-new-checkout-flow-v2': True,
    'exp-promo-banner-color': True,
    'exp-old-feature-test': False,
}
observed_should_analyze = {}

for _, exp in experiment_metadata.iterrows():
    exp_id = exp['experiment_id']
    status = exp['status']
    end_date = exp['end_date']

    # Same rule as analyze_experiment_results: analyze running experiments, plus
    # completed ones whose end_date + cool-down has not yet passed.
    is_running = status == 'running'
    is_in_cooldown = (status == 'completed') and (test_date <= end_date + cool_down_period)
    should_analyze = is_running or is_in_cooldown
    observed_should_analyze[exp_id] = should_analyze

    print(f"Experiment: {exp_id}")
    print(f"  Status: {status}")
    print(f"  End Date: {end_date.strftime('%Y-%m-%d')}")
    print(f"  Test Date: {test_date.strftime('%Y-%m-%d')}")
    print(f"  Should Analyze: {should_analyze}")

    if is_running:
        print("  → Running experiment: Generate INTERIM results")
    elif is_in_cooldown:
        print("  → In cool-down period: Generate FINAL results")
    else:
        print("  → Past cool-down: Skip analysis (use cached final results)")
    print()

# Every experiment's decision must match, not only the skip case.
assert observed_should_analyze == expected_should_analyze, \
    f"Lifecycle decisions {observed_should_analyze} != expected {expected_should_analyze}"

# Validation: exp-old-feature-test should be skipped (ended 2025-10-25, cool-down ends 2025-10-28)
old_exp = experiment_metadata[experiment_metadata['experiment_id'] == 'exp-old-feature-test'].iloc[0]
is_past_cooldown = (old_exp['status'] == 'completed') and \
                   (test_date > old_exp['end_date'] + cool_down_period)
assert is_past_cooldown, "Old experiment should be past cool-down period"

print("✓ Lifecycle logic validated")

## 7. End-to-End Pipeline Test

Summarize the complete pipeline, from raw data to final results, using the outputs produced by the cells above.

In [None]:
print("="*60)
print("END-TO-END PIPELINE VERIFICATION")
print("="*60)

# Re-check each invariant before reporting it, so the "✓" lines below are backed by
# assertions instead of only being printed.
assert (int_experiment_actions['action_timestamp'] >= int_experiment_actions['assignment_timestamp']).all(), \
    "Temporal integrity violated: an action precedes its assignment"
# One row per (user, experiment); a duplicate would mean a user landed in more than
# one variant of the same experiment.
assert not mart_experiment_user_metrics.duplicated(['user_id', 'experiment_id']).any(), \
    "IID violated: a user appears more than once in an experiment"
assert results_df['p_value'].between(0, 1).all(), "P-values outside the valid range [0, 1]"

print("\n[Stage 1] Core Join (int_experiment_actions)")
print(f"  Input: {len(assignments)} assignments, {len(actions)} actions")
print(f"  Output: {len(int_experiment_actions)} joined records")
print("  ✓ Temporal integrity verified")

print("\n[Stage 2] Aggregation Layer (mart_experiment_user_metrics)")
print(f"  Input: {len(int_experiment_actions)} action records")
print(f"  Output: {len(mart_experiment_user_metrics)} user-level records")
print("  ✓ IID assumption maintained (one row per user)")

print("\n[Stage 3] Statistical Engine")
print(f"  Input: {len(mart_experiment_user_metrics)} user metrics")
print(f"  Output: {len(results_df)} statistical test results")
print("  ✓ All p-values in valid range [0, 1]")

print("\n" + "="*60)
print("✓ ALL PIPELINE STAGES VERIFIED SUCCESSFULLY")
print("="*60)