In [0]:
# Install required packages


%pip install databricks-feature-engineering optuna
import os

import joblib
import mlflow
import mlflow.sklearn
import optuna
import pandas as pd
from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup, FeatureFunction
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split

In [0]:

# Load the engineered feature table and the original wine table, which
# carries the `quality` target.
FEATURES_TABLE = "jpmc_group_catalog.mlops.wine_features_engineered_dabs"
LABELS_TABLE = "jpmc_group_catalog.mlops.wine_data_poc_dabs"

features_df = spark.table(FEATURES_TABLE)

# Read the labels table once and move `quality` to the first column.
raw_labels_df = spark.table(LABELS_TABLE)
labels_df = raw_labels_df.select(
    "quality", *[c for c in raw_labels_df.columns if c != "quality"]
)


def _with_row_id(df, id_col="wine_id"):
    """Return ``df`` with a consecutive, 0-based ``id_col`` column of type long.

    monotonically_increasing_id() on its own puts the partition index in the
    upper bits of the id, so two tables holding the same rows in the same
    order but split into different partitions receive different ids and would
    not join. Ranking by it instead yields 0..n-1 in read order, which does
    not depend on the partition layout.

    NOTE(review): the window has no partitionBy, so Spark ranks all rows in a
    single partition; assumed acceptable for a table of this size — revisit
    for large tables.
    """
    order_col = "__read_order"
    return (
        df.withColumn(order_col, monotonically_increasing_id())
        # Cast keeps the same bigint type monotonically_increasing_id() produced.
        .withColumn(id_col, (row_number().over(Window.orderBy(order_col)) - 1).cast("long"))
        .drop(order_col)
    )


# Add a shared `wine_id` key where it is missing and persist it back to the table.
# NOTE(review): ids are positional, so features and labels only line up if both
# tables hold the same rows in the same order — confirm against the upstream job.
if "wine_id" not in features_df.columns:
    features_df = _with_row_id(features_df)
    features_df.write.format("delta").mode("overwrite").option("mergeschema", "true").saveAsTable(FEATURES_TABLE)

if "wine_id" not in labels_df.columns:
    labels_df = _with_row_id(labels_df)
    labels_df.write.format("delta").mode("overwrite").option("mergeschema", "true").saveAsTable(LABELS_TABLE)

In [0]:


# Re-read the engineered feature table so this cell sees any `wine_id`
# column written back by the previous cell.
features_df = spark.read.table("jpmc_group_catalog.mlops.wine_features_engineered_dabs")




In [0]:
# Keep only the join key (`wine_id`) and the target (`quality`) from the raw wine table.
label_columns = ["wine_id", "quality"]
labels_df = (
    spark.read.table("jpmc_group_catalog.mlops.wine_data_poc_dabs")
    .select(*label_columns)
)

In [0]:
# Labels plus the raw acidity columns that the on-demand acidity UDF takes as inputs.
acidity_columns = ["citric_acid", "fixed_acidity", "volatile_acidity"]
labels_feature_df = (
    spark.read.table("jpmc_group_catalog.mlops.wine_data_poc_dabs")
    .select("wine_id", "quality", *acidity_columns)
)


In [0]:
# Client used to build the training set and, later, to log the model.
fe = FeatureEngineeringClient()

# Pre-engineered features, looked up from the feature table by `wine_id`.
engineered_lookup = FeatureLookup(
    table_name="jpmc_group_catalog.mlops.wine_features_engineered_dabs",
    lookup_key="wine_id",
)

# On-demand feature: average acidity computed by the registered UDF.
# Each UDF parameter is bound to the same-named column of the input DataFrame.
acidity_udf_inputs = ["fixed_acidity", "volatile_acidity", "citric_acid"]
avg_acidity_function = FeatureFunction(
    udf_name="jpmc_group_catalog.mlops.avg_acidity_udf_fn_dabs",
    input_bindings={column: column for column in acidity_udf_inputs},
    output_name="acidity_avg_output",
)

model_feature_lookups = [engineered_lookup, avg_acidity_function]


In [0]:
# Build the training set: start from labels + raw acidity columns, join the
# engineered features on `wine_id`, and append the on-demand UDF output.
# `wine_id` is only a join key, so it is excluded from the resulting DataFrame.
training_set = fe.create_training_set(
    df=labels_feature_df,
    feature_lookups=model_feature_lookups,
    label="quality",
    # exclude_columns takes a list of column names, not a bare string.
    exclude_columns=["wine_id"],
)

