In [0]:
from databricks import feature_store
from databricks.feature_store import feature_table,FeatureLookup


In [0]:
import pandas as pd
from pyspark.sql.functions import monotonically_increasing_id,expr, rand
import uuid 
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score


In [0]:
raw_data = spark.read.load("/databricks-datasets/wine-quality/winequality-red.csv",format="csv",sep=";",inferSchema=True,header=True)


In [0]:
display(raw_data)

fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5
7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,3.2,0.68,9.8,5
7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,3.26,0.65,9.8,5
11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,3.16,0.58,9.8,6
7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5
7.4,0.66,0.0,1.8,0.075,13.0,40.0,0.9978,3.51,0.56,9.4,5
7.9,0.6,0.06,1.6,0.069,15.0,59.0,0.9964,3.3,0.46,9.4,5
7.3,0.65,0.0,1.2,0.065,15.0,21.0,0.9946,3.39,0.47,10.0,7
7.8,0.58,0.02,2.0,0.073,9.0,18.0,0.9968,3.36,0.57,9.5,7
7.5,0.5,0.36,6.1,0.071,17.0,102.0,0.9978,3.35,0.8,10.5,5


In [0]:
print(raw_data.columns)

['fixed acidity', 'volatile acidity', 'citric acid', 'residual sugar', 'chlorides', 'free sulfur dioxide', 'total sulfur dioxide', 'density', 'pH', 'sulphates', 'alcohol', 'quality']


In [0]:
def addIdColumn(dataframe,id_column_name):
    """Add id column to dataframe"""
    columns = dataframe.columns
    new_df = dataframe.withColumn(id_column_name, monotonically_increasing_id())
    return new_df[[id_column_name]+columns]

def renameColumns(df):
    """Rename columns to be compatible with feature stores"""
    renamed_df = df
    for column in df.columns:
        renamed_df = renamed_df.withColumnRenamed(column, column.replace(" ", "_"))
    return renamed_df


In [0]:
renamed_df = renameColumns(raw_data)
df = addIdColumn(renamed_df, "wine_id")

In [0]:
display(df)

wine_id,fixed_acidity,volatile_acidity,citric_acid,residual_sugar,chlorides,free_sulfur_dioxide,total_sulfur_dioxide,density,pH,sulphates,alcohol,quality
0,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5
1,7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,3.2,0.68,9.8,5
2,7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,3.26,0.65,9.8,5
3,11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,3.16,0.58,9.8,6
4,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5
5,7.4,0.66,0.0,1.8,0.075,13.0,40.0,0.9978,3.51,0.56,9.4,5
6,7.9,0.6,0.06,1.6,0.069,15.0,59.0,0.9964,3.3,0.46,9.4,5
7,7.3,0.65,0.0,1.2,0.065,15.0,21.0,0.9946,3.39,0.47,10.0,7
8,7.8,0.58,0.02,2.0,0.073,9.0,18.0,0.9968,3.36,0.57,9.5,7
9,7.5,0.5,0.36,6.1,0.071,17.0,102.0,0.9978,3.35,0.8,10.5,5


In [0]:
features_df = df.drop("quality")
display(features_df)

wine_id,fixed_acidity,volatile_acidity,citric_acid,residual_sugar,chlorides,free_sulfur_dioxide,total_sulfur_dioxide,density,pH,sulphates,alcohol
0,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4
1,7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,3.2,0.68,9.8
2,7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,3.26,0.65,9.8
3,11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,3.16,0.58,9.8
4,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4
5,7.4,0.66,0.0,1.8,0.075,13.0,40.0,0.9978,3.51,0.56,9.4
6,7.9,0.6,0.06,1.6,0.069,15.0,59.0,0.9964,3.3,0.46,9.4
7,7.3,0.65,0.0,1.2,0.065,15.0,21.0,0.9946,3.39,0.47,10.0
8,7.8,0.58,0.02,2.0,0.073,9.0,18.0,0.9968,3.36,0.57,9.5
9,7.5,0.5,0.36,6.1,0.071,17.0,102.0,0.9978,3.35,0.8,10.5


