#### Churn Prediction Feature Engineering

#### 01. Exploratory Data Analysis

In [0]:
# Load the bronze customer table into a Spark DataFrame and preview it
telco_source_table = "workspace.customer_churn.churn_bronze_customers"
telcoDF = spark.read.table(telco_source_table)
display(telcoDF)

#### 02. Feature Engineering

#### Compute the number of optional services enabled

In [0]:
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql.functions import pandas_udf, col, when, lit


def compute_service_features(inputDF: SparkDataFrame) -> SparkDataFrame:
  """
  Append a `num_optional_services` column to `inputDF`.

  The column counts, per row, how many of the optional service columns
  (online security/backup, device protection, tech support, streaming TV/movies)
  are equal to "Yes". Any other value, including null, counts as 0.

  :param inputDF: Spark DataFrame containing the six service columns listed below.
  :return: `inputDF` with an extra double column `num_optional_services`.
  """
  service_cols = (
    "online_security",
    "online_backup",
    "device_protection",
    "tech_support",
    "streaming_tv",
    "streaming_movies",
  )

  # Vectorized (pandas) UDF: each argument arrives as a pandas Series for a batch of rows
  @pandas_udf('double')
  def num_optional_services(*cols):
    running_total = 0
    for service in cols:
      # 1.0 where the service is enabled, 0.0 otherwise
      running_total = running_total + (service == "Yes").astype('double')
    return running_total

  enabled_count = num_optional_services(*service_cols)
  return inputDF.withColumn("num_optional_services", enabled_count)

#### Define featurization function

In [0]:
def clean_churn_features(dataDF: SparkDataFrame) -> SparkDataFrame:
  """
  Clean the churn features using the pandas API on Spark.

  - `senior_citizen` is cast to string and mapped "1" -> "Yes", "0" -> "No"
    (any other value becomes null).
  - `total_charges` is parsed to a float; null or blank values become 0.0.
  - Nulls in `tenure`, `monthly_charges` and `total_charges` are filled with 0.0.
  - `customer_id` / `num_optional_services` get "native" / "numeric" semantic-type
    metadata to guide AutoML.

  Expects the `num_optional_services` column to already exist
  (e.g. produced by `compute_service_features`).

  :param dataDF: Spark DataFrame with the raw churn columns.
  :return: A new, cleaned Spark DataFrame.
  """

  def to_total_charges(raw) -> float:
    # Null-safe, type-tolerant parse: calling `.strip()` directly would raise on None
    # (or on an already-numeric value). The `-> float` hint lets pandas-on-Spark
    # skip schema inference in `apply` and guarantees a double column.
    if raw is None:
      return 0.0
    text = str(raw).strip()
    return float(text) if text else 0.0

  # Convert to pandas on spark dataframe
  data_psdf = dataDF.pandas_api()

  # Cast to string so the values can be looked up with the string keys below
  data_psdf = data_psdf.astype({"senior_citizen": "string"})
  data_psdf["senior_citizen"] = data_psdf["senior_citizen"].map({"1" : "Yes", "0" : "No"})

  data_psdf["total_charges"] = data_psdf["total_charges"].apply(to_total_charges)

  # Fill remaining missing numerical values with 0
  data_psdf = data_psdf.fillna({"tenure": 0.0, "monthly_charges": 0.0, "total_charges": 0.0})

  # Add/Force semantic data types for specific columns (to facilitate autoML)
  data_cleanDF = data_psdf.to_spark()
  data_cleanDF = data_cleanDF.withMetadata("customer_id", {"spark.contentAnnotation.semanticType":"native"})
  data_cleanDF = data_cleanDF.withMetadata("num_optional_services", {"spark.contentAnnotation.semanticType":"numeric"})

  return data_cleanDF

#### Compute Churn Features and append a timestamp

In [0]:
from datetime import datetime
from pyspark.sql.functions import lit


# Snapshot the scoring time once so every row carries the same transaction timestamp
this_time = datetime.now().timestamp()
service_featuresDF = compute_service_features(telcoDF)
churn_features_n_predsDF = clean_churn_features(service_featuresDF).withColumn(
  "transaction_ts",
  lit(this_time).cast("timestamp"),  # epoch seconds (float) cast to a Spark timestamp
)

display(churn_features_n_predsDF)

#### Extract ground-truth labels in a separate table and drop them from the feature table

In [0]:
import pyspark.sql.functions as F


# Best practice: specify train-val-test split as categorical label (to be used by automl and/or model validation jobs)
train_ratio, val_ratio, test_ratio = 0.7, 0.2, 0.1
# Rows falling in neither the train nor the validate bucket become "test", so test_ratio is only
# honoured if the ratios sum to 1. Compare with a tolerance: 0.7 + 0.2 + 0.1 is not exactly 1.0 in floating point.
assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-9, "train/val/test ratios must sum to 1"

# Label table = key columns + ground-truth `churn` label + split assignment.
# rand(seed=42) is seeded for repeatability; Spark notes rand() can still vary if the
# input partitioning changes between runs.
churn_features_n_predsDF.select("customer_id", "transaction_ts", "churn") \
                        .withColumn("random", F.rand(seed=42)) \
                        .withColumn("split",
                                    F.when(F.col("random") < train_ratio, "train")
                                    .when(F.col("random") < train_ratio + val_ratio, "validate")
                                    .otherwise("test")) \
                        .drop("random") \
                        .write.format("delta") \
                        .mode("overwrite").option("overwriteSchema", "true") \
                        .saveAsTable("workspace.customer_churn.advanced_churn_label_table")

