###Step1: Ingest Data to Notebook

We will download the dataset hosted at  [**Kaggle**](https://www.kaggle.com/mathchi/churn-for-bank-customers)

#### Content
*   `RowNumber` —corresponds to the record (row) number and has no effect on the output.
*   `CustomerId` -contains random values and has no effect on customer leaving the bank.
*   `Surname` —the surname of a customer has no impact on their decision to leave the bank.
*   `CreditScore` —can have an effect on customer churn, since a customer with a higher credit score is less likely to leave the bank.
*   `Geography` —a customer’s location can affect their decision to leave the bank.
*   `Gender` —it’s interesting to explore whether gender plays a role in a customer leaving the bank
*   `Age` —this is certainly relevant, since older customers are less likely to leave their bank than younger ones.
*   `Tenure` —refers to the number of years that the customer has been a client of the bank. Normally, older clients are more loyal and less likely to leave a bank
*   `Balance` —also a very good indicator of customer churn, as people with a higher balance in their accounts are less likely to leave the bank compared to those with lower balances.
*   `NumOfProducts` —refers to the number of products that a customer has purchased through the bank.
*   `HasCrCard` —denotes whether or not a customer has a credit card. This column is also relevant, since people with a credit card are less likely to leave the bank.
*   `IsActiveMember` —active customers are less likely to leave the bank
*   `EstimatedSalary` —as with balance, people with lower salaries are more likely to leave the bank compared to those with higher salaries.
*   `Exited` —whether or not the customer left the bank.

## Import Data


In [0]:
import os
bank_df = spark.read.option("header", True).option("inferSchema", True).csv(f"file:{os.path.dirname(os.getcwd())}/data/churn.csv")
display(bank_df)

In [0]:
# Lets get unique value count in Surname
bank_df.select('Surname').distinct().count()

In [0]:
# Drop RowNumber and Surname as they add no value to our model
bank_df = bank_df.drop('RowNumber', 'Surname')
display(bank_df)

### Create Feature Table

#### 1. Defining a database to store feature tables.

In [0]:
%sql
SELECT current_catalog() AS current_catalog, current_schema() AS current_schema

In [0]:
DATABASE_NAME = "bank_churn_analysis"
#setup database that will hold our Feature tables in Delta format.
spark.sql(f"CREATE DATABASE IF NOT EXISTS {DATABASE_NAME}")

In [0]:
bank_df.write.format("delta").mode("overwrite").saveAsTable(f"{DATABASE_NAME}.raw_data")

#### 2. Defining a feature engineering function that will return a Spark dataframe with a unique primary key. 

In [0]:
import pyspark.pandas as ps
import numpy as np

def compute_features(spark_df):
    # https://spark.apache.org/docs/latest/api/python/migration_guide/koalas_to_pyspark.html?highlight=dataframe%20pandas_api
    # Convert to pyspark.pandas DataFrame
    ps_df = spark_df.pandas_api()
    
    # One-Hot Encoding for Geography and Gender
    ohe_ps_df = ps.get_dummies(
      ps_df, 
      columns=["Geography", "Gender"],
      dtype="int",
      drop_first=True
    )
    
    # Clean up column names
    ohe_ps_df.columns = ohe_ps_df.columns.str.replace(r' ', '', regex=True)
    ohe_ps_df.columns = ohe_ps_df.columns.str.replace(r'\(', '-', regex=True)
    ohe_ps_df.columns = ohe_ps_df.columns.str.replace(r'\)', '', regex=True)
    
    ## Additional example feature engineering steps

    # Create a binary feature indicating whether the balance is zero or not
    ohe_ps_df['Is_Balance_Zero'] = (ohe_ps_df['Balance'] == 0).astype('int')
    
    # Ratio of Tenure to Age
    ohe_ps_df['Tenure_to_Age'] = ohe_ps_df['Tenure'] / ohe_ps_df['Age']
    
    # Interaction feature: Balance to EstimatedSalary ratio
    ohe_ps_df['Balance_to_Salary'] = ohe_ps_df['Balance'] / ohe_ps_df['EstimatedSalary']
    
    return ohe_ps_df


In [0]:
# Disable ANSI mode
spark.conf.set("spark.sql.ansi.enabled", "false")

# Compute features
bank_features_df = compute_features(bank_df)
display(bank_features_df)

####3. Create the Feature Table

In [0]:
# Instantiate the feature store client using `FeatureStoreClient()`.
from databricks.feature_store import FeatureStoreClient
fs = FeatureStoreClient()

In [0]:
bank_feature_table = fs.create_table(
  name=f"{DATABASE_NAME}.bank_customer_features", # the name of the feature table
  primary_keys=["CustomerId"], # primary key that will be used to perform joins
  schema=bank_features_df.spark.schema(), # the schema of the Feature table
  description="This customer level table contains one-hot encoded categorical and scaled numeric features to predict bank customer churn."
)


#### 4. Populate the feature table using write_table

In [0]:
fs.write_table(df=bank_features_df.to_spark(), name=f"{DATABASE_NAME}.bank_customer_features", mode="overwrite")

#### Cleanup

In [0]:
 fs.drop_table(
   name=f"{DATABASE_NAME}.bank_customer_features"
)