# Notebook 03: Feature Engineering (FIXED)

**Purpose**: Extract 345 features (95 unified + 250 TF-IDF) and validate train-serve parity

**FIXED VERSION**: This notebook fixes the feature parity issues by using one unified extractor for both training and inference.

**Pipeline**:
1. Load pre-split datasets from notebook 01
2. Compute historical statistics from training data
3. Extract 95 features using unified FeatureExtractor (78 base + 17 historical)
4. Build TF-IDF vocabulary (training data ONLY)
5. Extract TF-IDF features for all splits
6. Combine features (95 + 250 = 345 total)
7. Validate feature parity
8. Save to S3

**Key Fix**: Uses unified FeatureExtractor with historical features enabled for both training and inference.

**Duration**: ~45-60 minutes

## 1. Spark Configuration

Copy the Spark configuration from the notebook 00 output and paste it below:

In [None]:
%%configure -f
{
    "pyFiles": [
        "s3://uip-datalake-bucket-prod/sf_trino/trino_query_predictor/code/query_predictor_latest.zip",
        "s3://uipds-108043591022/dataintelligence-dev/di-airflow-prod/dags/common/utils/ParseArgs.py"
    ],
    "driverMemory": "16G",
    "driverCores": 4,
    "executorMemory": "20G",
    "executorCores": 5,
    "conf": {
        "spark.driver.maxResultSize": "8G",
        "spark.dynamicAllocation.enabled": "true",
        "spark.dynamicAllocation.minExecutors": "2",
        "spark.dynamicAllocation.maxExecutors": "20"
    }
}

## 2. Import Dependencies

In [None]:
import sys
import yaml
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import ArrayType, FloatType

# Import production modules
from query_predictor.core.featurizer.feature_extractor import FeatureExtractor
from query_predictor.training.spark_ml_tfidf_pipeline import SparkMLTfidfPipeline
from query_predictor.training.parity_validator import ParityValidator
from query_predictor.training.historical_stats_computer import HistoricalStatsComputer
from query_predictor.training.checkpoint_manager import CheckpointManager

print(f"Python version: {sys.version}")
print(f"PySpark version: {spark.version}")
print("✅ All imports successful")

## 3. Load Configuration

In [None]:
import boto3

# Download training configuration from S3
s3_client = boto3.client('s3')
s3_bucket = 'uip-datalake-bucket-prod'
s3_prefix = 'sf_trino/trino_query_predictor'
config_s3_key = f"{s3_prefix}/config/training_config_latest.yaml"
config_path = '/tmp/training_config.yaml'

print(f"Downloading config from S3: s3://{s3_bucket}/{config_s3_key}")
s3_client.download_file(s3_bucket, config_s3_key, config_path)

# Load training configuration
with open(config_path) as f:
    config = yaml.safe_load(f)

# Initialize checkpoint manager
checkpoint_mgr = CheckpointManager(
    spark,
    s3_checkpoint_path=config['checkpointing']['s3_path'],
    enabled=config['checkpointing']['enabled']
)

print("✅ Configuration loaded")
print(f"\n📋 Feature Configuration:")
print(f"  Base features: {config['features']['base_feature_count']}")
print(f"  Historical features: {config['features']['historical_feature_count']}")
print(f"  TF-IDF vocab size: {config['features']['tfidf_vocab_size']}")
print(f"  Total features: {config['features']['total_features']}")

## 4. Load Pre-Split Data from Notebook 01

In [None]:
# Load pre-split datasets from notebook 01
processed_path = config['data_loading']['processed_output_path']
date_range = f"{config['data_loading']['start_date']}_to_{config['data_loading']['end_date']}"
base_path = f"{processed_path}/{date_range}"

train_path = f"{base_path}/train_sampled"  # 5:1 sampled for training
val_path = f"{base_path}/val_original"      # ~36:1 original distribution
test_path = f"{base_path}/test_original"    # ~36:1 original distribution

print(f"Loading pre-split datasets...")
print(f"  Train (sampled): {train_path}")
print(f"  Val (original):  {val_path}")
print(f"  Test (original): {test_path}")

# Load splits
train_df = spark.read.parquet(train_path)
val_df = spark.read.parquet(val_path)
test_df = spark.read.parquet(test_path)

# Get counts
train_count = train_df.count()
val_count = val_df.count()
test_count = test_df.count()

print(f"\n✅ Datasets loaded:")
print(f"  Train: {train_count:,} queries")
print(f"  Val:   {val_count:,} queries")
print(f"  Test:  {test_count:,} queries")

# Calculate ratios for reporting
train_heavy = train_df.filter(F.col('is_heavy') == 1).count()
train_ratio = (train_count - train_heavy) / train_heavy if train_heavy > 0 else 0

val_heavy = val_df.filter(F.col('is_heavy') == 1).count()
val_ratio = (val_count - val_heavy) / val_heavy if val_heavy > 0 else 0

test_heavy = test_df.filter(F.col('is_heavy') == 1).count()
test_ratio = (test_count - test_heavy) / test_heavy if test_heavy > 0 else 0

print(f"\nDistribution ratios (Small:Heavy):")
print(f"  Train: {train_ratio:.1f}:1 (sampled)")
print(f"  Val:   {val_ratio:.1f}:1 (original)")
print(f"  Test:  {test_ratio:.1f}:1 (original)")

## 5. Compute Historical Statistics

Compute the statistics from training data only, so that no information from the validation or test splits leaks into the features.
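
To make the leakage concern concrete, here is a minimal sketch in plain Python. It is not the `HistoricalStatsComputer` implementation: the helper names `fit_user_heavy_rates` and `lookup_heavy_rate` and the toy rows are hypothetical. The idea it illustrates is that per-user heavy rates are fitted on training rows only, and users that first appear in validation or test fall back to the overall training rate.

```python
from collections import defaultdict

def fit_user_heavy_rates(train_rows):
    """Compute per-user and overall heavy rates from TRAINING rows only."""
    totals = defaultdict(int)
    heavies = defaultdict(int)
    for row in train_rows:
        totals[row["user"]] += 1
        heavies[row["user"]] += row["is_heavy"]
    per_user = {user: heavies[user] / totals[user] for user in totals}
    overall = sum(heavies.values()) / max(sum(totals.values()), 1)
    return per_user, overall

def lookup_heavy_rate(user, per_user, overall):
    """Users unseen in training fall back to the overall training rate."""
    return per_user.get(user, overall)

# Hypothetical toy training split
train_rows = [
    {"user": "alice", "is_heavy": 1},
    {"user": "alice", "is_heavy": 0},
    {"user": "bob", "is_heavy": 0},
    {"user": "bob", "is_heavy": 0},
]
per_user, overall = fit_user_heavy_rates(train_rows)

seen_rate = lookup_heavy_rate("alice", per_user, overall)    # user seen in training
unseen_rate = lookup_heavy_rate("carol", per_user, overall)  # user only in val/test
print(seen_rate, unseen_rate)
```

Because the lookup never touches validation or test labels, the same fitted statistics can be reused unchanged at inference time.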

In [None]:
print("Computing historical statistics from training data...")

# Initialize stats computer
stats_computer = HistoricalStatsComputer(version='1.0.0')

# Compute stats from training data
date_range_dict = {
    'start': config['data_loading']['start_date'],
    'end': config['data_loading']['end_date']
}
stats_schema = stats_computer.compute(train_df, date_range_dict)

print(f"\n✅ Historical stats computed:")
print(f"  Users: {len(stats_schema.users):,}")
print(f"  Catalogs: {len(stats_schema.catalogs):,}")
print(f"  Schemas: {len(stats_schema.schemas):,}")
print(f"  Overall heavy rate: {stats_schema.heavy_rate_overall:.2%}")

# Serialize to dict for FeatureExtractor
stats_dict = stats_schema.to_dict()

## 6. Initialize Unified Feature Extractor

**KEY FIX**: Use a single FeatureExtractor with historical features enabled.
This ensures consistent feature computation between training and inference.
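
One way to check that two code paths really share a configuration is to compare a deterministic fingerprint of the config dictionaries. The sketch below is only an illustration under stated assumptions: `config_fingerprint` is a hypothetical helper that is not part of `query_predictor`, and the three keys are the ones this notebook sets on the unified configuration.

```python
import hashlib
import json

def config_fingerprint(config):
    """Hash a config dict deterministically (key order does not matter)."""
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

training_config = {
    "enable_historical_features": True,
    "ast_timeout_ms": 50,
    "ast_fallback_on_timeout": True,
}
# Same settings in a different key order: should hash identically.
serving_config = {
    "ast_fallback_on_timeout": True,
    "ast_timeout_ms": 50,
    "enable_historical_features": True,
}
# A drifted config, e.g. historical features switched off at serving time.
drifted_config = dict(serving_config, enable_historical_features=False)

same = config_fingerprint(training_config) == config_fingerprint(serving_config)
drifted = config_fingerprint(training_config) == config_fingerprint(drifted_config)
print(same, drifted)
```

Storing such a fingerprint next to the saved features would let a serving job refuse to start when its configuration differs from the one used in training.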

In [None]:
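# Create unified configuration with historical features enabled.
# Deep copy so that nested sections of `config` are never modified through the copy.
import copy
unified_config = copy.deepcopy(config)
unified_config['enable_historical_features'] = True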

# Ensure consistent AST parser settings
unified_config['ast_timeout_ms'] = 50
unified_config['ast_fallback_on_timeout'] = True

# Initialize unified feature extractor with historical stats
unified_extractor = FeatureExtractor(
    unified_config,
    historical_stats=stats_dict
)

print("✅ Unified FeatureExtractor initialized")
print(f"  Feature count: {unified_extractor.feature_count}")
print(f"  Expected: 95 (78 base + 17 historical)")
print(f"  Historical features enabled: True")
print(f"  AST timeout: {unified_config['ast_timeout_ms']}ms")

assert unified_extractor.feature_count == 95, f"Expected 95 features, got {unified_extractor.feature_count}"

## 7. Extract Unified Features (95 features)

Extract base + historical features together using the unified extractor.

In [None]:
# Create Spark UDF for distributed extraction
unified_udf = unified_extractor.create_spark_udf()

print("Extracting unified features (base + historical) for all splits...")
print("This extracts 95 features in a single pass.\n")

# Extract for train
print("[1/3] Extracting train features...")
train_unified = train_df.withColumn(
    'unified_features',
    unified_udf(
        F.struct(
            F.col('query'),
            F.col('user'),
            F.col('catalog'),
            F.col('schema'),
            F.col('hour'),
            F.col('clientInfo')
        )
    )
)
# train_unified = checkpoint_mgr.checkpoint(train_unified, "03_train_unified_fixed")

# Extract for val
print("[2/3] Extracting val features...")
val_unified = val_df.withColumn(
    'unified_features',
    unified_udf(
        F.struct(
            F.col('query'),
            F.col('user'),
            F.col('catalog'),
            F.col('schema'),
            F.col('hour'),
            F.col('clientInfo')
        )
    )
)
# val_unified = checkpoint_mgr.checkpoint(val_unified, "03_val_unified_fixed")

# Extract for test
print("[3/3] Extracting test features...")
test_unified = test_df.withColumn(
    'unified_features',
    unified_udf(
        F.struct(
            F.col('query'),
            F.col('user'),
            F.col('catalog'),
            F.col('schema'),
            F.col('hour'),
            F.col('clientInfo')
        )
    )
)
# test_unified = checkpoint_mgr.checkpoint(test_unified, "03_test_unified_fixed")

print("\n✅ Unified features extracted for all splits (95 features each)")

## 8. Verify Unified Feature Dimensions

In [None]:
# Sample to verify dimensions
sample_train = train_unified.select('unified_features').limit(1).collect()[0]
unified_dim = len(sample_train['unified_features'])

print(f"Unified feature dimensions:")
print(f"  Actual: {unified_dim}")
print(f"  Expected: 95")

assert unified_dim == 95, f"Unified features should be 95, got {unified_dim}"
print("\n✅ Unified feature dimensions validated")

## 9. Build TF-IDF Vocabulary (TRAINING DATA ONLY)
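
The vocabulary is fitted on the training split only, so terms that occur only in validation or test queries never receive a feature slot. The sketch below illustrates that idea in plain Python. It is not the `SparkMLTfidfPipeline` implementation: `fit_vocabulary` and `binary_vector` are hypothetical helpers, the weights are binary presence flags rather than IDF-weighted values, and the meaning assumed for `min_df` (minimum document count) and `max_df` (maximum document fraction) follows common TF-IDF tooling rather than anything confirmed for this pipeline.

```python
from collections import Counter

def fit_vocabulary(train_queries, vocab_size, min_df, max_df):
    """Select terms by document frequency using TRAINING queries only.

    Assumed semantics: min_df is a minimum document count,
    max_df a maximum document fraction.
    """
    doc_freq = Counter()
    for query in train_queries:
        doc_freq.update(set(query.lower().split()))
    n_docs = len(train_queries)
    kept = [
        (term, df) for term, df in doc_freq.items()
        if df >= min_df and df / n_docs <= max_df
    ]
    # Highest document frequency first; ties broken alphabetically.
    kept.sort(key=lambda item: (-item[1], item[0]))
    return {term: idx for idx, (term, _) in enumerate(kept[:vocab_size])}

def binary_vector(query, vocab):
    """Binary presence vector; terms outside the vocabulary are ignored."""
    vec = [0.0] * len(vocab)
    for term in set(query.lower().split()):
        if term in vocab:
            vec[vocab[term]] = 1.0
    return vec

train_queries = [
    "select a from orders",
    "select b from orders",
    "select a from users",
]
vocab = fit_vocabulary(train_queries, vocab_size=3, min_df=2, max_df=0.9)

# "payments" never appeared in training, so it contributes nothing.
val_vec = binary_vector("select a from payments", vocab)
print(vocab, val_vec)
```

In this toy run `select` and `from` are dropped by `max_df` because they appear in every query, which mirrors the motivation for filtering SQL keywords.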

In [None]:
# Initialize Spark ML TF-IDF pipeline with SQL-aware optimizations
tfidf_config = {
    'tfidf_vocab_size': config['features']['tfidf_vocab_size'],
    'min_df': config['features']['min_df'],
    'max_df': config['features']['max_df'],
    'use_binary': config['features'].get('use_binary', True),
    'filter_sql_keywords': config['features'].get('filter_sql_keywords', True),
    'normalize_sql': config['features'].get('normalize_sql', True)
}

tfidf_pipeline = SparkMLTfidfPipeline(tfidf_config)

print("Building TF-IDF vocabulary on TRAINING DATA ONLY...")
print(f"  Config: vocab_size={tfidf_config['tfidf_vocab_size']}, min_df={tfidf_config['min_df']}, max_df={tfidf_config['max_df']}")
print(f"  SQL optimizations: binary={tfidf_config['use_binary']}, filter_keywords={tfidf_config['filter_sql_keywords']}")
print("  This prevents data leakage into val/test sets.\n")

# Fit on DataFrame directly (NO COLLECT!)
tfidf_pipeline.fit_on_dataframe(train_unified, query_column='query')

print(f"\n✅ TF-IDF vocabulary built successfully")
metadata = tfidf_pipeline.get_feature_metadata()
print(f"  Vocabulary size: {metadata['vocab_size']:,}")
print(f"  Method: {metadata['method']}")

## 10. Extract TF-IDF Features

In [None]:
# Create Spark UDF from fitted pipeline
tfidf_udf = tfidf_pipeline.create_spark_udf()

print("Extracting TF-IDF features for all splits...")

# Extract for train
print("\n[1/3] Extracting train TF-IDF features...")
train_tfidf = train_unified.withColumn('tfidf_features', tfidf_udf(F.col('query')))
# train_tfidf = checkpoint_mgr.checkpoint(train_tfidf, "03_train_tfidf_fixed")

# Extract for val
print("[2/3] Extracting val TF-IDF features...")
val_tfidf = val_unified.withColumn('tfidf_features', tfidf_udf(F.col('query')))
# val_tfidf = checkpoint_mgr.checkpoint(val_tfidf, "03_val_tfidf_fixed")

# Extract for test
print("[3/3] Extracting test TF-IDF features...")
test_tfidf = test_unified.withColumn('tfidf_features', tfidf_udf(F.col('query')))
# test_tfidf = checkpoint_mgr.checkpoint(test_tfidf, "03_test_tfidf_fixed")

print("\n✅ TF-IDF features extracted for all splits")

## 11. Combine Features

Concatenate the unified features (95) with the TF-IDF features (250) for 345 total.
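
The resulting layout can be sketched in plain Python as follows. The block sizes (78 base, 17 historical, 250 TF-IDF) are the counts stated in this notebook; the helper names `combine` and `feature_block` are hypothetical and exist only for this illustration.

```python
N_BASE, N_HISTORICAL, N_TFIDF = 78, 17, 250
N_UNIFIED = N_BASE + N_HISTORICAL   # 95
N_TOTAL = N_UNIFIED + N_TFIDF       # 345

def combine(unified, tfidf):
    """Concatenate the two blocks, checking lengths before joining."""
    if unified is None or tfidf is None:
        return None
    if len(unified) != N_UNIFIED or len(tfidf) != N_TFIDF:
        raise ValueError(f"unexpected lengths: {len(unified)} + {len(tfidf)}")
    return list(unified) + list(tfidf)

def feature_block(index):
    """Name the block that a position in the combined vector belongs to."""
    if not 0 <= index < N_TOTAL:
        raise IndexError(index)
    if index < N_BASE:
        return "base"
    if index < N_UNIFIED:
        return "historical"
    return "tfidf"

combined = combine([0.0] * 95, [1.0] * 250)
print(len(combined), feature_block(77), feature_block(78), feature_block(95))
```

Knowing which block an index falls into is useful when a parity check reports mismatched positions: indices 0-77 point at base features, 78-94 at historical features, and 95-344 at TF-IDF terms.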

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType

@udf(returnType=ArrayType(FloatType()))
def combine_features(unified, tfidf):
    """Concatenate unified + tfidf features."""
    if unified is None or tfidf is None:
        return None
    return unified + tfidf

print("Combining unified and TF-IDF features...")

# Combine for train
train_final = train_tfidf.withColumn(
    'features',
    combine_features(
        F.col('unified_features'),
        F.col('tfidf_features')
    )
)

# Combine for val
val_final = val_tfidf.withColumn(
    'features',
    combine_features(
        F.col('unified_features'),
        F.col('tfidf_features')
    )
)

# Combine for test
test_final = test_tfidf.withColumn(
    'features',
    combine_features(
        F.col('unified_features'),
        F.col('tfidf_features')
    )
)

print("\n✅ Features combined")
print(f"  Unified: 95 (78 base + 17 historical)")
print(f"  TF-IDF: {config['features']['tfidf_vocab_size']}")
print(f"  Total: {config['features']['total_features']}")

## 12. Validate Feature Dimensions

In [None]:
# Sample and verify dimensions
print("Validating final feature dimensions...")

sample_train = train_final.select('features', 'is_heavy').limit(1).collect()[0]
sample_val = val_final.select('features', 'is_heavy').limit(1).collect()[0]
sample_test = test_final.select('features', 'is_heavy').limit(1).collect()[0]

train_dim = len(sample_train['features'])
val_dim = len(sample_val['features'])
test_dim = len(sample_test['features'])
expected_dim = config['features']['total_features']

print(f"\n📊 Feature Dimensions:")
print(f"  Train: {train_dim}")
print(f"  Val:   {val_dim}")
print(f"  Test:  {test_dim}")
print(f"  Expected: {expected_dim}")

assert train_dim == expected_dim, f"Train dimension mismatch: {train_dim} != {expected_dim}"
assert val_dim == expected_dim, f"Val dimension mismatch: {val_dim} != {expected_dim}"
assert test_dim == expected_dim, f"Test dimension mismatch: {test_dim} != {expected_dim}"

print("\n✅ All dimensions validated")

## 13. Feature Parity Validation

**CRITICAL**: Validate that training features match inference features.
This should now pass with the unified extractor approach.
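
Conceptually, a parity check recomputes features through the inference path and compares them, element by element, with the vectors produced during training. The sketch below shows one plausible definition in plain Python. It is an assumption for illustration, not the `ParityValidator` implementation: `parity_report` is a hypothetical helper, the default tolerance is arbitrary, and a sample is counted as mismatched if any single feature differs by more than the tolerance.

```python
def parity_report(training_vectors, inference_vectors, tolerance=1e-5):
    """Compare paired vectors and report the share of mismatched samples (in percent)."""
    mismatches = []
    pairs = zip(training_vectors, inference_vectors)
    for i, (train_vec, infer_vec) in enumerate(pairs):
        if len(train_vec) != len(infer_vec):
            # A length difference is a mismatch with no comparable indices.
            mismatches.append({"sample": i, "indices": None})
            continue
        bad = [
            j for j, (a, b) in enumerate(zip(train_vec, infer_vec))
            if abs(a - b) > tolerance
        ]
        if bad:
            mismatches.append({"sample": i, "indices": bad})
    n = len(training_vectors)
    rate = 100.0 * len(mismatches) / n if n else 0.0
    return {"mismatch_rate": rate, "mismatches": mismatches}

# Toy data: the second sample differs at feature index 1.
training_vectors = [[1.0, 2.0, 3.0], [0.5, 0.0, 4.0]]
inference_vectors = [[1.0, 2.0, 3.0], [0.5, 0.1, 4.0]]

report = parity_report(training_vectors, inference_vectors, tolerance=1e-5)
passed = report["mismatch_rate"] < 0.5  # success threshold, in percent
print(report["mismatch_rate"], report["mismatches"], passed)
```

Reporting the mismatched indices, not just the rate, makes it possible to tell whether a failure comes from one feature family or is spread across the whole vector.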

In [None]:
print("="*70)
print("FEATURE PARITY VALIDATION")
print("="*70)

# Initialize validator
validation_config = config.get('validation', {})
n_samples = validation_config.get('parity_samples', 100)
validator = ParityValidator(config=config)

# Collect sample of training features and queries
print(f"\nCollecting {n_samples} samples for validation...")
train_samples = train_final.select(
    'features', 'query', 'user', 'catalog', 'schema', 'hour', 'clientInfo', 'is_heavy'
).limit(n_samples).collect()

# Convert to numpy arrays
training_features = np.array([row['features'] for row in train_samples], dtype=np.float32)

# Prepare sample queries for inference
sample_queries = [
    {
        'query': row['query'],
        'user': row['user'],
        'catalog': row['catalog'],
        'schema': row['schema'],
        'hour': row['hour'],
        'clientInfo': row['clientInfo']
    }
    for row in train_samples
]

print(f"\nRunning parity validation...")
print(f"  Tolerance: {validator.tolerance}")
print(f"  Success threshold: <{validation_config.get('parity_success_threshold', 0.5)}% mismatch")

# Create inference featurizer with IDENTICAL configuration
# This is the KEY FIX - using the same unified configuration
inference_featurizer = FeatureExtractor(
    unified_config,  # Same config as training
    historical_stats=stats_dict  # Same historical stats
)

print(f"\nInference featurizer initialized (identical to training):")
print(f"  Feature count: {inference_featurizer.feature_count}")
print(f"  Historical features enabled: True")
print(f"  Config matches training: YES\n")

# Run validation
parity_result = validator.validate_parity(
    training_features=training_features,
    inference_featurizer=inference_featurizer,
    tfidf_pipeline=tfidf_pipeline,
    sample_queries=sample_queries,
    n_samples=n_samples
)

# Generate and print report
report = validator.generate_report(parity_result)
print(report)

if not parity_result['passed']:
    print("\n⚠️  WARNING: Parity validation still failing!")
    print("Debugging information:")
    print(f"  - Training used unified_extractor with {unified_extractor.feature_count} features")
    print(f"  - Inference using identical configuration")
    print(f"  - Mismatched features: {parity_result['mismatches'][0]['indices'][:10] if parity_result['mismatches'] else 'N/A'}")
    # Don't raise error - allow notebook to continue for debugging
else:
    print("\n✅ PARITY VALIDATION PASSED!")
    print("Features are consistent between training and inference.")

## 14. Debug Feature Differences (if parity fails)

In [None]:
# Debug cell - only run if parity validation fails
if not parity_result['passed']:
    print("Debugging feature differences...\n")
    
    # Get first sample
    sample_idx = 0
    sample_query = sample_queries[sample_idx]
    training_feat = training_features[sample_idx]
    
    # Extract features using inference path
    inference_feat = inference_featurizer.extract(sample_query)
    tfidf_feat = tfidf_pipeline.transform_single(sample_query['query'])
    combined_inference = np.concatenate([inference_feat, tfidf_feat])
    
    # Find differences
    diff = np.abs(training_feat - combined_inference)
    mismatch_indices = np.where(diff > validator.tolerance)[0]
    
    print(f"Sample {sample_idx} analysis:")
    print(f"  Total mismatches: {len(mismatch_indices)}")
    print(f"  Mismatch indices: {mismatch_indices[:20]}")
    
    # Check specific feature ranges
    print(f"\nFeature range analysis:")
    print(f"  AST features (45-54): {[i for i in mismatch_indices if 45 <= i <= 54]}")
    print(f"  Historical boundary (78-94): {[i for i in mismatch_indices if 78 <= i <= 94]}")
    print(f"  TF-IDF start (95+): {[i for i in mismatch_indices if i >= 95]}")
    
    # Sample specific features
    if 45 in mismatch_indices:
        print(f"\nAST feature 45 (ast_depth):")
        print(f"  Training: {training_feat[45]}")
        print(f"  Inference: {combined_inference[45]}")

## 15. Save Feature Datasets to S3

In [None]:
# Define output paths with "_fixed" suffix to distinguish from original
features_path = config['features']['output_path']
date_range = f"{config['data_loading']['start_date']}_to_{config['data_loading']['end_date']}"
output_base = f"{features_path}/{date_range}_fixed"

train_path = f"{output_base}/train"
val_path = f"{output_base}/val"
test_path = f"{output_base}/test"

print(f"Saving feature datasets to S3...")
print(f"  Base path: {output_base}")

# Select relevant columns
output_columns = [
    'queryId',
    'query',
    'user',
    'catalog',
    'schema',
    'queryDate',
    'hour',
    'is_heavy',
    'cpu_time_seconds',
    'memory_gb',
    'features'  # Combined features array
]

# Save train
print("\n[1/3] Saving train dataset...")
train_final.select(output_columns).write.mode('overwrite').parquet(train_path)
print(f"  ✅ Train saved: {train_path}")

# Save val
print("[2/3] Saving val dataset...")
val_final.select(output_columns).write.mode('overwrite').parquet(val_path)
print(f"  ✅ Val saved: {val_path}")

# Save test
print("[3/3] Saving test dataset...")
test_final.select(output_columns).write.mode('overwrite').parquet(test_path)
print(f"  ✅ Test saved: {test_path}")

print("\n✅ All feature datasets saved to S3")

## 16. Save TF-IDF Vectorizer and Metadata

In [None]:
import boto3
import tempfile
import json
import os

# Save TF-IDF pipeline
print("Saving TF-IDF vectorizer...")

with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.pkl') as tmp:
    tfidf_pipeline.save(tmp.name)
    local_tfidf_path = tmp.name

# Upload to S3 with "_fixed" suffix
s3_tfidf_key = f"{config['s3']['prefix']}/models/tfidf_vectorizer_{date_range}_fixed.pkl"
s3_client = boto3.client('s3')
s3_client.upload_file(local_tfidf_path, config['s3']['bucket'], s3_tfidf_key)

s3_tfidf_path = f"s3://{config['s3']['bucket']}/{s3_tfidf_key}"
print(f"  ✅ Uploaded: {s3_tfidf_path}")

# Cleanup
os.unlink(local_tfidf_path)

# Save metadata
print("\nSaving metadata...")
metadata = {
    'timestamp': datetime.now().isoformat(),
    'date_range': date_range,
    'fixed_version': True,
    'features': {
        'unified_features': 95,
        'base_features': 78,
        'historical_features': 17,
        'tfidf_features': tfidf_pipeline.vocab_size,
        'total_features': config['features']['total_features']
    },
    'parity_validation': parity_result,
    's3_paths': {
        'train': train_path,
        'val': val_path,
        'test': test_path,
        'tfidf_vectorizer': s3_tfidf_path
    },
    'class_distributions': {
        'train': f'{train_ratio:.1f}:1',
        'val': f'{val_ratio:.1f}:1',
        'test': f'{test_ratio:.1f}:1'
    }
}

# Save metadata
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as tmp:
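    # default=str keeps the dump from failing on non-JSON-native values
    # (for example NumPy scalars) that parity_result may contain
    json.dump(metadata, tmp, indent=2, default=str)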
    local_metadata_path = tmp.name

metadata_key = f"{config['s3']['prefix']}/metadata/features_{date_range}_fixed.json"
s3_client.upload_file(local_metadata_path, config['s3']['bucket'], metadata_key)
print(f"  ✅ Metadata saved: s3://{config['s3']['bucket']}/{metadata_key}")

# Cleanup
os.unlink(local_metadata_path)

## 17. Summary Report

In [None]:
print("="*70)
print("FEATURE ENGINEERING SUMMARY (FIXED VERSION)")
print("="*70)

print(f"\n✅ KEY FIX APPLIED:")
print(f"  Used unified FeatureExtractor with historical features enabled")
print(f"  Training and inference use identical configuration")
print(f"  AST parser settings consistent")

print(f"\nFeature Breakdown:")
print(f"  Unified features:    95 (78 base + 17 historical)")
print(f"  TF-IDF features:     {tfidf_pipeline.vocab_size}")
print(f"  {'-' * 40}")
print(f"  Total features:      {config['features']['total_features']}")

print(f"\nDataset Sizes:")
print(f"  Train: {train_count:,} queries")
print(f"  Val:   {val_count:,} queries")
print(f"  Test:  {test_count:,} queries")

print(f"\nClass Distributions:")
print(f"  Train: {train_ratio:.1f}:1 (sampled)")
print(f"  Val:   {val_ratio:.1f}:1 (original)")
print(f"  Test:  {test_ratio:.1f}:1 (original)")

print(f"\nParity Validation:")
if parity_result['passed']:
    print(f"  Status: ✅ PASSED")
    print(f"  Mismatch rate: {parity_result['mismatch_rate']:.2f}%")
else:
    print(f"  Status: ❌ FAILED")
    print(f"  Mismatch rate: {parity_result['mismatch_rate']:.2f}%")
    print(f"  Investigation needed for remaining issues")

print(f"\nS3 Outputs (fixed version):")
print(f"  Features: {output_base}/")
print(f"  TF-IDF: {s3_tfidf_path}")
print(f"  Metadata: s3://{config['s3']['bucket']}/{metadata_key}")

print("\n" + "="*70)
print("FEATURE ENGINEERING COMPLETE (FIXED VERSION)")
print("="*70)

print("\nNext Steps:")
if parity_result['passed']:
    print("1. ✅ Proceed to notebook 04 for model training")
    print("2. Use the fixed feature datasets for training")
else:
    print("1. ⚠️  Investigate remaining parity issues")
    print("2. Check AST parser behavior in detail")
    print("3. May need to disable AST features if issues persist")