In [0]:
spark.sql(f"CREATE DATABASE IF NOT EXISTS wine_db")

table_name = f"wine_db_" + str(uuid.uuid4())[:6]
print(table_name)

wine_db_3fb17e


In [0]:
fsc =feature_store.FeatureStoreClient()

In [0]:
fsc.create_table(
    name=table_name,
    primary_keys=["wine_id"],
    df = features_df,
    schema = features_df.schema,
    description = 'wine features'

)

2024/09/26 13:09:20 INFO databricks.feature_store._compute_client._compute_client: Setting columns ['wine_id'] of table 'databrickscourse_ws.default.wine_db_3fb17e' to NOT NULL.
2024/09/26 13:09:27 INFO databricks.feature_store._compute_client._compute_client: Setting Primary Keys constraint ['wine_id'] on table 'databrickscourse_ws.default.wine_db_3fb17e'.
2024/09/26 13:09:44 INFO databricks.feature_store._compute_client._compute_client: Created feature table 'databrickscourse_ws.default.wine_db_3fb17e'.
  yield prop, self.__getattribute__(prop)


<FeatureTable: keys=['wine_id'], tags={}>

In [0]:
inference_data_df = df.select("wine_id", "quality", (10*rand()).alias("real_time_measurement"))
display(inference_data_df)

wine_id,quality,real_time_measurement
0,5,4.548963131164349
1,5,9.722925207352986
2,5,9.997654162890557
3,6,3.681554829770235
4,5,5.324838478402365
5,5,6.350904610593941
6,5,1.7381432561172327
7,7,8.385127587683101
8,7,3.1434080804627484
9,5,7.396827155310547


In [0]:
def load_data(table_name,loop_key):
    model_feature_lookups = [FeatureLookup(table_name=table_name,lookup_key=loop_key)]  
    training_set = fsc.create_training_set(inference_data_df, model_feature_lookups,label='quality', exclude_columns="wine_id")
    training_pd = training_set.load_df().toPandas()

    X = training_pd.drop(["quality"], axis=1)
    y = training_pd["quality"]
    X_train, X_test, y_train, y_test = train_test_split(X,y, test_size=0.2, random_state=42)
    return X_train, X_test, y_train, y_test,training_set




In [0]:
X_train,X_test,y_train,y_test,training_set = load_data(table_name, "wine_id")
X_train.head()

Unnamed: 0,real_time_measurement,fixed_acidity,volatile_acidity,citric_acid,residual_sugar,chlorides,free_sulfur_dioxide,total_sulfur_dioxide,density,pH,sulphates,alcohol
493,3.207922,8.7,0.69,0.31,3.0,0.086,23.0,81.0,1.0002,3.48,0.74,11.6
354,7.841611,6.1,0.21,0.4,1.4,0.066,40.5,165.0,0.9912,3.25,0.59,11.9
342,5.219486,10.9,0.39,0.47,1.8,0.118,6.0,14.0,0.9982,3.3,0.75,9.8
834,6.596848,8.8,0.685,0.26,1.6,0.088,16.0,23.0,0.99694,3.32,0.47,9.4
705,1.569725,8.4,1.035,0.15,6.0,0.073,11.0,54.0,0.999,3.37,0.49,9.9


In [0]:
from mlflow.tracking.client import MlflowClient
client = MlflowClient()
try:
    client.delete_registered_model("wine_model")
except:
    None

In [0]:
import mlflow
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score

mlflow.sklearn.autolog(log_models=False)

