In [None]:
dbutils.widgets.text("RESOURCE_PREFIX", "")
dbutils.widgets.text("REDIS_KEY", "")

# Feathr Feature Store on Databricks Demo Notebook

This notebook illustrates how to use the Feathr feature store to create a model that predicts NYC taxi fares. The dataset comes from [here](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page).

This notebook is written specifically for Databricks and relies on some Databricks packages such as `dbutils`. The intention here is to provide a "one click run" example with minimal configuration. For example:
- This notebook skips the feature registry, which requires running Azure Purview.
- You will need to configure the Redis endpoint to make the online feature query work.

The full-fledged notebook can be found [here](https://github.com/feathr-ai/feathr/blob/main/feathr_project/feathrcli/data/feathr_user_workspace/nyc_driver_demo.ipynb).


## Prerequisite

Feathr has native cloud integration. The first step is to provision the required cloud resources.

Follow the [Feathr ARM deployment guide](https://feathr-ai.github.io/feathr/how-to-guides/azure-deployment-arm.html) to run Feathr on Azure. This lets you get started quickly with an automated deployment using an Azure Resource Manager template. For more details, please refer to [README.md](https://github.com/feathr-ai/feathr#%EF%B8%8F-running-feathr-on-cloud-with-a-few-simple-steps).

Additionally, to run this notebook, you'll need to install the `feathr` pip package. For local Spark, simply run `pip install feathr` on the machine that runs this notebook. To use Databricks or Azure Synapse Analytics, please see the dependency management documents:
- [Azure Databricks dependency management](https://learn.microsoft.com/en-us/azure/databricks/libraries/)
- [Azure Synapse Analytics dependency management](https://learn.microsoft.com/en-us/azure/synapse-analytics/spark/apache-spark-azure-portal-add-libraries)


## Notebook Steps

This tutorial demonstrates the key capabilities of Feathr, including:

1. Install Feathr and necessary dependencies.
1. Create shareable features with Feathr feature definition configs.
1. Create training data using point-in-time correct feature join.
1. Train and evaluate a prediction model.
1. Materialize feature values to the online store.
1. Fetch feature values in real time from the online store for online scoring.

The overall data flow is as follows:

<img src="https://github.com/linkedin/feathr/blob/main/docs/images/feature_flow.png?raw=true" width="800">

Install the notebook-scoped library. For details, please see the [Azure Databricks dependency management document](https://learn.microsoft.com/en-us/azure/databricks/libraries/).

In [None]:
%pip install feathr

In [None]:
# Check Databricks runtime version
spark.conf.get("spark.databricks.clusterUsageTags.sparkVersion")

In [None]:
from datetime import datetime, timedelta
import glob
import json
from math import sqrt
import os
import requests
from tempfile import NamedTemporaryFile

from feathr import (
    FeathrClient,
    # Feature data types
    BOOLEAN, FLOAT, INT32, ValueType,
    # Feature data sources
    INPUT_CONTEXT, HdfsSource,
    # Feature aggregations
    TypedKey, WindowAggTransformation,
    # Feature types and anchor
    DerivedFeature, Feature, FeatureAnchor,
    # Materialization
    BackfillTime, MaterializationSettings, RedisSink,
    # Offline feature computation
    FeatureQuery, ObservationSettings,
)
from feathr.spark_provider.feathr_configurations import SparkExecutionConfiguration
import pandas as pd
from pyspark.sql import DataFrame
import pyspark.sql.functions as F

## 2. Create Shareable Features with Feathr Feature Definition Configs

In this notebook, we define all the necessary resource key values for authentication. These values can also be retrieved from the [Azure Key Vault](https://azure.microsoft.com/en-us/services/key-vault/) cloud key-value store.
Please refer to [how-to guide documents for granting key-vault access](https://feathr-ai.github.io/feathr/how-to-guides/azure-deployment-arm.html#3-grant-key-vault-and-synapse-access-to-selected-users-optional) for more details.

In [None]:
RESOURCE_PREFIX = dbutils.widgets.get("RESOURCE_PREFIX")
PROJECT_NAME = "feathr_getting_started"
ROOT_PATH = "./"  # Could be Azure Blob File System path, abfs or wasbs too.

REDIS_KEY = dbutils.widgets.get("REDIS_KEY")

Get the required Databricks credentials automatically:

In [None]:
# Redis credential
os.environ['REDIS_PASSWORD'] = REDIS_KEY

# To use a databricks cluster:
ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
databricks_config = {
    'run_name': "FEATHR_FILL_IN",
    'existing_cluster_id': ctx.tags().get('clusterId').get(),
    'libraries': [{'jar': "FEATHR_FILL_IN"}],
    'spark_jar_task': {
        'main_class_name': "FEATHR_FILL_IN",
        'parameters': ["FEATHR_FILL_IN"],
    },
}
os.environ['spark_config__spark_cluster'] = "databricks"
os.environ['spark_config__databricks__workspace_instance_url'] = "https://" + ctx.tags().get('browserHostName').get()
os.environ['spark_config__databricks__config_template'] = json.dumps(databricks_config)
os.environ['spark_config__databricks__work_dir'] = "dbfs:/feathr_getting_started"
os.environ['DATABRICKS_WORKSPACE_TOKEN_VALUE'] = ctx.apiToken().get()

### Configurations

Feathr uses a YAML file to define configurations. Please refer to [feathr_config.yaml](https://github.com/linkedin/feathr/blob/main/feathr_project/feathrcli/data/feathr_user_workspace/feathr_config.yaml) for the meaning of each field.

In [None]:
yaml_config = f"""
api_version: 1

project_config:
  project_name: {PROJECT_NAME}
  required_environment_variables:
    - 'REDIS_PASSWORD'
    - 'AZURE_CLIENT_ID'
    - 'AZURE_TENANT_ID'
    - 'AZURE_CLIENT_SECRET'
    
feature_registry:
  api_endpoint: 'https://{RESOURCE_PREFIX}webapp.azurewebsites.net/api/v1'

spark_config:
  # Currently supported: 'azure_synapse', 'databricks', and 'local'
  spark_cluster: 'local'
  spark_result_output_parts: '1'

offline_store:
  wasb:
    wasb_enabled: true

online_store:
  # You can skip this part if you don't have Redis and skip materialization later in this notebook.
  redis:
    host: '{RESOURCE_PREFIX}redis.redis.cache.windows.net'
    port: 6380
    ssl_enabled: true
"""

# Write the config to a temporary file that FeathrClient can read.
# delete=False keeps the file on disk after the handle is closed.
with NamedTemporaryFile(mode="w", delete=False) as tmp:
    tmp.write(yaml_config)

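Any configuration value can be overridden by an environment variable whose name joins the keys of the nested config layers with `__`. For example, `feathr_runtime_location` in the Databricks config can be overridden by setting the `spark_config__databricks__feathr_runtime_location` environment variable. This is how the `spark_config__*` variables set in the credentials cell make Feathr run on Databricks even though the YAML above sets `spark_cluster: 'local'`.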
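The naming rule can be sketched as a small helper that flattens a nested config dict into override variable names. `config_to_env_vars` is a hypothetical helper, not part of Feathr; it only illustrates how names such as `spark_config__databricks__work_dir` (used in the credentials cell) are formed. Converting values with `str()` is our own choice.

```python
from typing import Any, Dict


def config_to_env_vars(config: Dict[str, Any], prefix: str = "") -> Dict[str, str]:
    """Flatten a nested config dict into `__`-joined environment variable names.

    Hypothetical helper for illustration; Feathr does not ship this function.
    """
    env_vars: Dict[str, str] = {}
    for key, value in config.items():
        # Join each nesting layer with a double underscore.
        name = f"{prefix}__{key}" if prefix else key
        if isinstance(value, dict):
            # Recurse into nested sections such as spark_config -> databricks.
            env_vars.update(config_to_env_vars(value, name))
        else:
            env_vars[name] = str(value)
    return env_vars


overrides = config_to_env_vars({
    "spark_config": {
        "spark_cluster": "databricks",
        "databricks": {"work_dir": "dbfs:/feathr_getting_started"},
    }
})
print(overrides)
```

Applying the result with `os.environ.update(overrides)` before creating `FeathrClient` should have the same effect as the explicit `os.environ[...]` assignments in the credentials cell.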

### Initialize Feathr Client

In [None]:
client = FeathrClient(config_path=tmp.name)

### View the NYC taxi fare dataset

In [None]:
DATA_FILEPATH = "wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04_with_index.csv"
pd.read_csv("https://azurefeathrstorage.blob.core.windows.net/public/sample_data/green_tripdata_2020-04_with_index.csv").head()

### Defining features with Feathr

In Feathr, a feature is viewed as a function, mapping a key and timestamp to a feature value. For more details, please see [Feathr Feature Definition Guide](https://github.com/feathr-ai/feathr/blob/main/docs/concepts/feature-definition.md).

* The feature key (a.k.a. entity id) identifies the subject of the feature, e.g. a user_id or location_id.
* The feature name is the aspect of the entity that the feature indicates, e.g. the age of the user.
* The feature value is the actual value of that aspect at a particular time, e.g. the value is 30 in 2022.

Note that, in some cases, a feature could be just a transformation function that has no entity key or timestamp involved, e.g. *the day of week of the request timestamp*.

There are two types of features: anchored features and derived features.

* **Anchored features**: Features that are directly extracted from sources, with or without aggregation.
* **Derived features**: Features that are computed on top of other features.

#### Define anchored features

Anchored features need a feature source that describes the raw data from which the feature values are computed. A source should be either `INPUT_CONTEXT` (the features will be extracted from the observation data directly) or a `feathr.source.Source` object.

In [None]:
TIMESTAMP_COL = "lpep_dropoff_datetime"
TIMESTAMP_FORMAT = "yyyy-MM-dd HH:mm:ss"

In [None]:
# We define f_trip_distance and f_trip_time_duration features separately
# so that we can reuse them later for the derived features.
f_trip_distance = Feature(
    name="f_trip_distance",
    feature_type=FLOAT,
    transform="trip_distance",
)
f_trip_time_duration = Feature(
    name="f_trip_time_duration",
    feature_type=FLOAT,
    transform="cast_float((to_unix_timestamp(lpep_dropoff_datetime) - to_unix_timestamp(lpep_pickup_datetime)) / 60)",
)

features = [
    f_trip_distance,
    f_trip_time_duration,
    Feature(
        name="f_is_long_trip_distance",
        feature_type=BOOLEAN,
        transform="trip_distance > 30.0",
    ),
    Feature(
        name="f_day_of_week",
        feature_type=INT32,
        transform="dayofweek(lpep_dropoff_datetime)",
    ),
    Feature(
        name="f_day_of_month",
        feature_type=INT32,
        transform="dayofmonth(lpep_dropoff_datetime)",
    ),
    Feature(
        name="f_hour_of_day",
        feature_type=INT32,
        transform="hour(lpep_dropoff_datetime)",
    ),
]

# After you have defined features, bring them together to build the anchor to the source.
feature_anchor = FeatureAnchor(
    name="feature_anchor",
    source=INPUT_CONTEXT,  # Pass through source, i.e. observation data.
    features=features,
)
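To sanity-check what the anchored transforms above compute, their row-level logic can be mirrored in plain Python. This is a hypothetical sketch (`anchored_features` and `spark_dayofweek` are our names, not Feathr's) that assumes second-resolution timestamps and follows Spark SQL's `dayofweek` convention (1 = Sunday, ..., 7 = Saturday), which differs from Python's `weekday()`.

```python
from datetime import datetime
from typing import Any, Dict


def spark_dayofweek(ts: datetime) -> int:
    # Spark SQL dayofweek: 1 = Sunday, 2 = Monday, ..., 7 = Saturday.
    # Python isoweekday: 1 = Monday, ..., 7 = Sunday, hence the shift.
    return ts.isoweekday() % 7 + 1


def anchored_features(row: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical single-row mirror of the INPUT_CONTEXT transforms above."""
    pickup = row["lpep_pickup_datetime"]
    dropoff = row["lpep_dropoff_datetime"]
    return {
        "f_trip_distance": float(row["trip_distance"]),
        # (to_unix_timestamp(dropoff) - to_unix_timestamp(pickup)) / 60, in minutes
        "f_trip_time_duration": (dropoff - pickup).total_seconds() / 60,
        "f_is_long_trip_distance": row["trip_distance"] > 30.0,
        "f_day_of_week": spark_dayofweek(dropoff),
        "f_day_of_month": dropoff.day,
        "f_hour_of_day": dropoff.hour,
    }


# 2020-04-05 was a Sunday.
example = anchored_features({
    "lpep_pickup_datetime": datetime(2020, 4, 5, 9, 0),
    "lpep_dropoff_datetime": datetime(2020, 4, 5, 9, 30),
    "trip_distance": 4.2,
})
print(example)
```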

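We can also define a source with a preprocessing Python function. Here, the function adds the `fare_amount_cents` column that the aggregation features below are computed from.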

In [None]:
def preprocessing(df: DataFrame) -> DataFrame:
    import pyspark.sql.functions as F
    df = df.withColumn("fare_amount_cents", (F.col("fare_amount") * 100.0).cast("float"))
    return df

batch_source = HdfsSource(
    name="nycTaxiBatchSource",
    path=DATA_FILEPATH,
    event_timestamp_column=TIMESTAMP_COL,
    preprocessing=preprocessing,
    timestamp_format=TIMESTAMP_FORMAT,
)

For the features with aggregation, the supported functions are as follows:

| Aggregation Function | Input Type | Description |
| --- | --- | --- |
| SUM, COUNT, MAX, MIN, AVG | Numeric | Applies the numerical operation to the numeric inputs. |
| MAX_POOLING, MIN_POOLING, AVG_POOLING | Numeric Vector | Applies the max/min/avg operation on a per-entry basis for a given collection of numeric vectors. |
| LATEST | Any | Returns the latest non-null value within the defined time window. |
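To make the window semantics concrete, here is a plain-Python sketch of what a windowed aggregation computes for one key at one point in time. It is an illustration under stated assumptions, not Feathr's implementation: we treat the window as `(as_of - window, as_of]`, drop null values before aggregating, and read the `"90d"` window string used below as `timedelta(days=90)`. `window_agg` is a hypothetical name.

```python
from datetime import datetime, timedelta
from statistics import mean
from typing import Any, Callable, Dict, Hashable, List, Optional, Sequence, Tuple

# One source row: (key, event_time, value), e.g. (DOLocationID, dropoff time, fare_amount_cents).
Row = Tuple[Hashable, datetime, Any]

NUMERIC_AGGS: Dict[str, Callable[[List[Any]], Any]] = {
    "SUM": sum, "COUNT": len, "MAX": max, "MIN": min, "AVG": mean,
}
POOLING_AGGS: Dict[str, Callable[[Sequence[Any]], Any]] = {
    "MAX_POOLING": max, "MIN_POOLING": min, "AVG_POOLING": mean,
}


def window_agg(rows: Sequence[Row], key: Hashable, as_of: datetime,
               window: timedelta, agg_func: str) -> Optional[Any]:
    """Hypothetical sketch: aggregate one key's values in (as_of - window, as_of]."""
    in_window = sorted(
        ((ts, value) for k, ts, value in rows
         if k == key and as_of - window < ts <= as_of and value is not None),
        key=lambda item: item[0],  # order by event time only
    )
    if not in_window:
        return None  # no data for this key in the window
    values = [value for _, value in in_window]
    if agg_func == "LATEST":
        return values[-1]  # value with the most recent event time
    if agg_func in POOLING_AGGS:
        # Element-wise: aggregate each vector position across all rows.
        return [POOLING_AGGS[agg_func](position) for position in zip(*values)]
    return NUMERIC_AGGS[agg_func](values)


rows = [
    (137, datetime(2020, 1, 1), 9999.0),  # older than 90 days before as_of, so excluded
    (137, datetime(2020, 4, 1), 1000.0),
    (137, datetime(2020, 4, 15), 2000.0),
    (265, datetime(2020, 4, 10), 500.0),
]
as_of = datetime(2020, 5, 20)
print(window_agg(rows, 137, as_of, timedelta(days=90), "AVG"))
```

Conceptually, this is the kind of value `f_location_avg_fare` and `f_location_max_fare` hold for a given `DOLocationID` at a given time.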

In [None]:
agg_key = TypedKey(
    key_column="DOLocationID",
    key_column_type=ValueType.INT32,
    description="location id in NYC",
    full_name="nyc_taxi.location_id",
)

agg_window = "90d"

# Anchored features with aggregations
agg_features = [
    Feature(
        name="f_location_avg_fare",
        key=agg_key,
        feature_type=FLOAT,
        transform=WindowAggTransformation(
            agg_expr="fare_amount_cents",
            agg_func="AVG",
            window=agg_window,
        ),
    ),
    Feature(
        name="f_location_max_fare",
        key=agg_key,
        feature_type=FLOAT,
        transform=WindowAggTransformation(
            agg_expr="fare_amount_cents",
            agg_func="MAX",
            window=agg_window,
        ),
    ),
]

agg_feature_anchor = FeatureAnchor(
    name="agg_feature_anchor",
    source=batch_source,  # External data source for feature. Typically a data table.
    features=agg_features,
)

#### Define derived features

We also define a derived feature, `f_trip_time_distance`, from the anchored features `f_trip_distance` and `f_trip_time_duration` as follows:

In [None]:
derived_features = [
    DerivedFeature(
        name="f_trip_time_distance",
        feature_type=FLOAT,
        input_features=[
            f_trip_distance,
            f_trip_time_duration,
        ],
        transform="f_trip_distance / f_trip_time_duration",
    )
]
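A derived feature is evaluated on the values of its input features rather than on raw source columns. The hypothetical function below mirrors `f_trip_distance / f_trip_time_duration` for a single row. Returning `None` for a missing or non-positive duration is our own guard; the training step later filters on `f_trip_time_duration > 0` for a similar reason. Since NYC TLC trip distances are reported in miles and the duration is in minutes, the result is in miles per minute.

```python
from typing import Optional


def f_trip_time_distance(f_trip_distance: Optional[float],
                         f_trip_time_duration: Optional[float]) -> Optional[float]:
    """Hypothetical single-row version of the derived feature above."""
    # Guard against missing inputs and zero or negative durations (our choice).
    if f_trip_distance is None or f_trip_time_duration is None or f_trip_time_duration <= 0:
        return None
    return f_trip_distance / f_trip_time_duration


print(f_trip_time_distance(3.0, 30.0))
print(f_trip_time_distance(3.0, 0.0))
```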

### Build features

Finally, we build the features.

In [None]:
client.build_features(
    anchor_list=[feature_anchor, agg_feature_anchor],
    derived_feature_list=derived_features,
)

## 3. Create Training Data Using Point-in-Time Correct Feature Join

After the feature producers have defined the features (as described in the feature definition section above), feature consumers can use them. Feature consumers provide observation data and query features from different feature tables using a `FeatureQuery`.

To create a training dataset with Feathr, one needs to provide a feature join configuration that specifies which features to fetch and how they should be joined to the observation data. In this notebook, that configuration is expressed with `FeatureQuery` and `ObservationSettings`.

To learn more about this topic, please refer to [Point-in-time Correctness](https://github.com/linkedin/feathr/blob/main/docs/concepts/point-in-time-join.md).
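The core idea of a point-in-time correct join can be sketched in memory: each observation only sees the latest feature value whose timestamp is at or before the observation's own timestamp, so no future information leaks into training data. This is a simplified illustration, not Feathr's Spark implementation; it ignores windowed aggregation, and treating a feature row stamped exactly at the observation time as already known is an assumption. `point_in_time_join` is a hypothetical name.

```python
from bisect import bisect_right
from collections import defaultdict
from datetime import datetime
from typing import Any, Dict, Hashable, Iterable, List, Optional, Tuple


def point_in_time_join(
    observations: Iterable[Tuple[Hashable, datetime]],
    feature_rows: Iterable[Tuple[Hashable, datetime, Any]],
) -> List[Tuple[Hashable, datetime, Optional[Any]]]:
    """Attach to each observation the latest feature value known at its timestamp."""
    # Group feature rows by key, then sort each group by feature timestamp.
    grouped: Dict[Hashable, List[Tuple[datetime, Any]]] = defaultdict(list)
    for key, ts, value in feature_rows:
        grouped[key].append((ts, value))
    times: Dict[Hashable, List[datetime]] = {}
    values: Dict[Hashable, List[Any]] = {}
    for key, rows in grouped.items():
        rows.sort(key=lambda item: item[0])
        times[key] = [ts for ts, _ in rows]
        values[key] = [value for _, value in rows]

    joined = []
    for key, obs_ts in observations:
        # Number of feature rows with timestamp <= obs_ts; later rows would leak the future.
        idx = bisect_right(times.get(key, []), obs_ts)
        joined.append((key, obs_ts, values[key][idx - 1] if idx else None))
    return joined


feature_history = [(137, datetime(2020, 4, 10), 20.0), (137, datetime(2020, 4, 1), 10.0)]
observations = [
    (137, datetime(2020, 4, 5)),   # sees the 2020-04-01 value
    (137, datetime(2020, 4, 10)),  # sees the 2020-04-10 value
    (137, datetime(2020, 3, 1)),   # no value known yet
    (265, datetime(2020, 4, 5)),   # unknown key
]
print(point_in_time_join(observations, feature_history))
```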

In [None]:
DATA_FORMAT = "parquet"
offline_features_path = f"dbfs:/feathr_output/{PROJECT_NAME}/features.{DATA_FORMAT}"

In [None]:

feature_names = [feature.name for feature in features + derived_features]
print("Features that will be extracted directly from the observation data:", feature_names)

agg_feature_names = [feature.name for feature in agg_features]
print("Features that will be extracted from the offline source data:", agg_feature_names)

In [None]:
# Features that we want to request. Can use a subset of features
query = FeatureQuery(
    feature_list=feature_names + agg_feature_names,
    key=agg_key,
)
settings = ObservationSettings(
    observation_path=DATA_FILEPATH,
    event_timestamp_column=TIMESTAMP_COL,
    timestamp_format=TIMESTAMP_FORMAT,
)
client.get_offline_features(
    observation_settings=settings,
    feature_query=query,
    # TODO: this setting currently has no effect; the output is still written as Avro files.
    execution_configurations=SparkExecutionConfiguration({
        "spark.feathr.inputFormat": DATA_FORMAT,
        "spark.feathr.outputFormat": DATA_FORMAT,
    }),
    output_path=offline_features_path,
)

client.wait_job_to_finish(timeout_sec=500)

## 4. Train and Evaluate a Prediction Model

After generating all the features, we train and evaluate a machine learning model to predict NYC taxi fares. In this example, we use Spark MLlib's [GBTRegressor](https://spark.apache.org/docs/latest/ml-classification-regression.html#gradient-boosted-tree-regression).

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor

# TODO: reading the Avro output may require adding the Spark config
# "spark.jars.packages": "org.apache.spark:spark-avro_2.12:3.3.0"


### Load Train and Test Data from the Offline Feature Values

In [None]:
df = spark.read.format(DATA_FORMAT).load(offline_features_path)
df.limit(5).toPandas()

In [None]:
# Train / test split
train_df, test_df = (
    df
    .withColumn("label", F.col("fare_amount").cast("double"))
    .where(F.col("f_trip_time_duration") > 0)
    .fillna(0)
    .randomSplit([0.8, 0.2], seed=42)  # fixed seed for a reproducible split
)

### Build a ML Pipeline

In [None]:
# Generate a feature vector column for SparkML
vector_assembler = VectorAssembler(
    inputCols=[x for x in df.columns if x in feature_names + agg_feature_names],
    outputCol="features",
)

# Define a model
gbt = GBTRegressor(
    featuresCol="features",
    maxIter=100,
    maxDepth=5,
    maxBins=16,
)

# Create a ML pipeline
ml_pipeline = Pipeline(stages=[
    vector_assembler,
    gbt,
])

### Train and Evaluate the Model

In [None]:
# Train a model
model = ml_pipeline.fit(train_df)

# Make predictions
predictions = model.transform(test_df)

# Evaluate
evaluator = RegressionEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="rmse",
)
rmse = evaluator.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
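`RegressionEvaluator` with `metricName="rmse"` reports the root mean squared error: the square root of the mean squared difference between labels and predictions. The plain-Python helper below is a hypothetical reference version that can be handy for spot-checking the metric on a few collected rows, e.g. from `predictions.select("label", "prediction").collect()`. It is named `rmse_reference` so it does not shadow the notebook's `rmse` variable. Since the label is `fare_amount`, the RMSE is expressed in the same units as the fare (dollars).

```python
from math import sqrt
from typing import Sequence


def rmse_reference(labels: Sequence[float], predictions: Sequence[float]) -> float:
    """Root mean squared error: sqrt(mean((label - prediction) ** 2))."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("labels and predictions must be non-empty and of equal length")
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return sqrt(sum(squared_errors) / len(squared_errors))


# Every prediction is off by exactly 2, so the RMSE is 2.
print(rmse_reference([10.0, 12.0, 8.0, 6.0], [12.0, 10.0, 10.0, 8.0]))
```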

## 5. Materialize Feature Values to the Online Store

So far, we computed feature values on the fly via Feathr. We can also pre-compute the feature values and materialize them to offline or online stores such as Redis.

Note that only features anchored to an offline data source can be materialized; in our case, these are the `agg_features`.

In [None]:
FEATURE_TABLE_NAME = "nycTaxiDemoFeature"

# Time range to materialize. start == end here, so only the 2020-05-20 cutoff is computed.
# TODO: decide how to set this range properly.
backfill_time = BackfillTime(
    start=datetime(2020, 5, 20),
    end=datetime(2020, 5, 20),
    step=timedelta(days=1),
)

# Destinations
redis_sink = RedisSink(table_name=FEATURE_TABLE_NAME)

settings = MaterializationSettings(
    name=FEATURE_TABLE_NAME + ".job",  # job name -- TODO if not important, automate this, e.g. redis_sink.table_name + ".job"
    backfill_time=backfill_time,
    sinks=[redis_sink],
    feature_names=agg_feature_names,
)

client.materialize_features(
    settings=settings,
    execution_configurations={"spark.feathr.outputFormat": "parquet"},
)

client.wait_job_to_finish(timeout_sec=500)
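To reason about the `BackfillTime` range, it can help to enumerate the cutoff times it implies. This sketch assumes one materialization run per `step` from `start` to `end`, with both endpoints inclusive; `backfill_cutoffs` is hypothetical, and the exact semantics should be checked against the Feathr documentation. Under that assumption, the settings above produce a single cutoff on 2020-05-20, and a 90-day window ending there reaches back to 2020-02-20, which covers the April 2020 sample data.

```python
from datetime import datetime, timedelta
from typing import List


def backfill_cutoffs(start: datetime, end: datetime, step: timedelta) -> List[datetime]:
    """Hypothetical expansion of a backfill range into per-step cutoff times."""
    if step <= timedelta(0):
        raise ValueError("step must be positive")
    cutoffs = []
    current = start
    # Assumes both endpoints are inclusive.
    while current <= end:
        cutoffs.append(current)
        current += step
    return cutoffs


cutoffs = backfill_cutoffs(datetime(2020, 5, 20), datetime(2020, 5, 20), timedelta(days=1))
print(cutoffs)
# Start of a 90-day aggregation window ending at that cutoff.
print(cutoffs[0] - timedelta(days=90))
```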

## 6. Fetching Feature Values for Online Inference

Open issue: the feature generation job does not work on local Spark. It hangs at `RedisOutputUtils.scala:37` when writing to Redis; the cause is still under investigation.

In [None]:
client.get_online_features(
    feature_table=FEATURE_TABLE_NAME,
    key="137",
    feature_names=agg_feature_names,
)

In [None]:
client.multi_get_online_features(
    feature_table=FEATURE_TABLE_NAME,
    keys=["137", "265"],
    feature_names=agg_feature_names,
)