# Online Feature Store example notebook

This notebook illustrates how to use Databricks Feature Store to publish features to Databricks Online Tables for real-time
serving and automatic feature lookup. The goal is to predict wine quality with an ML model that combines
a variety of static wine features with a real-time input.

This notebook creates an endpoint that predicts the quality of a bottle of wine, given its ID and the real-time feature alcohol by volume (ABV).

The notebook is structured as follows:
 
1. Prepare the feature table.
2. Set up Databricks Online Table.
    * This notebook uses Databricks Online Tables. For a list of supported functionality, see the Databricks documentation ([AWS](https://docs.databricks.com/machine-learning/feature-store/online-tables.html) | [Azure](https://learn.microsoft.com/azure/databricks/machine-learning/feature-store/online-tables)).  
3. Train and deploy the model.
4. Serve real-time queries with automatic feature lookup.
5. Clean up.

### Data Set

This example uses the [Wine Quality Data Set](https://archive.ics.uci.edu/ml/datasets/wine+quality).

### Requirements

* Serverless Compute ([AWS](https://docs.databricks.com/compute/serverless/index.html) | [Azure](https://learn.microsoft.com/azure/databricks/compute/serverless/index)) (_recommended_)
* Classic compute running Databricks Runtime 14.2 for Machine Learning or above.

<img src="https://docs.databricks.com/_static/images/machine-learning/feature-store/online-tables-nb-diagram.png"/>

In [0]:
%pip install databricks-sdk==0.41.0
%pip install databricks-feature-engineering==0.8.0
# Quote the version specifier so the shell does not treat ">" as an output redirect.
%pip install "mlflow>=2.9.0"
dbutils.library.restartPython()


## Prepare the feature table

Suppose you need to build an endpoint that predicts wine quality from just the `wine_id`. This requires a feature table in Feature Store from which the endpoint can look up the wine's features by `wine_id`. For this demo, you first prepare this feature table yourself. The steps are:

1. Load and clean the raw data.
2. Separate features and labels.
3. Save features into a feature table.

### Load and clean the raw data 

The raw data contains 12 columns including 11 features and the `quality` column. The `quality` column is an integer that ranges from 3 to 8. The goal is to build a model that predicts the `quality` value.

In [0]:
raw_data_frame = spark.read.load(
    "/databricks-datasets/wine-quality/winequality-red.csv",
    format="csv",
    sep=";",
    inferSchema="true",
    header="true",
)
display(raw_data_frame.limit(10))

In [0]:
# Check the size of the raw data as (rows, columns) without collecting it to the driver.
(raw_data_frame.count(), len(raw_data_frame.columns))

The raw data has two problems:
1. The column names contain spaces (' '), which are not compatible with Feature Store.
2. There is no ID column. Each row needs an ID so that Feature Store can look up its features later.

The following cell addresses these issues.

In [0]:
from pyspark.sql.functions import monotonically_increasing_id


def addIdColumn(dataframe, id_column_name):
    """Prepend a unique (but not necessarily consecutive) ID column."""
    columns = dataframe.columns
    new_df = dataframe.withColumn(id_column_name, monotonically_increasing_id())
    return new_df.select([id_column_name] + columns)

def renameColumns(df):
    """Replace spaces in column names with underscores."""
    renamed_df = df
    for column in df.columns:
        renamed_df = renamed_df.withColumnRenamed(column, column.replace(' ', '_'))
    return renamed_df


# Rename columns so that they are compatible with Feature Store
renamed_df = renameColumns(raw_data_frame)

# Add id column
id_and_data = addIdColumn(renamed_df, 'wine_id')

display(id_and_data)
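`monotonically_increasing_id()` guarantees unique, increasing IDs, not consecutive ones. According to the PySpark documentation, the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The pure-Python sketch below mimics that layout (`simulated_monotonic_id` is a hypothetical name, not a Spark API): rows in the first partition get IDs 0, 1, 2, ..., while each later partition starts at a multiple of 2**33. Assuming the CSV is read into a single partition, `wine_id` simply equals the row index.

```python
# Sketch of the documented ID layout of Spark's monotonically_increasing_id():
# upper 31 bits = partition ID, lower 33 bits = record number within the partition.
RECORD_BITS = 33
PARTITION_BITS = 31


def simulated_monotonic_id(partition_id: int, row_in_partition: int) -> int:
    """Return the ID this layout assigns to a given row of a given partition."""
    if not 0 <= partition_id < (1 << PARTITION_BITS):
        raise ValueError("partition ID does not fit in 31 bits")
    if not 0 <= row_in_partition < (1 << RECORD_BITS):
        raise ValueError("row index does not fit in 33 bits")
    return (partition_id << RECORD_BITS) + row_in_partition


# First partition: IDs are consecutive row indexes.
print([simulated_monotonic_id(0, i) for i in range(3)])  # [0, 1, 2]
# The second partition starts at 2**33, leaving a large gap.
print(simulated_monotonic_id(1, 0))  # 8589934592
```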

Assume that alcohol by volume (ABV) is a variable that changes over time after the bottle is opened. Its value is therefore provided as a real-time input for online inference rather than stored in Feature Store.

Now split the data into two parts and store only the part with static features in Feature Store.

In [0]:
# wine_id and static features
id_static_features = id_and_data.drop('alcohol', 'quality')

# wine_id, realtime feature (alcohol), label (quality)
id_rt_feature_labels = id_and_data.select('wine_id', 'alcohol', 'quality')
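To make the split concrete, here is a pure-Python sketch that applies the same column split to a single row represented as a dict. `split_row` is a hypothetical helper, not part of the notebook, and the row values are illustrative. Note that `wine_id` ends up in both parts: it is the key that lets Feature Store join them back together when building the training set.

```python
# Hypothetical helper mirroring the two DataFrame operations above on a single row.
REALTIME_AND_LABEL = ("wine_id", "alcohol", "quality")


def split_row(row: dict) -> tuple:
    """Return (static features incl. wine_id, real-time feature and label incl. wine_id)."""
    static = {k: v for k, v in row.items() if k not in ("alcohol", "quality")}
    realtime_and_label = {k: row[k] for k in REALTIME_AND_LABEL}
    return static, realtime_and_label


# Illustrative row with a subset of the renamed columns.
row = {"wine_id": 0, "fixed_acidity": 7.4, "pH": 3.51, "alcohol": 9.4, "quality": 5}
static, realtime_and_label = split_row(row)
print(static)              # {'wine_id': 0, 'fixed_acidity': 7.4, 'pH': 3.51}
print(realtime_and_label)  # {'wine_id': 0, 'alcohol': 9.4, 'quality': 5}
```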

### Create a feature table

Save the feature data `id_static_features` into a feature table.

In [0]:
# You must have the `CREATE CATALOG` privilege on the metastore, or change
# `catalog_name` to an existing catalog in which you can create schemas.
import re

# Use the local part of your username (email address), with characters that are
# not valid in unquoted SQL identifiers replaced by "_", as the catalog name.
username = spark.sql("SELECT current_user()").first()["current_user()"]
username = re.sub(r"[^A-Za-z0-9_]", "_", username.split("@")[0])
catalog_name = username

# Schema that holds the feature table, the online table, and the registered model.
schema_name = "online_tables"

spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog_name}")
spark.sql(f"USE CATALOG {catalog_name}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog_name}.{schema_name}")
spark.sql(f"USE SCHEMA {schema_name}")

In [0]:
from databricks.feature_engineering import FeatureEngineeringClient

wine_table = f"{catalog_name}.{schema_name}.wine_static_features"
online_table_name = f"{catalog_name}.{schema_name}.wine_static_features_online"
fe = FeatureEngineeringClient()
fe.create_table(
    name=wine_table,
    primary_keys=["wine_id"],
    df=id_static_features,
    description="id and features of all wine",
)

The feature data is now stored in the feature table. The next step is to set up a Databricks Online Table.

## Set up Databricks Online Tables

You can create an online table from the Catalog Explorer UI, the Databricks SDK, or the REST API. The following cell uses the Databricks Python SDK. For more details, see the Databricks documentation ([AWS](https://docs.databricks.com/en/machine-learning/feature-store/online-tables.html#create)|[Azure](https://learn.microsoft.com/azure/databricks/machine-learning/feature-store/online-tables#create)). For information about required permissions, see Permissions ([AWS](https://docs.databricks.com/en/machine-learning/feature-store/online-tables.html#user-permissions)|[Azure](https://learn.microsoft.com/azure/databricks/machine-learning/feature-store/online-tables#user-permissions)).

In [0]:
from pprint import pprint
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import OnlineTable, OnlineTableSpec, OnlineTableSpecTriggeredSchedulingPolicy

workspace = WorkspaceClient()

# Create an online table that mirrors the offline feature table.
spec = OnlineTableSpec(
    primary_key_columns=["wine_id"],
    source_table_full_name=wine_table,
    # Sync only when explicitly triggered, not continuously.
    run_triggered=OnlineTableSpecTriggeredSchedulingPolicy.from_dict({'triggered': 'true'}),
    # Always copy the full source table rather than applying incremental (change data feed) updates.
    perform_full_copy=True,
)

online_table = OnlineTable(name=online_table_name, spec=spec)

try:
    workspace.online_tables.create_and_wait(table=online_table)
except Exception as e:
    if "already exists" in str(e):
        print(f"Online table {online_table_name} already exists. Not recreating.")
    else:
        raise

pprint(workspace.online_tables.get(online_table_name))

## Train and deploy the model

Now, you will train a classifier using features in the Feature Store. You only need to specify the primary key, and Feature Store will fetch the required features.

In [0]:
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier

from databricks.feature_engineering import FeatureLookup

First, define a `TrainingSet`. The training set accepts a `feature_lookups` list, where each item specifies features to fetch from a feature table in Feature Store. This example uses `wine_id` as the lookup key to fetch all the features from the table `{catalog_name}.{schema_name}.wine_static_features` created above.
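Conceptually, the training set joins each row of `id_rt_feature_labels` with the feature row that has the same `wine_id`, then drops the columns listed in `exclude_columns`. The sketch below reproduces that idea with plain Python dicts. `build_training_rows` is hypothetical, the data are illustrative, and filling missing keys with `None` (left-join semantics) is an assumption of this sketch, not a description of Feature Store internals.

```python
# Hypothetical, in-memory illustration of the key-based lookup behind a training set.
def build_training_rows(label_rows, feature_rows, lookup_key="wine_id", exclude_columns=("wine_id",)):
    # Index feature rows by primary key, like a key-value store.
    features_by_key = {r[lookup_key]: r for r in feature_rows}
    feature_columns = [c for c in (feature_rows[0] if feature_rows else {}) if c != lookup_key]
    training_rows = []
    for row in label_rows:
        features = features_by_key.get(row[lookup_key], {})
        merged = dict(row)
        for column in feature_columns:
            # Left-join semantics (assumed): a key with no feature row yields None.
            merged[column] = features.get(column)
        training_rows.append({k: v for k, v in merged.items() if k not in exclude_columns})
    return training_rows


labels = [{"wine_id": 0, "alcohol": 9.4, "quality": 5}, {"wine_id": 99, "alcohol": 10.0, "quality": 6}]
features = [{"wine_id": 0, "fixed_acidity": 7.4, "pH": 3.51}]
for r in build_training_rows(labels, features):
    print(r)
# {'alcohol': 9.4, 'quality': 5, 'fixed_acidity': 7.4, 'pH': 3.51}
# {'alcohol': 10.0, 'quality': 6, 'fixed_acidity': None, 'pH': None}
```

At serving time, the endpoint performs the same kind of key-based lookup, but against the online table instead of the offline feature table.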

In [0]:
training_set = fe.create_training_set(
    df=id_rt_feature_labels,
    label='quality',
    feature_lookups=[
        FeatureLookup(
            table_name=f"{catalog_name}.{schema_name}.wine_static_features",
            lookup_key="wine_id"
        )
    ],
    exclude_columns=['wine_id'],
)

# Load the training data from Feature Store
training_df = training_set.load_df()

display(training_df)

The next cell trains a RandomForestClassifier model.

In [0]:
X_train = training_df.drop('quality').toPandas()
y_train = training_df.select('quality').toPandas()

# Train model
model = RandomForestClassifier()
model.fit(X_train, y_train.values.ravel())

Save the trained model with `fe.log_model`. In addition to the model, `fe.log_model` records lineage between the model and its features (through `training_set`). As a result, at serving time the model knows which feature table to query and needs only the lookup keys.

In [0]:
import mlflow

mlflow.set_registry_uri("databricks-uc")

registered_model_name = f"{catalog_name}.{schema_name}.wine_classifier"
fe.log_model(
    model=model,
    artifact_path="model",
    flavor=mlflow.sklearn,
    training_set=training_set,
    registered_model_name=registered_model_name
)

# Get the latest model version
client = mlflow.tracking.MlflowClient()
versions = client.search_model_versions(f"name='{registered_model_name}'")
registered_model_version = max(int(v.version) for v in versions)
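The cell converts each `version` with `int()` before taking the maximum. If version numbers were compared as strings, the comparison would be lexicographic, and version `"9"` would wrongly beat version `"10"`. A minimal illustration, with a made-up version list and a hypothetical `latest_version` helper:

```python
def latest_version(version_strings):
    """Return the highest version number, comparing numerically rather than as text."""
    return max(int(v) for v in version_strings)


versions = ["1", "2", "9", "10"]
print(max(versions))             # 9   (string comparison: "9" > "10")
print(latest_version(versions))  # 10
```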

## Serve real-time queries with automatic feature lookup

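After you call `fe.log_model`, a new version of the model is registered in Unity Catalog. The following cell creates a Model Serving endpoint that serves this version. If an endpoint with the same name already exists, the cell neither recreates nor updates it.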

In [0]:
from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedEntityInput

# Create the endpoint, reusing the `workspace` client created earlier.
endpoint_name = f"{username}_wine_classifier_endpoint"

try:
    status = workspace.serving_endpoints.create_and_wait(
        name=endpoint_name,
        config=EndpointCoreConfigInput(
            served_entities=[
                ServedEntityInput(
                    entity_name=registered_model_name,
                    # entity_version is typed as a string in the SDK.
                    entity_version=str(registered_model_version),
                    scale_to_zero_enabled=True,
                    workload_size="Small",
                )
            ]
        ),
    )
    print(status)
except Exception as e:
    if "already exists" in str(e):
        print(f"Not creating endpoint {endpoint_name} since it already exists.")
    else:
        raise

## Send a query

Now, suppose you open a bottle of wine and have a sensor that measures the current ABV of the bottle. Using the model and automatic feature lookup with real-time serving, you can predict the quality of the wine, passing the measured ABV as the real-time input `alcohol`.

In [0]:
import mlflow.deployments

client = mlflow.deployments.get_deploy_client("databricks")
response = client.predict(
    endpoint=endpoint_name,
    inputs={
        "dataframe_records": [
            {"wine_id": 25, "alcohol": 7.9},
            {"wine_id": 25, "alcohol": 11.0},
            {"wine_id": 25, "alcohol": 27.9},
        ]
    },
)

pprint(response)

### Notes on request format

The query above uses the `dataframe_records` format. The endpoint also accepts the equivalent `dataframe_split` format:
```json
{"dataframe_split": {"index": [0, 1, 2], "columns": ["wine_id", "alcohol"], "data": [[25, 7.9], [25, 11.0], [25, 27.9]]}}
```
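The two formats carry the same data. A `dataframe_split` payload can be derived from a list of records as in the following sketch. `records_to_split` is a hypothetical helper, and it assumes every record has the same keys in the same order.

```python
import json


def records_to_split(records):
    """Convert dataframe_records-style rows into a dataframe_split payload."""
    columns = list(records[0])  # column order taken from the first record
    return {
        "dataframe_split": {
            "index": list(range(len(records))),
            "columns": columns,
            "data": [[record[c] for c in columns] for record in records],
        }
    }


records = [
    {"wine_id": 25, "alcohol": 7.9},
    {"wine_id": 25, "alcohol": 11.0},
    {"wine_id": 25, "alcohol": 27.9},
]
# Prints the same JSON as the dataframe_split example above.
print(json.dumps(records_to_split(records)))
```

You could then send the payload with `client.predict(endpoint=endpoint_name, inputs=records_to_split(records))`.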

Learn more about Databricks Model Serving ([AWS](https://docs.databricks.com/en/machine-learning/model-serving/index.html)|[Azure](https://learn.microsoft.com/en-us/azure/databricks/machine-learning/model-serving/)).

## Clean up

To clean up the resources created by this notebook, follow these steps:

1. Delete the Databricks Online Table from Catalog Explorer.  
  a. In the left sidebar, click **Catalog**.   
  b. Navigate to the online table.  
  c. From the kebab menu, select **Delete**.  
2. Delete the Serving Endpoint from the **Serving** tab.  
  a. In the left sidebar, click **Serving**.  
  b. Click the name of the endpoint.  
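  c. From the kebab menu, select **Delete**.

Alternatively, uncomment and run the following cells to delete both resources with the Databricks SDK. These steps do not delete the offline feature table or the registered model.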

In [0]:
#workspace.serving_endpoints.delete(name=endpoint_name)

In [0]:
#workspace.online_tables.delete(name=online_table_name)