# MLflow introduction.

This tutorial covers an example of how to use the integrated MLflow tracking capabilities to track your model training with the integrated feature store.
  - Import data that was previously registered in the feature store table.
  - Create a baseline model for churn prediction and store it in the integrated MLflow tracking server.

In [0]:
#install latest version of sklearn
%pip install -U scikit-learn

### Step 1) Importing the desired libraries, defining a few constants, and creating the training set from the registered feature table.

In [0]:
from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup
import typing

from sklearn import metrics
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
import mlflow
import pandas as pd

In [0]:
# Name of the model; used below as the MLflow artifact path, as a run tag key, and in the model URI for scoring.
MODEL_NAME = "juan_dev.data_science.random_forest_classifier_featurestore"
# Name of the model's entry in the model registry (passed as registered_model_name when logging).
MODEL_REGISTRY_NAME = "juan_dev.data_science.bank_customer_churn"
# The email you use to authenticate in the Databricks workspace.
USER_EMAIL = "juan.lamadrid@databricks.com"
# Location where the MLflow experiment will be listed in the user's workspace.
EXPERIMENT_NAME = f"/Users/{USER_EMAIL}/bank_customer_churn_analysis_experiment"
# All features are already stored in this Delta table, so we read from it directly.
FEATURE_TABLE = "juan_dev.data_science.bank_customer_features"


In [0]:

# Demonstration-only helper: use it as a starting point and add more error handling around it.
class Feature_Lookup_Input_Tuple(typing.NamedTuple):
  """Describe one feature lookup: source table, features to fetch, and lookup key.

  The first field keeps its existing spelling (``fature_table_name``) so that
  any code accessing it by name keeps working.
  """
  # Name of the feature table to read from.
  fature_table_name: str
  # Feature names to fetch; the annotation also allows None.
  feature_list: typing.Optional[typing.List[str]]
  # Column name(s) used as the lookup key.
  lookup_key: typing.List[str]

# Build one FeatureLookup per entry in the supplied list of feature mappings.
def generate_feature_lookup(feature_mapping: typing.List[Feature_Lookup_Input_Tuple]) -> typing.List[FeatureLookup]:
  """Translate feature-mapping tuples into FeatureLookup objects.

  Args:
    feature_mapping: tuples of (table name, feature names, lookup key).

  Returns:
    One FeatureLookup per mapping, in the same order as the input.
  """
  return [
    FeatureLookup(
      table_name=source_table,
      feature_names=wanted_features,
      lookup_key=key_columns,
    )
    for source_table, wanted_features, key_columns in feature_mapping
  ]


### Step 2) Build a simplistic model that uses the feature store table as its source for training and validation.

In [0]:
# Duplicate of the mlflow import in the first code cell; harmless.
import mlflow
# Use "databricks-uc" as the model registry URI.
# NOTE(review): presumably this selects the Unity Catalog registry, matching the
# three-level catalog.schema.name model names defined above; confirm against the MLflow docs.
mlflow.set_registry_uri("databricks-uc")

In [0]:
# Train a baseline churn classifier on features looked up from the feature
# table, and record metrics, parameters and the model in a single MLflow run.
fe = FeatureEngineeringClient()
mlflow.set_experiment(EXPERIMENT_NAME)