def train_model(X_train, y_train, X_test, y_test, training_set, fsc):
    with mlflow.start_run() as run:
        rf = RandomForestRegressor(
            n_estimators=20, 
            max_depth=5, 
            random_state=42
        )
        rf.fit(X_train, y_train)
        y_pred = rf.predict(X_test)

        mlflow.log_metric("test_mse", mean_squared_error(y_test, y_pred))
        mlflow.log_metric("test_r2", r2_score(y_test, y_pred))

        fsc.log_model(
            model=rf, 
            artifact_path="wine_quality_prediction",
            flavor=mlflow.sklearn,
            training_set=training_set,
            registered_model_name="wine_model"
        )

train_model(X_train, y_train, X_test, y_test, training_set, fsc)

Successfully registered model 'databrickscourse_ws.default.wine_model'.
2024/09/26 13:28:47 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation. Model name: wine_model, version 1
Created version '1' of model 'databrickscourse_ws.default.wine_model'.


In [0]:
batch_input_df = inference_data_df.drop("quality")
predictions_df = fsc.score_batch("models:/wine_model/1", batch_input_df)

display(predictions_df.select("wine_id", "prediction"))

2024/09/26 13:31:08 INFO mlflow.models.flavor_backend_registry: Selected backend for flavor 'python_function'


wine_id,prediction
0,5.15686633106182
1,5.296298602107789
2,5.386206289373755
3,5.429567014849836
4,5.144710018905508
5,5.161201819293447
6,5.111569821219467
7,5.633815372563396
8,5.234926597194323
9,5.388167478220707


In [0]:
so2_cols = ["free_sulfur_dioxide", "total_sulfur_dioxide"]
new_features_df = (features_df.withColumn("average so2", expr("+".join(so2_cols))/2))

In [0]:
display(new_features_df)

wine_id,fixed_acidity,volatile_acidity,citric_acid,residual_sugar,chlorides,free_sulfur_dioxide,total_sulfur_dioxide,density,pH,sulphates,alcohol,average so2
0,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,22.5
1,7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,3.2,0.68,9.8,46.0
2,7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,3.26,0.65,9.8,34.5
3,11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,3.16,0.58,9.8,38.5
4,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,22.5
5,7.4,0.66,0.0,1.8,0.075,13.0,40.0,0.9978,3.51,0.56,9.4,26.5
6,7.9,0.6,0.06,1.6,0.069,15.0,59.0,0.9964,3.3,0.46,9.4,37.0
7,7.3,0.65,0.0,1.2,0.065,15.0,21.0,0.9946,3.39,0.47,10.0,18.0
8,7.8,0.58,0.02,2.0,0.073,9.0,18.0,0.9968,3.36,0.57,9.5,13.5
9,7.5,0.5,0.36,6.1,0.071,17.0,102.0,0.9978,3.35,0.8,10.5,59.5


In [0]:
# Rename columns to remove invalid characters
new_features_df = new_features_df.toDF(
    *[col.replace(' ', '_').replace(';', '_').replace('{', '_')
      .replace('}', '_').replace('(', '_').replace(')', '_')
      .replace('\n', '_').replace('\t', '_').replace('=', '_')
      for col in new_features_df.columns]
)

# Write the DataFrame to the Delta table
fsc.write_table(
    name=table_name,
    df=new_features_df,
    mode="merge"
)

In [0]:
train_model(X_train, y_train, X_test, y_test, training_set, fsc)

Registered model 'wine_model' already exists. Creating a new version of this model...
2024/09/26 13:37:46 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation. Model name: wine_model, version 2
Created version '2' of model 'databrickscourse_ws.default.wine_model'.


In [0]:
batch_input_df = inference_data_df.drop("quality")
predictions_df = fsc.score_batch("models:/wine_model/2", batch_input_df)

display(predictions_df.select("wine_id", "prediction"))

2024/09/26 13:39:27 INFO mlflow.models.flavor_backend_registry: Selected backend for flavor 'python_function'


wine_id,prediction
0,5.15686633106182
1,5.296298602107789
2,5.386206289373755
3,5.429567014849836
4,5.144710018905508
5,5.161201819293447
6,5.111569821219467
7,5.633815372563396
8,5.234926597194323
9,5.388167478220707
