# Mistplay Fraud Demo – Feature Pipeline

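This notebook builds Feature Store tables from the synthetic Mistplay fraud data. It computes account-level and device-level features, writes a base dataset for model training, registers an on-demand feature backed by a Unity Catalog function, and deploys a Feature Serving endpoint that serves all of these features.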

In [0]:
%pip install databricks-feature-engineering>=0.13.0a4 
# %pip install pyarrow==20.0.0

In [0]:
# Restart the Python process so the package installed by the %pip cell above
# is the version that gets imported in the cells that follow.
dbutils.library.restartPython()

In [0]:
from pyspark.sql import functions as F
from databricks.feature_store import FeatureStoreClient

# Despite its name, DB_NAME is the "<catalog>.<schema>" prefix shared by every
# table and function this notebook reads or creates.
DB_NAME = "ramin_serverless_aws_catalog.mistplay_fraud_demo"
fs = FeatureStoreClient()

# Create UC Python function for last batch cutoff (current timestamp - 3 days)
function_full_name = f"{DB_NAME}.last_batch_cutoff"

# The text between the $$ markers is the *body* of the UDF, so it must end in a
# top-level `return`. A body that only defines an inner function without
# calling it yields NULL, which would make every `reward_ts >= cutoff`
# comparison downstream match nothing.
spark.sql(
    f"""
    CREATE OR REPLACE FUNCTION {function_full_name}()
    RETURNS TIMESTAMP
    LANGUAGE PYTHON
    AS $$
    from datetime import datetime, timedelta
    return datetime.utcnow() - timedelta(days=3)
    $$
    """
)

# Source tables used by the feature computations below.
accounts = spark.table(f"{DB_NAME}.accounts")
account_device = spark.table(f"{DB_NAME}.account_device")
devices = spark.table(f"{DB_NAME}.devices")
events = spark.table(f"{DB_NAME}.events")
rewards = spark.table(f"{DB_NAME}.rewards")


## Create account-level and device-level features

In [0]:

# Account-level aggregates over events and rewards.
# NOTE(review): no time filter is applied in this cell, so the "_7d" suffix
# presumably relies on `events` / `rewards` already holding only the last
# 7 days of data — TODO confirm.
event_aggregations = [
    F.count("event_type").alias("events_7d"),
    F.avg("session_minutes").alias("avg_session_minutes_7d"),
    F.avg("is_vpn").alias("vpn_rate_7d"),
    F.countDistinct("device_id").alias("distinct_devices_7d"),
]
account_event_features = events.groupBy("account_id").agg(*event_aggregations)

reward_aggregations = [
    F.count("reward_type").alias("rewards_7d"),
    F.sum("reward_amount").alias("reward_amount_7d"),
]
account_reward_features = rewards.groupBy("account_id").agg(*reward_aggregations)

# Rewards accumulated since the last batch; the cutoff timestamp is produced
# by calling the UC function as a SQL expression.
last_batch_cutoff = F.expr(f"{function_full_name}()")
rewards_after_cutoff = rewards.where(F.col("reward_ts") >= last_batch_cutoff)
reward_since_batch = rewards_after_cutoff.groupBy("account_id").agg(
    F.sum("reward_amount").alias("reward_amount_since_last_batch")
)

# Accounts with no matching events/rewards come out of the left joins as
# nulls; these are the values they are replaced with.
account_feature_defaults = {
    "events_7d": 0,
    "avg_session_minutes_7d": 0.0,
    "vpn_rate_7d": 0.0,
    "distinct_devices_7d": 0,
    "rewards_7d": 0,
    "reward_amount_7d": 0.0,
    "reward_amount_since_last_batch": 0.0,
}

# Static account attributes plus the cutoff that was in effect for this run.
account_attributes = accounts.select(
    "account_id", "country", "platform", "marketing_channel"
).withColumn("last_batch_cutoff", last_batch_cutoff)

account_features = (
    account_attributes
    .join(account_event_features, "account_id", "left")
    .join(account_reward_features, "account_id", "left")
    .join(reward_since_batch, "account_id", "left")
    .na.fill(account_feature_defaults)
)

# Device-level features: device attributes plus the number of distinct
# accounts linked to each device (0 when a device has no linked account).
accounts_per_device = account_device.groupBy("device_id").agg(
    F.countDistinct("account_id").alias("accounts_per_device_7d")
)

device_feature_columns = [
    "device_id",
    "device_type",
    "os_version",
    "is_emulator",
    "device_risk_score",
    "accounts_per_device_7d",
]

devices_with_counts = devices.join(accounts_per_device, "device_id", "left")
device_features = (
    devices_with_counts
    .fillna({"accounts_per_device_7d": 0})
    .select(*device_feature_columns)
)


In [0]:

# Feature Store write helpers

def table_exists(name: str) -> bool:
    """Report whether the Feature Store client can look up the table `name`.

    Any exception raised by `fs.get_table` is interpreted as "the table does
    not exist", so lookup failures of every kind return False.
    """
    try:
        fs.get_table(name)
    except Exception:
        return False
    else:
        return True


def upsert_feature_table(name: str, df, primary_keys, description: str):
    """Create the feature table `name` from `df`, or overwrite it if it exists.

    Args:
        name: Full name of the feature table.
        df: DataFrame holding the feature values to write.
        primary_keys: Key column(s); only used when the table is created.
        description: Table description; only used when the table is created.
    """
    if not table_exists(name):
        # First run: register the table together with its keys and description.
        fs.create_table(name=name, primary_keys=primary_keys, df=df, description=description)
        return
    # Later runs: replace the existing contents.
    fs.write_table(name=name, df=df, mode="overwrite")

account_features_name = f"{DB_NAME}.account_features"
device_features_name = f"{DB_NAME}.device_features"

# (table name, feature DataFrame, primary keys, description), written in order.
feature_table_specs = (
    (
        account_features_name,
        account_features,
        ["account_id"],
        "Account-level behavioral features for fraud detection",
    ),
    (
        device_features_name,
        device_features,
        ["device_id"],
        "Device-level risk features for fraud detection",
    ),
)

for table_name, feature_df, key_columns, table_description in feature_table_specs:
    upsert_feature_table(
        table_name,
        feature_df,
        primary_keys=key_columns,
        description=table_description,
    )


## Create training base

In [0]:

# Base training dataset: fraud labels per account, left-joined to the
# account-to-device links so each row also carries a device key.
training_base_name = f"{DB_NAME}.training_base"

account_labels = accounts.select("account_id", "is_fraud_label")
account_device_keys = account_device.select("account_id", "device_id")
training_base = account_labels.join(account_device_keys, "account_id", "left")

training_base.write.saveAsTable(training_base_name, mode="overwrite")

summary_lines = [
    "Feature tables created:",
    f"- {account_features_name}",
    f"- {device_features_name}",
    "Training base table created:",
    f"- {training_base_name}",
]
print("\n".join(summary_lines))

## On-Demand Feature: last_batch_cutoff

This feature will be computed at inference time by calling the UC function, rather than being pre-calculated offline.

In [0]:
from databricks.feature_engineering import FeatureFunction, FeatureLookup

# Wrap the UC function in a FeatureFunction so that it is evaluated at
# inference time. The function takes no parameters, hence the empty bindings;
# its result is exposed under the name "last_batch_cutoff_inference".
last_batch_cutoff_feature = FeatureFunction(
    udf_name=function_full_name,
    output_name="last_batch_cutoff_inference",
    input_bindings={},
)
on_demand_features = [last_batch_cutoff_feature]

for message in (
    f"On-demand feature configured: {function_full_name}",
    "This feature will be computed dynamically at inference time.",
):
    print(message)

## Feature Serving

In [0]:
from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup, FeatureFunction

# Client used below to create the feature spec and the serving endpoint.
fe = FeatureEngineeringClient()

# Pre-computed columns to serve from each feature table.
account_feature_names = [
    "country",
    "platform",
    "marketing_channel",
    "events_7d",
    "avg_session_minutes_7d",
    "vpn_rate_7d",
    "distinct_devices_7d",
    "rewards_7d",
    "reward_amount_7d",
    "reward_amount_since_last_batch",
]
device_feature_names = [
    "device_type",
    "os_version",
    "is_emulator",
    "device_risk_score",
    "accounts_per_device_7d",
]

# One lookup per feature table, each keyed by that table's entity id.
account_lookup = FeatureLookup(
    table_name=f"{DB_NAME}.account_features",
    lookup_key="account_id",
    feature_names=account_feature_names,
)
device_lookup = FeatureLookup(
    table_name=f"{DB_NAME}.device_features",
    lookup_key="device_id",
    feature_names=device_feature_names,
)
feature_lookups = [account_lookup, device_lookup]

# On-demand feature: the UC function, called with no inputs.
on_demand_feature = FeatureFunction(
    udf_name=function_full_name,
    output_name="last_batch_cutoff_inference",
    input_bindings={},
)

# A feature spec takes table lookups and functions in a single list.
features = [*feature_lookups, on_demand_feature]

feature_spec_name = f"{DB_NAME}.mistplay_fraud_features"

# NOTE(review): re-running this cell presumably fails once the spec already
# exists — confirm, and guard or delete the spec first if re-runs are needed.
fe.create_feature_spec(name=feature_spec_name, features=features)

print(f"✓ Feature spec created: {feature_spec_name}")
print("  - Pre-computed features from account_features and device_features")
print(f"  - On-demand feature: {function_full_name}")

In [0]:
from databricks.feature_engineering.entities.feature_serving_endpoint import (
    ServedEntity,
    EndpointCoreConfig,
)

# Deploy a serving endpoint backed by the feature spec.
endpoint_name = "mistplay-fraud-features"

served_feature_spec = ServedEntity(
    feature_spec_name=feature_spec_name,
    workload_size="Small",
    scale_to_zero_enabled=True,
)
endpoint_config = EndpointCoreConfig(served_entities=served_feature_spec)

fe.create_feature_serving_endpoint(name=endpoint_name, config=endpoint_config)

print(f"✓ Feature serving endpoint created: {endpoint_name}")
print(f"  Endpoint will serve features from: {feature_spec_name}")
print("  Workload size: Small with scale-to-zero enabled")

In [0]:
import mlflow.deployments

# Smoke-test the endpoint by requesting features for two sample entity-key pairs.
client = mlflow.deployments.get_deploy_client("databricks")

sample_entity_keys = [
    {"account_id": "3750", "device_id": "1088"},
    {"account_id": "3751", "device_id": "1433"},
]
request_payload = {"dataframe_records": sample_entity_keys}

response = client.predict(endpoint=endpoint_name, inputs=request_payload)

print("Feature serving response:")
print(response)
# These lines are printed whenever the call above returns without raising;
# the response content itself is not inspected.
for message in (
    "\n✓ Features retrieved successfully!",
    "  - Pre-computed features from online tables",
    f"  - On-demand feature computed by {function_full_name}",
):
    print(message)