# Feature Engineering for Customer Anomaly Detection
### Extract features from customer usage data for anomaly detection

In [0]:
# Import required libraries
import pandas as pd
from databricks.feature_engineering import FeatureEngineeringClient
from pyspark.sql import functions as F
from pyspark.sql.types import *
import datetime

# Initialize Feature Engineering client
fe = FeatureEngineeringClient()

## Data Loading and Exploration

In [0]:
# Define catalog, schema and table names
catalog_name = 'main'
schema_name = 'ttw_workshop_demo'
source_table = f"{catalog_name}.{schema_name}.customer_sample_table"
feature_table_name = f"{catalog_name}.{schema_name}.customer_features"


In [0]:
df_spark = spark.read.format('delta').table(f'{catalog_name}.{schema_name}.{table_name}')
display(df_spark)

Customer_ID,Last_Login_Date,Ebooks_Downloaded_6_Months,Average_Session_Time,Subscription_Plan_Type,Primary_Discipline,Days_Since_Last_Activity
bdd640fb06674ad19c80317fa3b1799d,2025-02-28,5,2.67,institutional,STEM,9.0
1a3d1fa7bc8940a9a3b8c1e9392456de,2025-05-25,5,5.77,individual,business,13.0
972a846916414f828b9d2434e465e150,2025-04-01,9,1.35,individual,STEM,80.0
3b8faa1837f8488b97fc695a07a0ca6e,2025-04-19,4,21.52,institutional,STEM,23.0
b74d0fb132e746298fadc1a606cb0fb3,2025-05-20,8,9.38,institutional,business,45.0
72ff5d2a386e4be0ab65a6a48b8148f6,2025-05-07,5,5.45,institutional,STEM,6.0
c241330b01a9471f9e8a774bcf36d58b,2025-06-22,5,6.38,individual,business,19.0
47229389571a4876ac307511b2b9437a,2025-02-03,6,10.05,individual,business,13.0
1a2a73ed562b4f79837459eef50bea63,2025-01-20,11,5.29,individual,business,19.0
580d7b71d8f544139be6128e18c26797,2025-05-10,5,12.54,individual,STEM,80.0


In [0]:
# Data quality checks
print(f"Total records: {df_spark.count()}")
print(f"Unique customers: {df_spark.select('Customer_ID').distinct().count()}")
df_spark.printSchema()

summary,days_since_login,Ebooks_Downloaded_6_Months,Average_Session_Time,Days_Since_Last_Activity,engagement_score,subscription_tier_numeric
count,50.0,50.0,50.0,50.0,50.0,50.0
mean,105.9,5.12,11.24620002269745,30.18,9.408340015888214,1.0
stddev,61.52260245019627,2.2373453657659303,11.866951443515598,25.869129029302357,8.386561771082146,0.0
min,1.0,0.0,0.14,1.0,0.4200000166893005,1.0
max,205.0,11.0,61.78,91.0,45.04599914550781,1.0


## Feature Engineering

In [0]:
features_df = df_spark.withColumn("last_login_timestamp", 
                               F.to_timestamp(F.col("Last_Login_Date"), "yyyy-MM-dd")) \
                   .withColumn("current_timestamp", F.current_timestamp()) \
                   .withColumn("days_since_login", 
                              F.datediff(F.col("current_timestamp"), F.col("last_login_timestamp"))) \
                   .withColumn("usage_intensity", 
                              F.col("Ebooks_Downloaded_6_Months") / F.greatest(F.col("Days_Since_Last_Activity"), F.lit(1))) \
                   .withColumn("engagement_score", 
                              F.col("Average_Session_Time") * F.col("Ebooks_Downloaded_6_Months")) \
                   .withColumn("subscription_tier_numeric", 
                              F.when(F.col("Subscription_Plan_Type") == "Premium", 3)
                               .when(F.col("Subscription_Plan_Type") == "Standard", 2)
                               .otherwise(1))

# Add behavioral anomaly indicators
features_df = features_df.withColumn("high_downloads_low_sessions", 
                                   F.when((F.col("Ebooks_Downloaded_6_Months") > 20) & 
                                         (F.col("Average_Session_Time") < 15), 1).otherwise(0)) \
                        .withColumn("dormant_user", 
                                   F.when(F.col("Days_Since_Last_Activity") > 30, 1).otherwise(0)) \
                        .withColumn("power_user", 
                                   F.when((F.col("Ebooks_Downloaded_6_Months") > 50) & 
                                         (F.col("Average_Session_Time") > 60), 1).otherwise(0))