# The feature table must not contain the ground-truth label
churn_featuresDF = churn_features_n_predsDF.drop("churn")

In [0]:
# Preview the feature DataFrame (the `churn` label column has been dropped)
display(churn_featuresDF)


#### Add primary key constraints to the label table for feature lookup

In [0]:
# Select the catalog/schema so unqualified table names in the SQL cell below resolve here
catalog, schema = "workspace", "customer_churn"
spark.sql("USE {}.{}".format(catalog, schema))

In [0]:
%sql
-- Declare (customer_id, transaction_ts) as the label table's primary key so it can drive feature lookups.
-- Drop any previous constraint first so this cell can be re-run; the key columns are made
-- NOT NULL before the PRIMARY KEY constraint is added.
ALTER TABLE advanced_churn_label_table DROP CONSTRAINT IF EXISTS advanced_churn_label_table_pk;
ALTER TABLE advanced_churn_label_table ALTER COLUMN customer_id SET NOT NULL;
ALTER TABLE advanced_churn_label_table ALTER COLUMN transaction_ts SET NOT NULL;
ALTER TABLE advanced_churn_label_table ADD CONSTRAINT advanced_churn_label_table_pk PRIMARY KEY(customer_id, transaction_ts);

#### 03. Write the feature table to Unity Catalog

#### Drop any existing online table (optional)

In [0]:
# Re-select the catalog and schema (named `db` here), which the feature/online table names below are built from
catalog = "workspace"
db = "customer_churn"
use_db_stmt = f"USE {catalog}.{db}"
spark.sql(use_db_stmt)

In [0]:
from pprint import pprint
from databricks.sdk import WorkspaceClient


from databricks.sdk.errors import NotFound

# Create a workspace client instance
w = WorkspaceClient()

# Best-effort removal of any existing online table mirroring the feature table,
# so the feature table can be dropped and recreated from scratch below.
online_table_name = f"{catalog}.{db}.advanced_churn_feature_table_online_table"
try:
  # get() raises if the online table does not exist, which skips the delete
  w.online_tables.get(online_table_name)
  print(f"Dropping online feature table: {online_table_name}")
  w.online_tables.delete(online_table_name)

except NotFound:
  # Expected on a fresh run: nothing to drop
  print(f"No existing online feature table: {online_table_name}")

except Exception as e:
  # Keep this optional step best-effort: report the failure without stopping the notebook
  pprint(e)

#### Drop the feature table if it already exists

In [0]:
%sql
-- We are creating the feature table from scratch.
-- Drop any existing feature table first (IF EXISTS makes this a no-op when it is absent).
-- The unqualified name resolves against the catalog/schema selected by the earlier USE statement.
DROP TABLE IF EXISTS advanced_churn_feature_table;

#### Import the Feature Engineering client

In [0]:
from databricks.feature_engineering import FeatureEngineeringClient

# Client used below to create the feature table and write feature values to it
fe = FeatureEngineeringClient()

#### Create "feature"/UC table

In [0]:
bronze_table_name = "churn_bronze_customers"

# Create the UC feature table from the feature DataFrame's schema only (no rows are written here).
# (customer_id, transaction_ts) is the primary key; transaction_ts is the time-series column.
churn_feature_table = fe.create_table(
  name=f"{catalog}.{db}.advanced_churn_feature_table",
  primary_keys=["customer_id", "transaction_ts"],
  schema=churn_featuresDF.schema,
  timeseries_columns="transaction_ts",
  description=(
    f"These features are derived from the {catalog}.{db}.{bronze_table_name} table in the lakehouse. "
    "We created service features and cleaned up their values. No aggregations were performed. "
    "[Warning: This table doesn't store the ground truth, so it can be used with AutoML's feature table integration.]"
  ),
)

#### Write the feature values to a feature table

In [0]:
# Write the engineered features into the feature table using merge mode.
# The source could also be a streaming DataFrame.
fe.write_table(
  df=churn_featuresDF,
  name=f"{catalog}.{db}.advanced_churn_feature_table",
  mode="merge",
)

#### 04. Define Featurization Logic for on-demand feature functions

In [0]:
%sql
-- On-demand feature function: current monthly charge minus the historical average monthly
-- charge (total_charges / tenure). Returns 0 when tenure is not positive and NULL when any
-- input is NULL.
CREATE OR REPLACE FUNCTION avg_price_increase(
  monthly_charges_in DOUBLE, 
  tenure_in DOUBLE, 
  total_charges_in DOUBLE
)
RETURNS FLOAT
LANGUAGE PYTHON
COMMENT "[Feature Function] Calculate potential average price increase for tenured customers based on last monthly charges and updated tenure"
AS $$
# This block IS the UDF body: the arguments are in scope by name and the result must be
# returned directly (a nested `def` that is never called would make the UDF return NULL).
# SQL NULLs arrive as None; propagate NULL rather than raising a TypeError on comparison/arithmetic.
if monthly_charges_in is None or tenure_in is None or total_charges_in is None:
    return None
if tenure_in > 0:
    return monthly_charges_in - total_charges_in / tenure_in
return 0.0
$$

In [0]:
%sql
-- Confirm the feature function was registered in the current catalog/schema
DESCRIBE FUNCTION avg_price_increase;