# Train a churn model using Feathr as a Feature Store
In this sample we'll show how to use [Feathr](https://github.com/linkedin/feathr) to register the Features used to train a Machine Learning Model.

We'll train a simple classification model to predict customer churn using features from our Feature Store. We'll use [Azure Machine Learning](https://azure.microsoft.com/en-us/services/machine-learning/) to train our model and [Azure Databricks](https://docs.microsoft.com/en-us/azure/databricks/scenarios/what-is-azure-databricks) as the Spark engine to process the features.

**Feathr** is the feature store that has been used in production at LinkedIn for many years; it was open sourced in April 2022. Read our announcements on open sourcing Feathr and on Feathr on Azure.

Feathr lets you:

* Define features based on raw data sources (batch and streaming) using pythonic APIs.
* Register and get features by name during model training and model inference.
* Share features across your team and company.

Feathr automatically computes your feature values and joins them to your training data, using point-in-time-correct semantics to avoid data leakage, and supports materializing and deploying your features for use online in production ([source](https://github.com/linkedin/feathr#what-is-feathr)). 
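To make "point-in-time correct" concrete, here is a minimal pure-Python sketch of the idea. It is an illustration only, not Feathr's implementation (Feathr does this inside a Spark job), and the function name and data layout are hypothetical. Each observation only sees the latest feature value whose timestamp is at or before the observation's own timestamp, so values from the future cannot leak into training rows.

```python
from bisect import bisect_right
from datetime import datetime


def point_in_time_join(observations, feature_history):
    """Attach to each observation the latest feature value known at its timestamp.

    observations: list of (customer_id, observed_at) tuples.
    feature_history: dict customer_id -> list of (valid_from, value), sorted by valid_from.
    Returns a list of (customer_id, observed_at, value_or_None).
    """
    joined = []
    for customer_id, observed_at in observations:
        history = feature_history.get(customer_id, [])
        timestamps = [ts for ts, _ in history]
        # Index of the last feature row whose timestamp is <= observed_at (-1 if none).
        idx = bisect_right(timestamps, observed_at) - 1
        value = history[idx][1] if idx >= 0 else None
        joined.append((customer_id, observed_at, value))
    return joined


history = {1: [(datetime(2022, 1, 1), 3), (datetime(2022, 2, 1), 5)]}
print(point_in_time_join([(1, datetime(2022, 1, 15)), (1, datetime(2021, 12, 31))], history))
```

An observation dated before any feature row gets `None` instead of a later value, which is exactly the leakage the point-in-time rule prevents.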

## Configure Feathr client
We use a `feathr_config.yaml` file to define the Feathr configuration. This repo includes a sample [feathr_config file](../feathr_config.yaml.sample) that you can adapt with the settings from your own environment. We also use a `config.py` file for sensitive values: the Service Principal credentials, the Redis password, and the ADLS key. You can also store these credentials in [Azure Key Vault](https://docs.microsoft.com/en-us/azure/key-vault/general/basic-concepts).
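The contents of `config.py` are not part of this sample. One common pattern is for such a file to export the secrets as environment variables for the Feathr client to pick up. The sketch below assumes the variable names `AZURE_CLIENT_ID`, `AZURE_TENANT_ID`, `AZURE_CLIENT_SECRET`, `REDIS_PASSWORD` and `ADLS_KEY`; check the Feathr documentation for the exact names your version expects. The helper `export_secrets` is hypothetical; it fails fast when a secret is missing instead of letting a Spark job fail later.

```python
import os

# Assumed names of the secrets the Feathr client reads from the environment;
# verify them against the Feathr documentation for your version.
REQUIRED_SECRETS = [
    "AZURE_CLIENT_ID",
    "AZURE_TENANT_ID",
    "AZURE_CLIENT_SECRET",
    "REDIS_PASSWORD",
    "ADLS_KEY",
]


def export_secrets(secrets, env=os.environ):
    """Copy the required secrets into `env`, raising KeyError if any is missing or empty."""
    missing = [name for name in REQUIRED_SECRETS if not secrets.get(name)]
    if missing:
        raise KeyError(f"Missing secrets: {', '.join(missing)}")
    for name in REQUIRED_SECRETS:
        env[name] = secrets[name]
    return env
```

In `config.py` you would call `export_secrets({...})` with values loaded from Key Vault or from a local file that is excluded from source control.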

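```python
# config.py holds the sensitive values (Service Principal credentials, Redis password, ADLS key)
import config

from feathr import FeathrClient

# Load the Feathr settings; replace the file with your own environment's configuration
client = FeathrClient(config_path='../feathr_config.yaml')
```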

## Define your Source
First we need to define the feature source to be used in our process. You can find more details about this process in this [Feathr doc](https://linkedin.github.io/feathr/concepts/feature-definition.html#step1-define-sources-section).

We'll use a sample dataset with customer calls. Below is a view of this dataset.

![Calls](../Images/1-Calls.png)

For each customer, we have the number of calls made and the average call duration.
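The Calls dataset already stores these two values per customer. If you start from raw call records instead, a sketch of how such columns could be computed is shown below. The record layout `(customer_id, duration_seconds)` and the helper name are hypothetical, and the sample dataset may have been built differently.

```python
from collections import defaultdict


def summarize_calls(calls):
    """Aggregate raw (customer_id, duration_seconds) records into per-customer columns."""
    counts = defaultdict(int)
    totals = defaultdict(float)
    for customer_id, duration in calls:
        counts[customer_id] += 1
        totals[customer_id] += duration
    return {
        customer_id: {
            "NumberOfCalls": counts[customer_id],
            "AverageCallDuration": totals[customer_id] / counts[customer_id],
        }
        for customer_id in counts
    }


print(summarize_calls([(1, 60), (1, 120), (2, 30)]))
```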

**IMPORTANT**: You must import the example datasets into your environment. Please import the [Dataset Folders](../Dataset/) into DBFS.


```python
from feathr import HdfsSource

# Batch source pointing to the Calls dataset (Delta format) on DBFS
batch_source = HdfsSource(name="Customer",
                          path="dbfs:/delta/Calls/")
```

## Define the [`Anchor and Features`](https://linkedin.github.io/feathr/concepts/feature-definition.html#step2-define-anchors-and-features)

We'll define two features, `f_NumberOfCalls` and `f_AverageCallDuration`. This is a deliberately simple example that only demonstrates how to define features. For more complex scenarios, such as transformations and aggregations, you can use other Feathr routines. Take a look at the [Feathr Doc](https://linkedin.github.io/feathr/concepts/feature-definition.html#window-aggregation-features) for more use cases.
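Window aggregation features compute values such as "number of calls in the last N days" as of each observation time. The stdlib sketch below only illustrates what such a feature computes; it is not Feathr's API, and the helper name and the 30-day window are hypothetical.

```python
from datetime import datetime, timedelta


def calls_in_window(call_times, as_of, window=timedelta(days=30)):
    """Count calls in the half-open window (as_of - window, as_of].

    Calls after `as_of` are ignored, so the value only uses information
    available at the observation time.
    """
    start = as_of - window
    return sum(1 for t in call_times if start < t <= as_of)


calls = [datetime(2022, 1, 1), datetime(2022, 1, 20), datetime(2022, 2, 5), datetime(2022, 3, 1)]
print(calls_in_window(calls, as_of=datetime(2022, 2, 10)))
```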

We define a Feature Anchor to combine the features and sources.

```python
from feathr import TypedKey, ValueType, Feature, FeatureAnchor, INT32

# Entity key used to join the features with the observation data
customer_id = TypedKey(key_column="CustomerId",
                       key_column_type=ValueType.INT32,
                       description="CustomerId",
                       full_name="CustomerId")

# Each transform is a column of the Calls source, so the value is taken as-is
features = [
    Feature(name="f_NumberOfCalls",
            feature_type=INT32,
            key=customer_id,
            transform="NumberOfCalls"),
    Feature(name="f_AverageCallDuration",
            feature_type=INT32,
            key=customer_id,
            transform="AverageCallDuration"),
]

# Bind the features to the batch source defined above
request_anchor = FeatureAnchor(name="request_features",
                               source=batch_source,
                               features=features)
```
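In this anchor each `transform` is just a column name, so a feature's value is read from that column of the `Calls` source for the row's `CustomerId` (Feathr also accepts Spark SQL expressions as transforms). A rough pure-Python picture of that column passthrough, using a hypothetical helper that is not part of Feathr:

```python
def apply_anchor(source_rows, key_column, feature_columns):
    """Build {key: {feature_name: value}} from source rows.

    feature_columns maps each feature name to the source column used as its transform.
    If a key appears in several rows, the last row wins in this sketch.
    """
    table = {}
    for row in source_rows:
        table[row[key_column]] = {
            feature: row[column] for feature, column in feature_columns.items()
        }
    return table


rows = [{"CustomerId": 1, "NumberOfCalls": 4, "AverageCallDuration": 120}]
mapping = {"f_NumberOfCalls": "NumberOfCalls", "f_AverageCallDuration": "AverageCallDuration"}
print(apply_anchor(rows, "CustomerId", mapping))
```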

## Build the Features
Now we can build the features by passing the anchor to the client.

```python
client.build_features(anchor_list=[request_anchor])
```

## Working with the Training Data
Now with the Features defined we can work with our [`Observation Dataset`](https://linkedin.github.io/feathr/concepts/feathr-concepts-for-beginners.html#what-are-observation-data-and-why-does-feathr-need-keys-anchor-source).

In this case, we need to train a model to predict customer churn, so we combine customer data with our features, using `CustomerId` as the join key.

Below is a sample of this dataset:

![Customer](../Images/2-Customer.png)
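Conceptually, each observation row is enriched with the feature values for its `CustomerId`, much like a left join. The sketch below is a hypothetical stand-in for that step, not Feathr code. Customers with no feature row get `None`; depending on your scikit-learn version, `RandomForestClassifier` may reject missing values, so check for them before training.

```python
def left_join_features(observations, feature_table, key_column, feature_names):
    """Left-join feature values onto observation rows by key; missing features become None."""
    joined = []
    for row in observations:
        features = feature_table.get(row[key_column], {})
        enriched = dict(row)  # copy so the input rows are not mutated
        for name in feature_names:
            enriched[name] = features.get(name)
        joined.append(enriched)
    return joined


observations = [{"CustomerId": 1, "Churn": 0}, {"CustomerId": 2, "Churn": 1}]
feature_table = {1: {"f_NumberOfCalls": 4, "f_AverageCallDuration": 120}}
print(left_join_features(observations, feature_table, "CustomerId",
                         ["f_NumberOfCalls", "f_AverageCallDuration"]))
```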

All the datasets use the Delta format, so we have to set both the Feathr input and output formats to Delta in the Spark execution configuration.
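Because this notebook passes the same Delta settings to both `get_offline_features` and `materialize_features`, one option is to build them in a single place. The helper below is hypothetical; the two property keys are the ones this sample already uses.

```python
def delta_spark_conf(input_format="delta", output_format="delta"):
    """Return the Spark properties that tell Feathr which input/output format to use."""
    return {
        "spark.feathr.inputFormat": input_format,
        "spark.feathr.outputFormat": output_format,
    }


print(delta_spark_conf())
```

You would then pass `SparkExecutionConfiguration(delta_spark_conf())` wherever the notebook currently repeats the dictionary, so the offline and materialization jobs cannot drift apart.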

The code below creates a Spark job on Databricks that computes the features and joins them with the observation dataset. The output is written to `output_path`.


```python
from feathr import FeatureQuery, ObservationSettings
from feathr import SparkExecutionConfiguration

output_path = 'dbfs:/feathrazure_output'

# Features to join, looked up by CustomerId
feature_query = FeatureQuery(
    feature_list=["f_NumberOfCalls", "f_AverageCallDuration"], key=customer_id)

# Observation dataset: the customer data to enrich with the features
settings = ObservationSettings(
    observation_path="dbfs:/delta/Customer/")

# Submit the Spark job; both input and output use the Delta format.
# `execution_configuratons` (sic) is the keyword name used by the Feathr version this sample targets.
client.get_offline_features(observation_settings=settings,
                            feature_query=feature_query,
                            output_path=output_path,
                            execution_configuratons=SparkExecutionConfiguration({"spark.feathr.inputFormat": "delta",
                                                                                 "spark.feathr.outputFormat": "delta"}))
# Block until the job finishes (up to 500 seconds)
client.wait_job_to_finish(timeout_sec=500)
```
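`wait_job_to_finish` blocks until the Spark job ends or the timeout expires. The underlying pattern is ordinary polling with a deadline. A generic sketch of it follows; `get_status` is a hypothetical callable returning a status string, and the status names are assumptions, not the exact values Feathr or Databricks report.

```python
import time


def wait_for(get_status, timeout_sec=500, poll_interval_sec=10,
             done=("SUCCESS",), failed=("FAILED", "CANCELLED")):
    """Poll get_status() until a terminal state or the timeout; return the final status."""
    deadline = time.monotonic() + timeout_sec
    while True:
        status = get_status()
        if status in done:
            return status
        if status in failed:
            raise RuntimeError(f"Job ended with status {status}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Job still {status} after {timeout_sec} seconds")
        time.sleep(poll_interval_sec)
```

Using `time.monotonic()` keeps the deadline correct even if the wall clock changes while waiting.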

### Get the Result
After the Spark job succeeds, we can use the output to train our model. Feathr provides utilities to read the Spark output and convert it into a pandas DataFrame.

![Training Data](../Images/3-Training-Data.png)

```python
from feathr.job_utils import get_result_df

# Read the Delta output written by the Spark job into a pandas DataFrame
df_res = get_result_df(client, format="delta", res_url=output_path)

df_res
```

## Train the model
Now we can train our ML model, combining the features from the Feature Store with other columns from the observation dataset.

```python
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier

seed = 2022
target_column = 'Churn'

# Customer columns from the observation dataset plus the two Feathr features
columns = ['CustomerId', 'NumberOfLoans', 'SatisfactionIndex', 'ActualBalance', 'CLTV',
           'f_NumberOfCalls', 'f_AverageCallDuration', 'Churn']

customer_train_dataset = df_res[columns]

# Hold out a third of the rows for testing
train, test = train_test_split(customer_train_dataset, random_state=seed, test_size=0.33)

# The target and the ID are not model inputs
drop_columns = [target_column, 'CustomerId']

X_train = train.drop(drop_columns, axis=1)
X_test = test.drop(drop_columns, axis=1)

y_train = train[target_column]
y_test = test[target_column]

n_estimators = 3

model = RandomForestClassifier(n_estimators=n_estimators, random_state=seed)
model.fit(X_train, y_train)
```
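The cell above fits the model but never scores it on the held-out split. With scikit-learn you would typically call `model.predict(X_test)` and a metric such as `sklearn.metrics.accuracy_score`. Because churn labels are often imbalanced, precision and recall on the churn class are worth checking too. A dependency-free sketch of those metrics, assuming `1` marks a churned customer:

```python
def churn_metrics(y_true, y_pred, positive=1):
    """Return accuracy, plus precision and recall for the positive (churn) class."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == positive and p == positive)
    fp = sum(1 for t, p in pairs if t != positive and p == positive)
    fn = sum(1 for t, p in pairs if t == positive and p != positive)
    correct = sum(1 for t, p in pairs if t == p)
    return {
        "accuracy": correct / len(pairs) if pairs else 0.0,
        "precision": tp / (tp + fp) if tp + fp else 0.0,
        "recall": tp / (tp + fn) if tp + fn else 0.0,
    }
```

In the notebook this would be `churn_metrics(list(y_test), list(model.predict(X_test)))`.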

## Materialize the Features to Online Store
After processing our features, we can materialize them to an online store to serve online models, giving consumers low-latency lookups.

We'll use [Azure Cache for Redis](https://docs.microsoft.com/en-us/azure/azure-cache-for-redis/cache-overview), an in-memory data store based on the Redis software.
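An online store answers "give me these features for this key" with a single lookup instead of a table scan. To illustrate the idea only, here is an in-memory stand-in; the class is hypothetical and does not reflect how Feathr lays out data in Redis.

```python
class InMemoryOnlineStore:
    """Toy key-value online store: (table, key) -> {feature_name: value}."""

    def __init__(self):
        self._data = {}

    def put(self, table, key, features):
        # Materialization writes the latest feature values for each key.
        # Keys are stored as strings, matching the string key used in the lookup below.
        self._data[(table, str(key))] = dict(features)

    def get(self, table, key, feature_names):
        # Online lookup returns values in the requested order; unknown keys give None.
        row = self._data.get((table, str(key)), {})
        return [row.get(name) for name in feature_names]


store = InMemoryOnlineStore()
store.put("CustomerChurn", 9996, {"f_NumberOfCalls": 4, "f_AverageCallDuration": 95})
print(store.get("CustomerChurn", "9996", ["f_NumberOfCalls", "f_AverageCallDuration"]))
```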

```python
from feathr import RedisSink, MaterializationSettings

redis_sink = RedisSink(table_name="CustomerChurn")

# Materialize the two features into a Redis table
settings = MaterializationSettings("CustomerChurnJob",
                                   sinks=[redis_sink],
                                   feature_names=["f_NumberOfCalls", "f_AverageCallDuration"])

client.materialize_features(settings, execution_configuratons=SparkExecutionConfiguration({"spark.feathr.inputFormat": "delta",
                                                                                           "spark.feathr.outputFormat": "delta"}))
# Materialization runs as a Spark job; wait for it before reading from Redis
client.wait_job_to_finish(timeout_sec=500)

# Look up the features for customer 9996 in the online store
res = client.get_online_features('CustomerChurn', '9996', ['f_NumberOfCalls', 'f_AverageCallDuration'])
res
```
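`get_online_features` returns the values as a list. A small hypothetical helper can pair them with the requested names so the result is easier to feed into a scoring request. It assumes the values come back in the same order as the requested feature names.

```python
def label_features(feature_names, values):
    """Pair requested feature names with returned values; fail loudly on a length mismatch."""
    if len(feature_names) != len(values):
        raise ValueError(f"Expected {len(feature_names)} values, got {len(values)}")
    return dict(zip(feature_names, values))
```

In the notebook: `label_features(['f_NumberOfCalls', 'f_AverageCallDuration'], res)`.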

## Use an Azure ML Pipeline to train, register and deploy the model

### Train, Register and Deploy the model using an Azure ML experiment

```python
from azureml.core import Workspace
from azureml.core import Experiment
from azureml.core import ScriptRunConfig, Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.compute import ComputeTarget
from azureml.pipeline.steps import PythonScriptStep
from azureml.pipeline.core import Pipeline
from azureml.data import OutputFileDatasetConfig

ws = Workspace.from_config()

# Replace with the name of your own compute cluster
cluster_name = 'cluster-lab-01'
compute_target = ComputeTarget(workspace=ws, name=cluster_name)

experiment_name = 'churn-experiment'
experiment = Experiment(workspace=ws, name=experiment_name)

# Define the environment

# The first time you run this, register the environment
# churn_env = Environment(workspace=ws, name="ChurnModel-Env")
# conda_dep = CondaDependencies("../Pipelines/conda.yaml")

# Add the dependencies to the Python section of churn_env
# churn_env.python.conda_dependencies=conda_dep
# churn_env.register(ws)

# If you already have the environment, you can get it
churn_env = Environment.get(workspace=ws, name="ChurnModel-Env")

# The folder with your Python scripts
project_folder = '../Pipelines'

datastore_output = ws.get_default_datastore()

# Output of the training step, stored in the default datastore
output_data = OutputFileDatasetConfig(name="output_data",
                                      destination=(datastore_output, "output_data/{run-id}/{output-name}"))

train_cfg = ScriptRunConfig(
    source_directory=project_folder,
    script="train.py",
    compute_target=compute_target,
    environment=churn_env,
)

train_step = PythonScriptStep(
    name="Train",
    source_directory=train_cfg.source_directory,
    script_name=train_cfg.script,
    runconfig=train_cfg.run_config,
    arguments=["--output", output_data]
)

deploy_cfg = ScriptRunConfig(
    source_directory=project_folder,
    script="deploy.py",
    compute_target=compute_target,
    environment=churn_env
)

# The Deploy step must use deploy_cfg (not train_cfg) so it runs deploy.py
deploy_step = PythonScriptStep(
    name="Deploy",
    source_directory=deploy_cfg.source_directory,
    script_name=deploy_cfg.script,
    runconfig=deploy_cfg.run_config,
    # Consuming the training output makes Deploy run after Train
    inputs=[output_data.as_input()]
)

pipeline = Pipeline(ws, steps=[train_step, deploy_step])

# Submit the run
run = experiment.submit(pipeline)
```

After the pipeline runs, the model is trained and registered in the Azure Machine Learning workspace, and a real-time endpoint is deployed on AKS.

![Azure ML Pipeline](../Images/4-Azure-ML-Pipeline.png)
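Once the endpoint is up, a client can combine the online features with the other customer columns and call it. The request schema depends on the scoring script deployed from `../Pipelines`, which is not shown here. The sketch below assumes a JSON body of the form `{"data": [[...]]}`, rows in the same column order used for training, and key-based auth with a `Bearer` header; the URI, key, and helper name are placeholders. It only builds the request; `urllib.request.urlopen(request)` would send it.

```python
import json
import urllib.request


def build_scoring_request(scoring_uri, api_key, rows):
    """Build (but do not send) a JSON POST request for a real-time scoring endpoint.

    rows: list of feature-value lists, assumed to follow the training column order.
    The {"data": rows} payload shape is an assumption about the scoring script.
    """
    body = json.dumps({"data": rows}).encode("utf-8")
    headers = {"Content-Type": "application/json"}
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"
    return urllib.request.Request(scoring_uri, data=body, headers=headers, method="POST")


request = build_scoring_request("http://<your-endpoint>/score", "<your-key>", [[2, 0.7, 1500.0, 3, 4, 95]])
```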