display(features_df)

Customer_ID,Last_Login_Date,Ebooks_Downloaded_6_Months,Average_Session_Time,Subscription_Plan_Type,Primary_Discipline,Days_Since_Last_Activity,last_login_timestamp,current_timestamp,days_since_login,usage_intensity,engagement_score,subscription_tier_numeric,high_downloads_low_sessions,dormant_user,power_user
bdd640fb06674ad19c80317fa3b1799d,2025-02-28,5,2.67,institutional,STEM,9.0,2025-02-28T00:00:00Z,2025-08-03T22:48:46.844Z,156,0.5555555555555556,13.35,1,0,0,0
1a3d1fa7bc8940a9a3b8c1e9392456de,2025-05-25,5,5.77,individual,business,13.0,2025-05-25T00:00:00Z,2025-08-03T22:48:46.844Z,70,0.3846153846153846,28.85,1,0,0,0
972a846916414f828b9d2434e465e150,2025-04-01,9,1.35,individual,STEM,80.0,2025-04-01T00:00:00Z,2025-08-03T22:48:46.844Z,124,0.1125,12.150001,1,0,1,0
3b8faa1837f8488b97fc695a07a0ca6e,2025-04-19,4,21.52,institutional,STEM,23.0,2025-04-19T00:00:00Z,2025-08-03T22:48:46.844Z,106,0.1739130434782608,86.08,1,0,0,0
b74d0fb132e746298fadc1a606cb0fb3,2025-05-20,8,9.38,institutional,business,45.0,2025-05-20T00:00:00Z,2025-08-03T22:48:46.844Z,75,0.1777777777777777,75.04,1,0,1,0
72ff5d2a386e4be0ab65a6a48b8148f6,2025-05-07,5,5.45,institutional,STEM,6.0,2025-05-07T00:00:00Z,2025-08-03T22:48:46.844Z,88,0.8333333333333334,27.25,1,0,0,0
c241330b01a9471f9e8a774bcf36d58b,2025-06-22,5,6.38,individual,business,19.0,2025-06-22T00:00:00Z,2025-08-03T22:48:46.844Z,42,0.2631578947368421,31.900002,1,0,0,0
47229389571a4876ac307511b2b9437a,2025-02-03,6,10.05,individual,business,13.0,2025-02-03T00:00:00Z,2025-08-03T22:48:46.844Z,181,0.4615384615384615,60.300003,1,0,0,0
1a2a73ed562b4f79837459eef50bea63,2025-01-20,11,5.29,individual,business,19.0,2025-01-20T00:00:00Z,2025-08-03T22:48:46.844Z,195,0.5789473684210527,58.19,1,0,0,0
580d7b71d8f544139be6128e18c26797,2025-05-10,5,12.54,individual,STEM,80.0,2025-05-10T00:00:00Z,2025-08-03T22:48:46.844Z,85,0.0625,62.7,1,0,1,0


In [0]:
# Convert date column and calculate additional features
features_df = df_spark.withColumn("Last_Login_Date", F.to_date(F.col("Last_Login_Date"), "yyyy-MM-dd")) \
    .withColumn("current_date", F.current_date()) \
    .withColumn("days_since_login", F.datediff(F.col("current_date"), F.col("Last_Login_Date"))) \
    .withColumn("engagement_score", 
                F.col("Ebooks_Downloaded_6_Months") * 0.3 + 
                F.col("Average_Session_Time") * 0.7) \
    .withColumn("usage_intensity", 
                F.when(F.col("Days_Since_Last_Activity") <= 7, "High")
                .when(F.col("Days_Since_Last_Activity") <= 30, "Medium")
                .otherwise("Low")) \
    .withColumn("subscription_tier_numeric", 
                F.when(F.col("Subscription_Plan_Type") == "Premium", 3)
                .when(F.col("Subscription_Plan_Type") == "Standard", 2)
                .otherwise(1)) \
    .withColumn("feature_timestamp", F.current_timestamp())


