## Train a model

This section illustrates how to train a model using the pickup and dropoff features stored in Feature Store. It trains a LightGBM model to predict taxi fare.

In [0]:
raw_data = spark.read.format("delta").load("/databricks-datasets/nyctaxi-with-zipcodes/subsampled")

### Helper functions

In [0]:
from pyspark.sql import *
from databricks import feature_store
from pyspark.sql.functions import *
from pytz import timezone
from pyspark.sql.types import IntegerType
import math
from datetime import timedelta
import mlflow.pyfunc
from mlflow.tracking import MlflowClient


def rounded_unix_timestamp(dt, num_minutes=15):
    """
    Rounds datetime dt up to the next num_minutes interval, then returns its Unix timestamp.
    """
    nsecs = dt.minute * 60 + dt.second + dt.microsecond * 1e-6
    delta = math.ceil(nsecs / (60 * num_minutes)) * (60 * num_minutes) - nsecs
    return int((dt + timedelta(seconds=delta)).timestamp())


rounded_unix_timestamp_udf = udf(rounded_unix_timestamp, IntegerType())


def rounded_taxi_data(taxi_data_df):
    # Round the taxi data timestamps up to 15- and 30-minute intervals so they can be joined
    # with the pickup and dropoff features, respectively.
    taxi_data_df = (
        taxi_data_df.withColumn(
            "rounded_pickup_datetime",
            rounded_unix_timestamp_udf(taxi_data_df["tpep_pickup_datetime"], lit(15)),
        )
        .withColumn(
            "rounded_dropoff_datetime",
            rounded_unix_timestamp_udf(taxi_data_df["tpep_dropoff_datetime"], lit(30)),
        )
        .drop("tpep_pickup_datetime")
        .drop("tpep_dropoff_datetime")
    )
    taxi_data_df.createOrReplaceTempView("taxi_data")
    return taxi_data_df


def get_latest_model_version(model_name):
    """Return the highest registered version number of model_name (1 if none is found)."""
    latest_version = 1
    mlflow_client = MlflowClient()
    for mv in mlflow_client.search_model_versions(f"name='{model_name}'"):
        version_int = int(mv.version)
        if version_int > latest_version:
            latest_version = version_int
    return latest_version
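
To see what the rounding UDF does to individual timestamps, the sketch below calls a plain-Python copy of `rounded_unix_timestamp` on two made-up timestamps. It assumes the UDF receives ordinary `datetime` objects; the timestamps are hypothetical and use UTC so the results do not depend on the machine's time zone. Because the function only looks at the minutes and seconds within the current hour, `num_minutes` should divide 60 evenly (15 and 30 both do).

```python
import math
from datetime import datetime, timedelta, timezone


def rounded_unix_timestamp(dt, num_minutes=15):
    """Round dt up to the next num_minutes boundary and return the Unix timestamp."""
    # Seconds elapsed since the start of the current hour.
    nsecs = dt.minute * 60 + dt.second + dt.microsecond * 1e-6
    # Seconds needed to reach the next multiple of num_minutes (0 if already on one).
    delta = math.ceil(nsecs / (60 * num_minutes)) * (60 * num_minutes) - nsecs
    return int((dt + timedelta(seconds=delta)).timestamp())


def as_utc(ts):
    """Convert a Unix timestamp back to a readable UTC datetime."""
    return datetime.fromtimestamp(ts, tz=timezone.utc)


# Hypothetical pickup and dropoff times.
pickup = datetime(2016, 2, 14, 10, 7, 30, tzinfo=timezone.utc)
dropoff = datetime(2016, 2, 14, 10, 46, 0, tzinfo=timezone.utc)

print(as_utc(rounded_unix_timestamp(pickup, 15)))   # 2016-02-14 10:15:00+00:00
print(as_utc(rounded_unix_timestamp(dropoff, 30)))  # 2016-02-14 11:00:00+00:00
```

A timestamp that already sits on a boundary (for example 10:30:00 with `num_minutes=15`) is left unchanged, because the ceiling of an exact multiple is that multiple.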

In [0]:
taxi_data = rounded_taxi_data(raw_data)
fs = feature_store.FeatureStoreClient()

### Understanding how a training dataset is created

To train a model, you first create a training dataset. The training dataset consists of:

1. Raw input data
1. Features from the feature store

The raw input data is needed because it contains:

1. Primary keys used to join with features.
1. Raw features like `trip_distance` that are not in the feature store.
1. Prediction targets like `fare_amount` that are required for model training.

Here's a visual overview that shows the raw input data being combined with the features in the Feature Store to produce the training dataset:

<img src="https://docs.databricks.com/_static/images/machine-learning/feature-store/taxi_example_feature_lookup.png"/>

These concepts are described further in the Creating a Training Dataset documentation ([AWS](https://docs.databricks.com/applications/machine-learning/feature-store.html#create-a-training-dataset)|[Azure](https://docs.microsoft.com/en-us/azure/databricks/applications/machine-learning/feature-store#create-a-training-dataset)|[GCP](https://docs.gcp.databricks.com/applications/machine-learning/feature-store.html#create-a-training-dataset)).
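
Conceptually, each feature lookup behaves like a left join from the raw input rows to a feature table on the lookup key. The plain-Python sketch below models one feature table as a dictionary keyed by `(pickup_zip, rounded_pickup_datetime)`. The rows, timestamps, and the `left_join` helper are hypothetical and only illustrate the join semantics; this is not how Feature Store implements the join. A raw row with no matching feature row keeps its raw columns and gets `None` for each feature, which is how a training row can end up with empty feature values.

```python
# Hypothetical feature table: (pickup_zip, rounded_pickup_datetime) -> feature values.
pickup_features = {
    (10003, 1455444900): {
        "mean_fare_window_1h_pickup_zip": 14.333333,
        "count_trips_window_1h_pickup_zip": 3.0,
    },
}

# Hypothetical raw input rows: lookup keys, a raw feature, and the label.
raw_rows = [
    {"pickup_zip": 10003, "rounded_pickup_datetime": 1455444900, "trip_distance": 5.35, "fare_amount": 18.5},
    {"pickup_zip": 10001, "rounded_pickup_datetime": 1455444900, "trip_distance": 3.5, "fare_amount": 13.5},
]

FEATURE_NAMES = ["mean_fare_window_1h_pickup_zip", "count_trips_window_1h_pickup_zip"]


def left_join(rows, table, key_cols, feature_names):
    """Attach features to each row by key; rows without a match get None (left-join semantics)."""
    joined = []
    for row in rows:
        key = tuple(row[c] for c in key_cols)
        features = table.get(key, {})
        joined.append({**row, **{name: features.get(name) for name in feature_names}})
    return joined


training_rows = left_join(
    raw_rows, pickup_features, ["pickup_zip", "rounded_pickup_datetime"], FEATURE_NAMES
)
for r in training_rows:
    print(r)
```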

The next cell specifies which features to load from Feature Store for model training by creating a `FeatureLookup` for each needed feature.

In [0]:
from databricks.feature_store import FeatureLookup
import mlflow

pickup_features_table = "feature_store_taxi_example_km.trip_pickup_features"
dropoff_features_table = "feature_store_taxi_example_km.trip_dropoff_features"

pickup_feature_lookups = [
    FeatureLookup(
        table_name=pickup_features_table,
        feature_name="mean_fare_window_1h_pickup_zip",
        lookup_key=["pickup_zip", "rounded_pickup_datetime"],
    ),
    FeatureLookup(
        table_name=pickup_features_table,
        feature_name="count_trips_window_1h_pickup_zip",
        lookup_key=["pickup_zip", "rounded_pickup_datetime"],
    ),
]

dropoff_feature_lookups = [
    FeatureLookup(
        table_name=dropoff_features_table,
        feature_name="count_trips_window_30m_dropoff_zip",
        lookup_key=["dropoff_zip", "rounded_dropoff_datetime"],
    ),
    FeatureLookup(
        table_name=dropoff_features_table,
        feature_name="dropoff_is_weekend",
        lookup_key=["dropoff_zip", "rounded_dropoff_datetime"],
    ),
]
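
Every column named in a `lookup_key` must exist in the raw input DataFrame, which is why `rounded_taxi_data` adds `rounded_pickup_datetime` and `rounded_dropoff_datetime`. The sketch below is a hypothetical pre-flight check: `Lookup` is a small named tuple standing in for `FeatureLookup` (it is not the Feature Store class), and the `taxi_data` column set is an assumption based on the training DataFrame displayed later plus the two rounded timestamp columns.

```python
from collections import namedtuple

# Hypothetical stand-in for FeatureLookup, holding only the fields this check needs.
Lookup = namedtuple("Lookup", ["table_name", "feature_name", "lookup_key"])

# Assumed columns of taxi_data after rounded_taxi_data() has run.
taxi_data_columns = {
    "trip_distance", "pickup_zip", "dropoff_zip", "fare_amount",
    "rounded_pickup_datetime", "rounded_dropoff_datetime",
}

lookups = [
    Lookup("feature_store_taxi_example_km.trip_pickup_features",
           "mean_fare_window_1h_pickup_zip", ["pickup_zip", "rounded_pickup_datetime"]),
    Lookup("feature_store_taxi_example_km.trip_dropoff_features",
           "dropoff_is_weekend", ["dropoff_zip", "rounded_dropoff_datetime"]),
]


def missing_lookup_keys(columns, lookups):
    """Return {feature_name: [missing key columns]} for lookups whose keys are absent."""
    missing = {}
    for lookup in lookups:
        absent = [k for k in lookup.lookup_key if k not in columns]
        if absent:
            missing[lookup.feature_name] = absent
    return missing


print(missing_lookup_keys(taxi_data_columns, lookups))  # {}

# Without the rounded_* columns, both lookups fail the check.
without_rounded = taxi_data_columns - {"rounded_pickup_datetime", "rounded_dropoff_datetime"}
print(missing_lookup_keys(without_rounded, lookups))
```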

### Create a training dataset

When `fs.create_training_set(...)` is called below, the following steps happen:

1. A `TrainingSet` object is created that selects the specific features from Feature Store to use in training the model. Each feature is specified by one of the `FeatureLookup`s created above.

1. The features are joined with the raw input data according to each `FeatureLookup`'s `lookup_key`.

Calling `TrainingSet.load_df()` then returns a DataFrame to train on. This DataFrame includes the columns of `taxi_data` (except any listed in `exclude_columns`) as well as the features specified in the `FeatureLookup`s.
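
The columns of the resulting DataFrame follow from the inputs: the raw columns, plus one column per feature lookup, minus anything in `exclude_columns`. The sketch below does that set arithmetic in plain Python. The raw `taxi_data` column list is an assumption, and the sketch models only which columns appear, not their order or how Feature Store builds the DataFrame.

```python
# Assumed columns of taxi_data after rounded_taxi_data() has run.
raw_columns = [
    "trip_distance", "pickup_zip", "dropoff_zip", "fare_amount",
    "rounded_pickup_datetime", "rounded_dropoff_datetime",
]

# Feature names requested through the FeatureLookups defined above.
feature_names = [
    "mean_fare_window_1h_pickup_zip",
    "count_trips_window_1h_pickup_zip",
    "count_trips_window_30m_dropoff_zip",
    "dropoff_is_weekend",
]

# Same value as the exclude_columns passed to create_training_set in the next cell.
exclude_columns = ["rounded_pickup_datetime", "rounded_dropoff_datetime"]


def expected_training_columns(raw, features, exclude):
    """Raw columns plus looked-up features, minus excluded columns (order not modeled)."""
    return (set(raw) | set(features)) - set(exclude)


cols = expected_training_columns(raw_columns, feature_names, exclude_columns)
print(sorted(cols))
```

The resulting eight names match the header of the training DataFrame displayed further down.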

In [0]:
mlflow.set_experiment("/Users/kimberly.mahoney@databricks.com/gbm_taxi_data")

In [0]:
# End any active run (in case this notebook is run a second time)
mlflow.end_run()

# Start an MLflow run, which Feature Store needs in order to log the model
mlflow.start_run()

# Since the rounded timestamp columns would likely cause the model to overfit the data 
# unless additional feature engineering was performed, exclude them to avoid training on them.
exclude_columns = ["rounded_pickup_datetime", "rounded_dropoff_datetime"]

# Create the training set: the raw input data joined with the corresponding features from both feature tables
training_set = fs.create_training_set(
    taxi_data,
    feature_lookups=pickup_feature_lookups + dropoff_feature_lookups,
    label="fare_amount",
    exclude_columns=exclude_columns,
)

# Load the TrainingSet into a Spark DataFrame; it is converted to pandas below for training with LightGBM
training_df = training_set.load_df()

In [0]:
# Display the training DataFrame. It contains both the raw input data and features
# from Feature Store, such as `dropoff_is_weekend`.
display(training_df)

| trip_distance | pickup_zip | dropoff_zip | mean_fare_window_1h_pickup_zip | count_trips_window_1h_pickup_zip | count_trips_window_30m_dropoff_zip | dropoff_is_weekend | fare_amount |
|---|---|---|---|---|---|---|---|
| 5.35 | 10003 | 11238 | 14.333333 | 3.0 | 1.0 | 0.0 | 18.5 |
| 6.5 | 10282 | 10001 | 21.5 | 1.0 | 2.0 | 0.0 | 21.5 |
| 0.9 | 10119 | 10003 | 5.5 | 1.0 | 1.0 | 0.0 | 5.5 |
| 3.5 | 10001 | 11222 | | | | | 13.5 |
| 0.3 | 10028 | 10028 | 4.25 | 2.0 | 1.0 | 0.0 | 3.5 |
| 0.0 | 10038 | 10005 | 5.0 | 1.0 | 1.0 | 0.0 | 5.0 |
| 6.57 | 10001 | 11377 | 21.5 | 1.0 | 1.0 | 0.0 | 21.5 |
| 1.08 | 10103 | 10167 | 16.75 | 2.0 | 1.0 | 0.0 | 11.5 |
| 0.8 | 10003 | 10011 | 6.25 | 2.0 | 2.0 | 1.0 | 7.0 |
| 1.3 | 10199 | 10020 | 9.0 | 1.0 | 2.0 | 1.0 | 9.0 |


Train a LightGBM model on the DataFrame returned by `TrainingSet.load_df`, then log the model with `FeatureStoreClient.log_model`. The logged model is packaged with feature metadata.

In [0]:
from sklearn.model_selection import train_test_split
from mlflow.tracking import MlflowClient
import lightgbm as lgb
import mlflow.lightgbm

# Automatically log LightGBM parameters, metrics, and the model to the active MLflow run
mlflow.lightgbm.autolog()

features_and_label = training_df.columns

# Collect the data into a pandas DataFrame for training
data = training_df.toPandas()[features_and_label]

train, test = train_test_split(data, random_state=123)
X_train = train.drop(["fare_amount"], axis=1)
X_test = test.drop(["fare_amount"], axis=1)
y_train = train.fare_amount
y_test = test.fare_amount

train_lgb_dataset = lgb.Dataset(X_train, label=y_train.values)
# Use the training Dataset as reference so the test set is binned the same way
test_lgb_dataset = lgb.Dataset(X_test, label=y_test.values, reference=train_lgb_dataset)

param = {"num_leaves": 32, "objective": "regression", "metric": "rmse"}
num_rounds = 100

# Train a LightGBM model, evaluating RMSE on the held-out test set after each round
model = lgb.train(
    param,
    train_lgb_dataset,
    num_boost_round=num_rounds,
    valid_sets=[test_lgb_dataset],
    valid_names=["test"],
)
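
The `"metric": "rmse"` setting makes LightGBM report root mean squared error: the square root of the mean squared difference between true and predicted values, in the same units as `fare_amount`. The stdlib sketch below computes the same quantity on made-up fares and predictions; the numbers are illustrative only.

```python
import math


def rmse(y_true, y_pred):
    """Root mean squared error, the quantity LightGBM's "rmse" metric reports."""
    if len(y_true) != len(y_pred) or len(y_true) == 0:
        raise ValueError("inputs must be non-empty and the same length")
    squared_errors = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Made-up fares and predictions, for illustration only.
fares = [18.5, 21.5, 5.5, 13.5]
predictions = [17.0, 21.5, 6.5, 12.0]
print(round(rmse(fares, predictions), 4))  # 1.1726
```

In the notebook, `rmse(y_test.tolist(), model.predict(X_test).tolist())` would give the test-set RMSE of the trained booster.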

In [0]:
# Log the trained model with MLflow and package it with feature lookup information
fs.log_model(
    model,
    artifact_path="model_packaged",
    flavor=mlflow.lightgbm,
    training_set=training_set,
    registered_model_name="taxi_example_fare_packaged_km",
)

In [0]:
mlflow.end_run()