with mlflow.start_run():
  TEST_SIZE = 0.20

  # Features to fetch from the feature table. To combine data from several
  # feature tables, add one Feature_Lookup_Input_Tuple per table.
  features = [
    Feature_Lookup_Input_Tuple(
      FEATURE_TABLE,
      [
        "CreditScore", "Age", "Tenure",
        "Balance", "NumOfProducts", "HasCrCard",
        "IsActiveMember", "EstimatedSalary", "Geography_Germany",
        "Geography_Spain", "Gender_Male",
      ],
      ["CustomerId"],
    )
  ]

  lookups = generate_feature_lookup(features)

  # Simulate receiving only the customer IDs and the label as training input;
  # the feature columns are joined in by the feature lookups below.
  training_df = spark.table(FEATURE_TABLE).select("CustomerId", "Exited")

  # Combine the ID/label dataframe with the features stored in the feature table.
  training_data = fe.create_training_set(
    df=training_df,
    feature_lookups=lookups,
    label="Exited",
    exclude_columns=['CustomerId']
  )

  # Convert to pandas so that a scikit-learn RandomForestClassifier can be fitted on it.
  train_df = training_data.load_df().toPandas()

  # train_df holds the looked-up feature columns plus the "Exited" label.
  X = train_df.drop(['Exited'], axis=1)
  y = train_df['Exited']
  X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=TEST_SIZE, random_state=54, stratify=y)

  # No hyperparameter tuning here; this is only a baseline model.
  model = RandomForestClassifier(n_estimators=100).fit(X_train, y_train)

  signature = mlflow.models.signature.infer_signature(X_train, model.predict(X_train))

  predictions = model.predict(X_test)

  # ROC/AUC must be computed from a continuous score. Feeding hard 0/1
  # predictions collapses the curve to a single operating point, so use the
  # predicted probability of the positive class (label 1) instead.
  positive_class_idx = list(model.classes_).index(1)
  positive_scores = model.predict_proba(X_test)[:, positive_class_idx]
  fpr, tpr, _ = metrics.roc_curve(y_test, positive_scores, pos_label=1)
  auc = metrics.auc(fpr, tpr)
  accuracy = metrics.accuracy_score(y_test, predictions)

  # Pair each input feature name with its calculated importance.
  importances = dict(zip(model.feature_names_in_, model.feature_importances_))
  # Log artifact.
  mlflow.log_dict(importances, "feature_importances.json")
  # Log metrics.
  mlflow.log_metric("auc", auc)
  mlflow.log_metric("accuracy", accuracy)
  # Log parameters.
  mlflow.log_param("split_size", TEST_SIZE)
  mlflow.log_params(model.get_params())
  # Set tag (the model name is used as the tag key).
  mlflow.set_tag(MODEL_NAME, "mlflow and feature store demo")
  # Log the model itself to the MLflow tracking server.
  mlflow.sklearn.log_model(model, MODEL_NAME, signature=signature, input_example=X_train.iloc[:4, :])

  # Finally, so that the feature store tracks which features the model uses,
  # log the model through the feature engineering client with its training set.
  # NOTE(review): this uses the same artifact path (MODEL_NAME) as the
  # mlflow.sklearn.log_model call above; confirm that logging twice to one
  # path within the same run is intended.
  fe.log_model(
    model=model,
    artifact_path=MODEL_NAME,
    flavor=mlflow.sklearn,
    training_set=training_data,
    registered_model_name=MODEL_REGISTRY_NAME
  )
  
  

### Step 3) Now that we have the model logged to the MLflow tracking server, we can get the latest version from the experiment and use it.

In [0]:
from mlflow.tracking.client import MlflowClient

# Initialize the MLflow client.
client = MlflowClient()

# Resolve the experiment. get_experiment_by_name returns None when the
# experiment does not exist, so fail with a clear message instead of an
# AttributeError on None.
experiment = mlflow.get_experiment_by_name(EXPERIMENT_NAME)
if experiment is None:
  raise RuntimeError(f"MLflow experiment {EXPERIMENT_NAME!r} was not found; run the training step first.")
experiment_id = experiment.experiment_id

# Fetch only the most recent run. Its run id gives direct access to the run's
# metrics, attributes and logged artifacts. experiment_ids is passed as a
# list, which is the documented form.
latest_runs = mlflow.search_runs(
  experiment_ids=[experiment_id],
  order_by=["start_time DESC"],
  max_results=1,
)
if latest_runs.empty:
  raise RuntimeError(f"MLflow experiment {EXPERIMENT_NAME!r} has no runs; run the training step first.")
run_id = latest_runs["run_id"].iloc[0]

- With the feature store registration associated with the MLflow model, we don't have to specify any data loading or processing other than pointing to the raw data that the features will be looked up for.
- We can do batch predictions simply by calling the feature store client with a model URI built from the run_id and the model's name (MODEL_NAME), with the raw data passed as the `df` argument.
- If we want to provide new values for a feature that is already part of the feature table, we just include it in the new dataframe that we want to perform the prediction on.

In [0]:
# At inference time only the CustomerId is supplied; it is the key used to look up all the features automatically.
scoring_keys_df = spark.table(FEATURE_TABLE).select("CustomerId")
logged_model_uri = f"runs:/{run_id}/{MODEL_NAME}"
predictions = fe.score_batch(model_uri=logged_model_uri, df=scoring_keys_df)

In [0]:
# Show the batch-scoring result. display is not defined in this file;
# presumably it is the notebook's built-in renderer.
display(predictions)


## Cleanup

In [0]:
#Uncomment the lines below and execute them to clean up.
'''
from mlflow.tracking import MlflowClient

client = MlflowClient()

#get all the information about the current experiment
experiment_id = mlflow.get_experiment_by_name(EXPERIMENT_NAME).experiment_id

#list all the runs that are part of this experiment and delete them
#(list_run_infos is no longer available in MLflow 2.x; search_runs replaces it.
# A single call returns at most 1000 runs by default - rerun if more exist.)
runs = client.search_runs(experiment_ids=[experiment_id])
for run in runs:
  mlflow.delete_run(run_id = run.info.run_id)

#finally delete the experiment
mlflow.delete_experiment(experiment_id=experiment_id)

#delete the model registered in the registry to clear the linkage in the feature store
client.delete_registered_model(name=MODEL_REGISTRY_NAME)
'''