# Materialize the training data as a Spark DataFrame.
training_df = training_set.load_df()
training_df.display()

In [0]:
# Inspect the resulting schema and a small sample of rows.
print("Training columns:", training_df.columns)
# limit() makes the 5-row sample explicit instead of relying on an extra
# positional argument to display().
training_df.limit(5).display()

In [0]:
# Collect the Spark training DataFrame onto the driver as pandas for scikit-learn.
training_pd = training_df.toPandas()
preview = training_pd.head()
print(preview)

In [0]:

# Split into features (every column except the `quality` target) and the target.
# `wine_id` is already absent: create_training_set excluded it.
X = training_pd.drop(columns=["quality"])
y = training_pd["quality"]

# Hold out 20% of rows as the test set, with a fixed seed for reproducibility.
x_train, x_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

In [0]:

# Carve a validation split out of the training data so hyperparameters are
# chosen without looking at the held-out test set. Scoring trials on
# x_test/y_test would make the final "test" metrics optimistically biased.
x_tune, x_val, y_tune, y_val = train_test_split(
    x_train, y_train, test_size=0.2, random_state=42
)


def objective(trial):
    """Optuna objective: validation MSE of a RandomForestRegressor.

    Samples n_estimators, max_depth, min_samples_split and min_samples_leaf
    from ``trial``, fits on the tuning split and returns the mean squared
    error on the validation split (lower is better).
    """
    params = {
        "n_estimators": trial.suggest_int("n_estimators", 50, 300),
        "max_depth": trial.suggest_int("max_depth", 3, 20),
        "min_samples_split": trial.suggest_int("min_samples_split", 2, 10),
        "min_samples_leaf": trial.suggest_int("min_samples_leaf", 1, 10),
    }

    rf = RandomForestRegressor(**params, random_state=42)
    rf.fit(x_tune, y_tune)
    return mean_squared_error(y_val, rf.predict(x_val))


# Seed the TPE sampler so repeated runs explore the same trials.
study = optuna.create_study(
    direction="minimize",
    sampler=optuna.samplers.TPESampler(seed=42),
)
study.optimize(objective, n_trials=30)
best_params = study.best_params
print("Best hyperparameters: ", best_params)




In [0]:

# Refit a RandomForestRegressor with the tuned hyperparameters on the training
# split, then score it once on the held-out test split.
tuned_kwargs = {
    key: best_params[key]
    for key in ("n_estimators", "max_depth", "min_samples_split", "min_samples_leaf")
}
rf_best = RandomForestRegressor(**tuned_kwargs, random_state=42)
rf_best.fit(x_train, y_train)

y_pred = rf_best.predict(x_test)
mse = mean_squared_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
print(f"Test MSE: {mse:.4f}, Test R2: {r2:.4f}")

In [0]:
# Use the Unity Catalog model registry; the model name is catalog.schema.model.
mlflow.set_registry_uri("databricks-uc")
model_name = "jpmc_group_catalog.mlops.wine_rf_model_dabs"

# NOTE(review): autolog only instruments estimator fits made after it is
# enabled; no fit happens inside the run below, so this mainly affects any
# cell re-run later in the session.
mlflow.sklearn.autolog(log_models=False)
fe = FeatureEngineeringClient()

with mlflow.start_run() as run:
    mlflow.log_params(best_params)
    mlflow.log_metrics({"test_mse": mse, "test_r2_score": r2})
    # Log through the Feature Engineering client with the training_set so the
    # feature lookups used in training are recorded with the model, and
    # register it under model_name.
    fe.log_model(
        model=rf_best,
        artifact_path="wine_rf_model",
        flavor=mlflow.sklearn,
        training_set=training_set,
        registered_model_name=model_name,
    )

In [0]:
# Also persist the fitted model as a plain joblib file.
# NOTE(review): Unity Catalog volumes are addressed as
# /Volumes/<catalog>/<schema>/<volume>; a path outside /Volumes likely lands on
# the driver's local disk and is lost with the cluster. Confirm the catalog,
# schema and volume names below.
volume_path = "/Volumes/108173_ctg_dev/m14hr/wine_rf_model_artifacts_dabs"
os.makedirs(volume_path, exist_ok=True)

model_file = os.path.join(volume_path, "rf_best_model.joblib")
joblib.dump(rf_best, model_file)
print(f"Model saved to {model_file}")