def select_features_for_ml(df):
    """
    Select final features for ML model
    Following dbdemos pattern for feature selection
    """
    # Define feature columns for ML (numerical features only for anomaly detection)
    feature_columns = [
        "Customer_ID",
        "days_since_login", 
        "Ebooks_Downloaded_6_Months",
        "Average_Session_Time",
        "Days_Since_Last_Activity",
        "engagement_score",
        "subscription_tier_numeric",
        "feature_timestamp"
    ]
    
    df_final = df.select(*feature_columns)
    
    print(f"Selected {len(feature_columns)-2} features for ML") # -2 for ID and timestamp
    return df_final

# Create final feature dataset
df_features_final = select_features_for_ml(features_df)
display(df_features_final)

Customer_ID,days_since_login,Ebooks_Downloaded_6_Months,Average_Session_Time,Days_Since_Last_Activity,engagement_score,subscription_tier_numeric,feature_timestamp
bdd640fb06674ad19c80317fa3b1799d,156,5,2.67,9.0,3.3690000534057614,1,2025-08-03T22:48:47.369Z
1a3d1fa7bc8940a9a3b8c1e9392456de,70,5,5.77,13.0,5.53899998664856,1,2025-08-03T22:48:47.369Z
972a846916414f828b9d2434e465e150,124,9,1.35,80.0,3.6450000166893,1,2025-08-03T22:48:47.369Z
3b8faa1837f8488b97fc695a07a0ca6e,106,4,21.52,23.0,16.26400032043457,1,2025-08-03T22:48:47.369Z
b74d0fb132e746298fadc1a606cb0fb3,75,8,9.38,45.0,8.966000080108643,1,2025-08-03T22:48:47.369Z
72ff5d2a386e4be0ab65a6a48b8148f6,88,5,5.45,6.0,5.314999866485596,1,2025-08-03T22:48:47.369Z
c241330b01a9471f9e8a774bcf36d58b,42,5,6.38,19.0,5.966000080108643,1,2025-08-03T22:48:47.369Z
47229389571a4876ac307511b2b9437a,181,6,10.05,13.0,8.835000133514404,1,2025-08-03T22:48:47.369Z
1a2a73ed562b4f79837459eef50bea63,195,11,5.29,19.0,7.002999973297118,1,2025-08-03T22:48:47.369Z
580d7b71d8f544139be6128e18c26797,85,5,12.54,80.0,10.27799997329712,1,2025-08-03T22:48:47.369Z


In [0]:
# Display feature statistics
display(df_features_final.describe())

summary,Customer_ID,days_since_login,Ebooks_Downloaded_6_Months,Average_Session_Time,Days_Since_Last_Activity,engagement_score,subscription_tier_numeric
count,50,50.0,50.0,50.0,50.0,50.0,50.0
mean,,105.9,5.12,11.24620002269745,30.18,9.408340015888214,1.0
stddev,,61.52260245019627,2.2373453657659303,11.866951443515598,25.869129029302357,8.386561771082146,0.0
min,11ce5dd2b45e41f0b139d32c93cd59bf,1.0,0.0,0.14,1.0,0.4200000166893005,1.0
max,ff50bde4382547b89cabcc97663f1c97,205.0,11.0,61.78,91.0,45.04599914550781,1.0


## Create Feature Store Table

In [0]:
try:
    spark.sql(f"DROP TABLE IF EXISTS {feature_table_name}")
    print(f"Dropped existing table: {feature_table_name}")
except:
    pass

# Create the feature table
fe.create_table(
    name=feature_table_name,
    primary_keys=["Customer_ID", "feature_timestamp"],
    timestamp_keys=["feature_timestamp"],
    df=features_final,
    description="Customer engagement features for anomaly detection - following dbdemos pattern"
)

<FeatureTable: name='main.ttw_workshop_demo.customer_features', table_id='a99a96df-e158-451e-8238-1a2a36bccc2d', description='Customer engagement features for anomaly detection - following dbdemos pattern', primary_keys=['Customer_ID', 'feature_timestamp'], partition_columns=[], features=['Customer_ID',
 'days_since_login',
 'Ebooks_Downloaded_6_Months',
 'Average_Session_Time',
 'Days_Since_Last_Activity',
 'engagement_score',
 'subscription_tier_numeric',
 'feature_timestamp'], creation_timestamp=1754261937664, online_stores=[], notebook_producers=[], job_producers=[], table_data_sources=[], path_data_sources=[], custom_data_sources=[], timestamp_keys=['feature_timestamp'], tags={}>

In [0]:
# Write features to the feature store
fe.write_table(
    name=feature_table_name,
    df=df_features_final,
    mode="merge"
)