# Feature Store SDK Demo

This notebook demonstrates the complete functionality of our custom Feature Store SDK.

## Features:
- ✅ Delta Lake storage format
- ✅ Automatic joins between feature groups
- ✅ Precise feature selection via projections
- ✅ Multiple output formats: Spark, Pandas, Polars
- ✅ Simple API without over-engineering

## Setup and Imports

In [1]:
import os
import sys
import pandas as pd
import polars as pl
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# Add the parent directory to Python path to import our SDK
sys.path.append('/workspace')
from feature_store_sdk import FeatureStore, projection

print("📦 All imports successful!")

📦 All imports successful!


## Initialize Spark with Delta Lake

In [2]:
# Initialize Spark with Delta Lake support
builder = SparkSession.builder.appName("FeatureStoreSDKDemo") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()
spark.sparkContext.setLogLevel("WARN")

print(f"✅ Spark {spark.version} initialized with Delta Lake support")
print(f"🌐 Spark UI: http://localhost:4040")

:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-33ccb19c-68f4-466c-99f7-873b0e03f892;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.4.0 in central
	found io.delta#delta-storage;2.4.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
downloading https://repo1.maven.org/maven2/io/delta/delta-core_2.12/2.4.0/delta-core_2.12-2.4.0.jar ...
	[SUCCESSFUL ] io.delta#delta-core_2.12;2.4.0!delta-core_2.12.jar (141ms)
downloading https://repo1.maven.org/maven2/io/delta/delta-storage/2.4.0/delta-storage-2.4.0.jar ...
	[SUCCESSFUL ] io.delta#delta-storage;2.4.0!delta-storage.jar (17ms)
downloading https://repo1.maven.org/maven2/org/antlr/antlr4-runtime/4.9.3/antlr4-runtime-4.9.3.jar ...
	[SUCCESSFUL ] org.antlr#antlr4-runtime;4.9.3!antlr4-runtime.jar (31ms)
:: resolution report :: resolve 416ms :: artifacts dl 192ms
	::

✅ Spark 3.4.4 initialized with Delta Lake support
🌐 Spark UI: http://localhost:4040


## Create Sample Business Data

Let's create a small synthetic dataset for the demo: six accounts, each with a matching user profile, transaction profile, and risk assessment.

In [3]:
# Create sample business data
print("📊 Creating sample business data...")

# Customer accounts data
accounts_data = pd.DataFrame({
    'account_id': ['ACC001', 'ACC002', 'ACC003', 'ACC004', 'ACC005', 'ACC006'],
    'user_id': ['USER001', 'USER002', 'USER003', 'USER004', 'USER005', 'USER006'],
    'account_type': ['PREMIUM', 'STANDARD', 'PREMIUM', 'GOLD', 'STANDARD', 'GOLD'],
    'status': ['ACTIVE', 'ACTIVE', 'INACTIVE', 'ACTIVE', 'ACTIVE', 'SUSPENDED'],
    'opened_at': ['2023-01-15', '2023-02-20', '2023-03-10', '2023-04-05', '2023-05-12', '2023-06-01'],
    'credit_limit': [10000, 5000, 15000, 25000, 3000, 20000]
})

# User profile data
users_data = pd.DataFrame({
    'user_id': ['USER001', 'USER002', 'USER003', 'USER004', 'USER005', 'USER006'],
    'age': [25, 34, 28, 45, 33, 39],
    'segment': ['PREMIUM', 'STANDARD', 'PREMIUM', 'GOLD', 'STANDARD', 'GOLD'],
    'country': ['US', 'UK', 'CA', 'US', 'DE', 'FR'],
    'city': ['New York', 'London', 'Toronto', 'San Francisco', 'Berlin', 'Paris'],
    'income_bracket': ['HIGH', 'MEDIUM', 'HIGH', 'VERY_HIGH', 'MEDIUM', 'HIGH'],
    'signup_date': ['2022-12-01', '2023-01-15', '2023-02-01', '2022-11-15', '2023-04-01', '2023-05-20']
})

# Transaction profile data (aggregated features)
transactions_data = pd.DataFrame({
    'account_id': ['ACC001', 'ACC002', 'ACC003', 'ACC004', 'ACC005', 'ACC006'],
    'last_txn_ts': ['2024-01-15 10:30:00', '2024-01-14 15:45:00', '2023-12-20 09:15:00', 
                   '2024-01-16 14:20:00', '2024-01-15 11:55:00', '2024-01-13 16:30:00'],
    'avg_ticket': [125.50, 89.75, 245.30, 67.80, 156.25, 301.40],
    'txn_cnt_30d': [8, 5, 1, 12, 7, 15],
    'txn_cnt_90d': [15, 8, 2, 22, 12, 28],
    'total_spend_90d': [1882.5, 718.0, 490.6, 1491.6, 1875.0, 8439.2],
    'distinct_merchants_90d': [8, 5, 2, 12, 7, 16]
})

# Risk scores (additional feature group)
risk_data = pd.DataFrame({
    'account_id': ['ACC001', 'ACC002', 'ACC003', 'ACC004', 'ACC005', 'ACC006'],
    'credit_score': [750, 680, 720, 800, 650, 780],
    'fraud_score': [0.05, 0.12, 0.03, 0.01, 0.08, 0.02],
    'risk_category': ['LOW', 'MEDIUM', 'LOW', 'VERY_LOW', 'MEDIUM', 'LOW'],
    'last_risk_assessment': ['2024-01-10', '2024-01-12', '2023-12-15', '2024-01-14', '2024-01-11', '2024-01-09']
})

print(f"📋 Created {len(accounts_data)} accounts")
print(f"👥 Created {len(users_data)} user profiles") 
print(f"💳 Created {len(transactions_data)} transaction profiles")
print(f"⚠️ Created {len(risk_data)} risk assessments")

# Display sample data
print("\n📊 Sample accounts data:")
print(accounts_data.head(3))
print("\n👥 Sample users data:")
print(users_data.head(3))

📊 Creating sample business data...
📋 Created 6 accounts
👥 Created 6 user profiles
💳 Created 6 transaction profiles
⚠️ Created 6 risk assessments

📊 Sample accounts data:
  account_id  user_id account_type    status   opened_at  credit_limit
0     ACC001  USER001      PREMIUM    ACTIVE  2023-01-15         10000
1     ACC002  USER002     STANDARD    ACTIVE  2023-02-20          5000
2     ACC003  USER003      PREMIUM  INACTIVE  2023-03-10         15000

👥 Sample users data:
   user_id  age   segment country      city income_bracket signup_date
0  USER001   25   PREMIUM      US  New York           HIGH  2022-12-01
1  USER002   34  STANDARD      UK    London         MEDIUM  2023-01-15
2  USER003   28   PREMIUM      CA   Toronto           HIGH  2023-02-01


## Save Data as Delta Tables

In [4]:
# Save all data as Delta Lake tables
base_path = "/workspace/data/feature_store_demo"
print(f"💾 Saving data to Delta Lake at: {base_path}")

# Convert to Spark DataFrames and save
accounts_df = spark.createDataFrame(accounts_data)
accounts_df.write.format("delta").mode("overwrite").save(f"{base_path}/accounts")
print("✅ Accounts saved")

users_df = spark.createDataFrame(users_data)  
users_df.write.format("delta").mode("overwrite").save(f"{base_path}/users")
print("✅ Users saved")

transactions_df = spark.createDataFrame(transactions_data)
transactions_df.write.format("delta").mode("overwrite").save(f"{base_path}/transactions_profile")
print("✅ Transaction profiles saved")

risk_df = spark.createDataFrame(risk_data)
risk_df.write.format("delta").mode("overwrite").save(f"{base_path}/risk_scores")
print("✅ Risk scores saved")

print("\n🎉 All data successfully saved in Delta Lake format!")

💾 Saving data to Delta Lake at: /workspace/data/feature_store_demo


25/08/10 02:35:32 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

✅ Accounts saved
✅ Users saved
✅ Transaction profiles saved
✅ Risk scores saved

🎉 All data successfully saved in Delta Lake format!


## Initialize Feature Store SDK

Now let's use our SDK to create feature groups and feature views.

In [5]:
# Initialize Feature Store
fs = FeatureStore(spark=spark)
print("✅ Feature Store initialized")

# Create feature groups with explicit data locations
print("\n📊 Creating feature groups...")

accounts_fg = fs.get_or_create_batch_feature_group(
    name="accounts", 
    version=1, 
    keys=["account_id"],
    data_location=f"{base_path}/accounts",
    description="Customer account information"
)
print(f"✅ {accounts_fg}")

users_fg = fs.get_or_create_batch_feature_group(
    name="users", 
    version=1, 
    keys=["user_id"],
    data_location=f"{base_path}/users",
    description="User demographic and profile data"
)
print(f"✅ {users_fg}")

transactions_fg = fs.get_or_create_batch_feature_group(
    name="transactions_profile", 
    version=1, 
    keys=["account_id"],
    data_location=f"{base_path}/transactions_profile",
    description="Aggregated transaction features per account"
)
print(f"✅ {transactions_fg}")

risk_fg = fs.get_or_create_batch_feature_group(
    name="risk_scores", 
    version=1, 
    keys=["account_id"],
    data_location=f"{base_path}/risk_scores",
    description="Risk assessment scores and categories"
)
print(f"✅ {risk_fg}")

print("\n🎯 All feature groups created successfully!")

✅ Feature Store initialized

📊 Creating feature groups...
✅ BatchFeatureGroup(name='accounts', version=1, keys=['account_id'], location='/workspace/data/feature_store_demo/accounts')
✅ BatchFeatureGroup(name='users', version=1, keys=['user_id'], location='/workspace/data/feature_store_demo/users')
✅ BatchFeatureGroup(name='transactions_profile', version=1, keys=['account_id'], location='/workspace/data/feature_store_demo/transactions_profile')
✅ BatchFeatureGroup(name='risk_scores', version=1, keys=['account_id'], location='/workspace/data/feature_store_demo/risk_scores')

🎯 All feature groups created successfully!
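
The calls above are named `get_or_create_...`, which suggests that repeating a call with the same name and version returns the existing definition instead of creating a duplicate. The SDK's actual behaviour is not shown in this notebook, so the sketch below only illustrates that assumed contract. `FeatureGroupSpec`, `FeatureGroupRegistry`, and the `/tmp/demo/...` paths are hypothetical names used for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FeatureGroupSpec:
    """Metadata only; the data itself stays at data_location."""
    name: str
    version: int
    keys: tuple
    data_location: str


class FeatureGroupRegistry:
    """Hypothetical in-memory registry keyed by (name, version)."""

    def __init__(self):
        self._groups = {}

    def get_or_create(self, name, version, keys, data_location):
        ident = (name, version)
        if ident not in self._groups:
            # First time this (name, version) is seen: register it.
            self._groups[ident] = FeatureGroupSpec(
                name, version, tuple(keys), data_location
            )
        return self._groups[ident]


registry = FeatureGroupRegistry()
first = registry.get_or_create("accounts", 1, ["account_id"], "/tmp/demo/accounts")
again = registry.get_or_create("accounts", 1, ["account_id"], "/tmp/demo/accounts")
newer = registry.get_or_create("accounts", 2, ["account_id"], "/tmp/demo/accounts_v2")

print(first is again)  # same (name, version) returns the existing entry
print(first is newer)  # a new version is a separate entry
```

Under this assumption, re-running the cell above would be safe: the four feature groups would be looked up, not redefined.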


## Test 1: Basic Feature Selection

Test that we can select specific features from individual feature groups.

In [6]:
print("🧪 Test 1: Basic Feature Selection")
print("=" * 40)

# Create a simple feature view with only specific features
basic_fv = fs.get_or_create_feature_view(
    name="basic_account_features", 
    version=1, 
    base=accounts_fg,
    source_projections=[
        projection(
            source=accounts_fg,
            features=["account_id", "status", "account_type"]  # Only these 3 features
        )
    ],
    description="Basic account features - minimal set"
)

# Test the query
result = basic_fv.plan().to_pandas()
print(f"📋 Columns returned: {list(result.columns)}")
print(f"📊 Expected: ['account_id', 'status', 'account_type']")
print(f"✅ Feature selection working: {set(result.columns) == {'account_id', 'status', 'account_type'}}")
print(f"📈 Row count: {len(result)}")

print("\n📊 Sample data:")
print(result.head())

🧪 Test 1: Basic Feature Selection
📋 Columns returned: ['account_id', 'status', 'account_type']
📊 Expected: ['account_id', 'status', 'account_type']
✅ Feature selection working: True
📈 Row count: 6

📊 Sample data:
  account_id     status account_type
0     ACC003   INACTIVE      PREMIUM
1     ACC002     ACTIVE     STANDARD
2     ACC005     ACTIVE     STANDARD
3     ACC006  SUSPENDED         GOLD
4     ACC001     ACTIVE      PREMIUM
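
The sample rows above come back in a different order than they were written (ACC003 first). That is expected, since Spark gives no ordering guarantee without an explicit sort. As a minimal pure-Python sketch of what a projection amounts to, with a check that does not depend on row order (`select_features` is a hypothetical helper for illustration, not part of the SDK):

```python
def select_features(rows, features):
    """Keep only the requested columns from each row (a list of dicts)."""
    missing = [f for f in features if rows and f not in rows[0]]
    if missing:
        raise KeyError(f"unknown features: {missing}")
    return [{f: row[f] for f in features} for row in rows]


accounts = [
    {"account_id": "ACC003", "status": "INACTIVE", "account_type": "PREMIUM", "credit_limit": 15000},
    {"account_id": "ACC002", "status": "ACTIVE", "account_type": "STANDARD", "credit_limit": 5000},
    {"account_id": "ACC001", "status": "ACTIVE", "account_type": "PREMIUM", "credit_limit": 10000},
]

projected = select_features(accounts, ["account_id", "status", "account_type"])

# Sort by key before comparing so the check is independent of row order.
by_id = sorted(projected, key=lambda r: r["account_id"])
print([r["account_id"] for r in by_id])
print(sorted(by_id[0].keys()))
```

The same idea applies to the pandas result: sort by `account_id` before comparing it against expected rows.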


## Test 2: Multi-Table Join with Feature Selection

Test automatic joins between multiple feature groups with precise feature selection.

In [7]:
print("🧪 Test 2: Multi-Table Join with Feature Selection")
print("=" * 50)

# Create comprehensive feature view with joins
comprehensive_fv = fs.get_or_create_feature_view(
    name="comprehensive_features", 
    version=1, 
    base=accounts_fg,
    source_projections=[
        # Base account features
        projection(
            source=accounts_fg,
            features=["account_id", "user_id", "status", "account_type", "credit_limit"]
        ),
        # User demographics - join on user_id
        projection(
            source=users_fg,
            features=["age", "segment", "country", "income_bracket"],
            keys_map={"user_id": "user_id"},
            join_type="left"
        ),
        # Transaction features - join on account_id
        projection(
            source=transactions_fg,
            features=["avg_ticket", "txn_cnt_90d", "total_spend_90d"],
            keys_map={"account_id": "account_id"},
            join_type="left"
        ),
        # Risk scores - join on account_id
        projection(
            source=risk_fg,
            features=["credit_score", "fraud_score", "risk_category"],
            keys_map={"account_id": "account_id"},
            join_type="left"
        )
    ],
    description="Comprehensive account features with user, transaction, and risk data"
)

# Test the comprehensive query
result = comprehensive_fv.plan().to_pandas()
print(f"📋 Columns returned: {list(result.columns)}")
print(f"📊 Total features: {len(result.columns)}")
print(f"📈 Row count: {len(result)}")

expected_cols = {
    'account_id', 'user_id', 'status', 'account_type', 'credit_limit',  # accounts
    'age', 'segment', 'country', 'income_bracket',  # users
    'avg_ticket', 'txn_cnt_90d', 'total_spend_90d',  # transactions
    'credit_score', 'fraud_score', 'risk_category'   # risk
}
print(f"✅ All expected features present: {set(result.columns) == expected_cols}")

print("\n📊 Sample comprehensive data:")
print(result.head(3))

🧪 Test 2: Multi-Table Join with Feature Selection


25/08/10 02:35:37 WARN Column: Constructing trivially true equals predicate, ''user_id = 'user_id'. Perhaps you need to use aliases.


AnalysisException: [AMBIGUOUS_REFERENCE] Reference `user_id` is ambiguous, could be: [`user_id`, `user_id`].
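
The warning and exception above are what Spark typically reports when a join condition is built from unqualified column names that exist on both sides: `user_id = user_id` cannot be resolved to a single table. How the SDK builds its join internally is not shown here, so the following is only a pure-Python sketch of the intended semantics. It assumes that `keys_map` maps a base column to a source column and that the source key is used for matching only. `left_join_projection` is a hypothetical name, and the unmatched `ACC007` row is made up for illustration.

```python
def left_join_projection(base_rows, source_rows, features, keys_map):
    """Left-join source_rows onto base_rows and add only `features`.

    keys_map maps base column -> source column (assumed semantics).
    Source key columns are used for matching only, so the result
    never carries two columns with the same name.
    """
    base_cols = set(base_rows[0]) if base_rows else set()
    clashes = base_cols & set(features)
    if clashes:
        raise ValueError(f"features collide with base columns: {sorted(clashes)}")

    # Index source rows by key tuple (assumes keys are unique in the source).
    index = {
        tuple(row[src_key] for src_key in keys_map.values()): row
        for row in source_rows
    }

    joined = []
    for row in base_rows:
        key = tuple(row[base_key] for base_key in keys_map)
        match = index.get(key)
        # Left join: unmatched base rows are kept, with None for the features.
        extra = {f: (match[f] if match else None) for f in features}
        joined.append({**row, **extra})
    return joined


accounts = [
    {"account_id": "ACC001", "user_id": "USER001"},
    {"account_id": "ACC002", "user_id": "USER002"},
    {"account_id": "ACC007", "user_id": "USER999"},  # hypothetical row with no matching user
]
users = [
    {"user_id": "USER001", "age": 25, "country": "US"},
    {"user_id": "USER002", "age": 34, "country": "UK"},
]

result = left_join_projection(accounts, users, ["age", "country"], {"user_id": "user_id"})
for row in result:
    print(row)
```

In PySpark the analogous fix is usually to alias each side of the join, or to join on a list of column names (`on=["user_id"]`) so that only one key column survives.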

## Test 3: Multiple Output Formats

Demonstrate that the same feature view can output to Spark, Pandas, and Polars.

In [None]:
print("🧪 Test 3: Multiple Output Formats")
print("=" * 35)

# Create a focused feature view for format testing
format_test_fv = fs.get_or_create_feature_view(
    name="format_test_features", 
    version=1, 
    base=accounts_fg,
    source_projections=[
        projection(
            source=accounts_fg,
            features=["account_id", "status", "credit_limit"]
        ),
        projection(
            source=users_fg,
            features=["age", "country"],
            keys_map={"user_id": "user_id"},
            join_type="left"
        )
    ]
)

query_plan = format_test_fv.plan()

print("\n🔥 Testing Spark DataFrame output:")
spark_df = query_plan.to_spark()
print(f"   Type: {type(spark_df)}")
print(f"   Columns: {spark_df.columns}")
print(f"   Count: {spark_df.count()}")
spark_df.show(3)

print("\n🐼 Testing Pandas DataFrame output:")
pandas_df = query_plan.to_pandas()
print(f"   Type: {type(pandas_df)}")
print(f"   Shape: {pandas_df.shape}")
print(f"   Columns: {list(pandas_df.columns)}")
print(pandas_df.head(3))

print("\n⚡ Testing Polars DataFrame output:")
polars_df = query_plan.to_polars()
print(f"   Type: {type(polars_df)}")
print(f"   Shape: {polars_df.shape}")
print(f"   Columns: {list(polars_df.columns)}")
print(polars_df.head(3))

print("\n✅ All output formats working correctly!")

## Test 4: Advanced Feature Engineering Scenario

Simulate a real-world ML scenario where we need specific features for model training.

In [None]:
print("🧪 Test 4: Advanced Feature Engineering Scenario")
print("=" * 45)

# Scenario: Create features for a credit risk model
credit_risk_fv = fs.get_or_create_feature_view(
    name="credit_risk_model_features", 
    version=1, 
    base=accounts_fg,
    source_projections=[
        # Account basics
        projection(
            source=accounts_fg,
            features=["account_id", "account_type", "credit_limit", "status"]
        ),
        # Customer demographics for risk assessment
        projection(
            source=users_fg,
            features=["age", "income_bracket", "country"],
            keys_map={"user_id": "user_id"},
            join_type="left"
        ),
        # Transaction behavior patterns
        projection(
            source=transactions_fg,
            features=["txn_cnt_30d", "txn_cnt_90d", "avg_ticket", "total_spend_90d", "distinct_merchants_90d"],
            keys_map={"account_id": "account_id"},
            join_type="left"
        ),
        # Risk indicators
        projection(
            source=risk_fg,
            features=["credit_score", "fraud_score", "risk_category"],
            keys_map={"account_id": "account_id"},
            join_type="left"
        )
    ],
    description="Features for credit risk modeling"
)

# Get features as Polars for fast processing
ml_features = credit_risk_fv.plan().to_polars()

print(f"📊 ML Feature Set created:")
print(f"   Features: {len(ml_features.columns)}")
print(f"   Samples: {len(ml_features)}")
print(f"   Feature names: {list(ml_features.columns)}")

print("\n📈 Feature Statistics:")
print(ml_features.describe())

print("\n🎯 Ready for ML model training!")
print("\n📋 Sample ML training data:")
print(ml_features.head())

## Test 5: Performance and Query Plan Analysis

Examine the underlying Spark execution plan, the result schema, and basic size metrics (column and row counts).

In [None]:
print("🧪 Test 5: Performance and Query Plan Analysis")
print("=" * 45)

# Get the Spark DataFrame to analyze execution plan
spark_result = comprehensive_fv.plan().to_spark()

print("🔍 Spark Execution Plan:")
print("=" * 25)
spark_result.explain(True)

print("\n📊 Query Performance Metrics:")
print(f"   Total columns: {len(spark_result.columns)}")
print(f"   Total rows: {spark_result.count()}")

print("\n🏗️ Data Sources Verified:")
print(f"   ✅ Accounts FG exists: {accounts_fg.exists()}")
print(f"   ✅ Users FG exists: {users_fg.exists()}")
print(f"   ✅ Transactions FG exists: {transactions_fg.exists()}")
print(f"   ✅ Risk FG exists: {risk_fg.exists()}")

print("\n📋 Schema Information:")
spark_result.printSchema()

## SDK Validation Summary

Let's run a comprehensive validation of all SDK features.

In [None]:
print("🏆 Feature Store SDK Validation Summary")
print("=" * 50)

# Test checklist
tests_passed = 0
total_tests = 0

def validate_test(condition, description):
    global tests_passed, total_tests
    total_tests += 1
    if condition:
        tests_passed += 1
        print(f"✅ {description}")
    else:
        print(f"❌ {description}")
    return condition

print("\n📋 Core Functionality Tests:")

# Test 1: FeatureStore initialization
validate_test(fs is not None, "FeatureStore initialization")

# Test 2: Feature group creation with data location
validate_test(accounts_fg.exists(), "Feature group creation and Delta Lake storage")

# Test 3: Basic feature selection
basic_result = basic_fv.plan().to_pandas()
validate_test(
    set(basic_result.columns) == {'account_id', 'status', 'account_type'},
    "Precise feature selection from projections"
)

# Test 4: Multi-table automatic joins
comp_result = comprehensive_fv.plan().to_pandas()
validate_test(
    # 5 account + 4 user + 3 transaction + 3 risk columns = 15
    len(comp_result.columns) == 15 and len(comp_result) == 6,
    "Multi-table automatic joins with feature selection"
)

# Test 5: Multiple output formats
try:
    test_plan = format_test_fv.plan()
    spark_out = test_plan.to_spark()
    pandas_out = test_plan.to_pandas()
    polars_out = test_plan.to_polars()
    formats_work = all([
        len(spark_out.columns) > 0,
        len(pandas_out.columns) > 0,
        len(polars_out.columns) > 0
    ])
    validate_test(formats_work, "Multiple output formats (Spark/Pandas/Polars)")
except Exception as e:
    validate_test(False, f"Multiple output formats - Error: {e}")

# Test 6: Join key mapping
# Exact column match; a substring test would also accept names such as 'average'
user_joined = 'age' in comp_result.columns
validate_test(user_joined, "Custom join key mapping (account.user_id -> users.user_id)")

# Test 7: Left join preserves every base record
validate_test(
    len(comp_result) == len(accounts_data),
    "Left join behavior - preserves all base records"
)

print(f"\n🎯 Test Results: {tests_passed}/{total_tests} passed")

if tests_passed == total_tests:
    print("\n🎉 ALL TESTS PASSED! Feature Store SDK is fully functional! 🎉")
    print("\n✨ SDK Features Validated:")
    print("   ✅ Delta Lake storage format")
    print("   ✅ Automatic multi-table joins")
    print("   ✅ Precise feature selection via projections")
    print("   ✅ Custom join key mapping")
    print("   ✅ Multiple output formats (Spark, Pandas, Polars)")
    print("   ✅ Left join support")
    print("   ✅ Query plan execution")
    print("   ✅ Feature group management")
    print("   ✅ Feature view creation")
    print("   ✅ Simple, clean API")
else:
    print(f"\n⚠️ {total_tests - tests_passed} tests failed. Please review the implementation.")

print(f"\n📊 Final Statistics:")
print(f"   Feature Groups: 4")
print(f"   Feature Views: 4")
print(f"   Total Features Available: {sum([len(accounts_data.columns), len(users_data.columns), len(transactions_data.columns), len(risk_data.columns)])}")
print(f"   Sample Records: {len(accounts_data)}")
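
The checklist helper above keeps its counters in module-level globals, so re-running the cell in a notebook starts from whatever state is left over unless the counters are reset first. A minimal alternative sketch that keeps the state in an object is shown below. `Checklist` is a hypothetical name, and the two checks are placeholders, not SDK calls.

```python
class Checklist:
    """Collects pass/fail results without module-level globals."""

    def __init__(self):
        self.results = []  # list of (description, passed) tuples

    def check(self, condition, description):
        passed = bool(condition)
        self.results.append((description, passed))
        print(f"{'PASS' if passed else 'FAIL'}: {description}")
        return passed

    @property
    def passed(self):
        return sum(1 for _, ok in self.results if ok)

    @property
    def total(self):
        return len(self.results)


checklist = Checklist()
# 5 account + 4 user + 3 transaction + 3 risk columns in the comprehensive view
checklist.check(5 + 4 + 3 + 3 == 15, "comprehensive view selects 15 columns")
checklist.check(len({"a", "b"}) == 3, "deliberately failing check")
print(f"{checklist.passed}/{checklist.total} passed")
```

Creating a fresh `Checklist()` at the top of the validation cell makes each run start from zero.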

## Cleanup

In [None]:
# Clean up Spark session
spark.stop()
print("🧹 Spark session stopped")
print("\n🎊 Feature Store SDK Demo Complete! 🎊")