In [0]:
!pip install mlflow=='3.6.0'
dbutils.library.restartPython()

In [0]:
from pyspark.sql import functions as F
from pyspark.ml import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from mlflow.models.signature import infer_signature
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
import mlflow.pyfunc
import mlflow.spark
import numpy as np

import mlflow
from mlflow import MlflowClient
import os
from mlflow.models import infer_signature
import time

In [0]:
# Display the MLflow version that is actually loaded in this Python process
# (sanity check that the pinned install above took effect).
mlflow.__version__

In [0]:
# Set the DISABLE_MLFLOWDBFS flag before any model is logged or loaded.
# NOTE(review): presumably this makes the MLflow Spark flavor stage artifacts via
# `dfs_tmpdir` instead of the mlflowdbfs path - confirm against the MLflow version in use.
os.environ["DISABLE_MLFLOWDBFS"] = "True"
# Register and resolve models through Unity Catalog ("databricks-uc" registry URI).
mlflow.set_registry_uri("databricks-uc")

In [0]:
# --- Notebook configuration -------------------------------------------------
# Unity Catalog location shared by the feature table, the model and the outputs
CATALOG_NAME = "pedroz_catalog"
SCHEMA_NAME = "mlops_schema"

# Table holding the synthetic features, and the name used for the model/experiment
FEATURE_TABLE_NAME = "linear_regression_data"
MODEL_NAME = "mllib_linear_regression"

# Volume (inside CATALOG_NAME.SCHEMA_NAME) passed as dfs_tmpdir when logging/loading the Spark model
TEMP_DIR_VOLUME_NAME = "temp_dir"

In [0]:
# Create synthetic dataset

# Ground-truth parameters of the linear relationship the model is expected to recover
coefficients = [2.5, -1.8, 3.2, 0.5, -4.1]
intercept = 10.0

# Number of rows to generate (50M) and standard deviation of the target noise
n_rows = 50_000_000
noise_std = 0.1

# Every random column uses its own seed. Spark's rand/randn seeded identically
# yield the same underlying random stream, so reusing a seed (e.g. for feature2
# and the noise term) would make the "noise" an exact copy of feature2 instead
# of independent noise, and would correlate the features with each other.
df = spark.range(0, n_rows).select(
    F.rand(seed=42).alias("feature1"),
    F.randn(seed=43).alias("feature2"),
    F.rand(seed=24).alias("feature3"),
    F.randn(seed=25).alias("feature4"),
    F.rand(seed=123).alias("feature5")
).withColumn(
    "target",
    coefficients[0]*F.col("feature1") +
    coefficients[1]*F.col("feature2") +
    coefficients[2]*F.col("feature3") +
    coefficients[3]*F.col("feature4") +
    coefficients[4]*F.col("feature5") +
    intercept + F.randn(seed=7)*noise_std
)

# Save to Unity Catalog (overwrites any previous version of the feature table)
df.write.format("delta").mode("overwrite").saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.{FEATURE_TABLE_NAME}")


In [0]:
# Split train and test datasets (80% / 20%)

# A fixed seed keeps the split reproducible across notebook runs.
train_df, test_df = spark.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.{FEATURE_TABLE_NAME}").randomSplit([0.8, 0.2], seed=42)

In [0]:
# Resolve the current user from the notebook context so the experiment lives
# under that user's workspace folder.
notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
current_user = notebook_context.userName().get()
experiment_name = f"/Users/{current_user}/{MODEL_NAME}_{CATALOG_NAME}"

# Point MLflow at that experiment (set_experiment creates it if it does not exist yet)
mlflow.set_experiment(experiment_name)

In [0]:
with mlflow.start_run():

  # Train model: assemble the five feature columns into a single vector column
  # and fit a linear regression on the training split.
  feature_cols = ["feature1", "feature2", "feature3", "feature4", "feature5"]
  vec_assembler = VectorAssembler(
      inputCols=feature_cols,
      outputCol="features"
  )
  lr = LinearRegression(featuresCol="features", labelCol="target")
  pipeline = Pipeline(stages=[vec_assembler, lr])
  model = pipeline.fit(train_df)

  # Get model signature from a small sample: raw feature columns as inputs,
  # the pipeline's "prediction" column as output.
  example_df = train_df.limit(10)
  X_example = example_df.toPandas()[feature_cols]
  y_example = model.transform(example_df).toPandas()[["prediction"]]
  signature = infer_signature(X_example, y_example)

  # Log the model and register it in Unity Catalog.
  # The registered name must be CATALOG.SCHEMA.MODEL_NAME: the inference cells
  # look the model up under exactly that name, so registering it under the
  # schema name would leave them with no versions to load.
  mlflow.spark.log_model(
      spark_model=model,
      artifact_path="model",
      registered_model_name=f"{CATALOG_NAME}.{SCHEMA_NAME}.{MODEL_NAME}",
      signature=signature,
      dfs_tmpdir=f"/Volumes/{CATALOG_NAME}/{SCHEMA_NAME}/{TEMP_DIR_VOLUME_NAME}"
  )

In [0]:
# Inference comparison

In [0]:
# Load input data
# Build the table name from the config constants so this cell follows any
# change to the catalog/schema/table configuration instead of a hard-coded path.
features_df = spark.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.{FEATURE_TABLE_NAME}")
# Keep only the model inputs (the "target" column is not needed for inference)
input_features = features_df.select("feature1", "feature2", "feature3", "feature4", "feature5")
input_features.limit(5).display()

In [0]:
# Get the model_uri for the latest model
client = MlflowClient()
model_name = f"{CATALOG_NAME}.{SCHEMA_NAME}.{MODEL_NAME}"

# Get latest version (you can also replace this by getting a model by its alias, e.g. "champion" model)
registered_versions = [
    int(mv.version) for mv in client.search_model_versions(f"name='{model_name}'")
]
# Fail with an actionable message instead of max()'s generic error on an empty list
if not registered_versions:
    raise ValueError(
        f"No registered versions found for model {model_name}; "
        "run the training cell to log and register the model first."
    )
latest_version = max(registered_versions)
print(f"The latest registered version of model {model_name} is: {latest_version}")

# Define the model_uri
model_uri = f'models:/{model_name}/{latest_version}'

In [0]:
# Approach 1: load model and transform (recommended for Spark MLlib, since transform already runs distributed inferences)

In [0]:
# Time the whole flow: model load + transform + table write
start_time = time.time()

# Load registered model, staging its files through the Unity Catalog volume
transform_tmp_dir = f'/Volumes/{CATALOG_NAME}/{SCHEMA_NAME}/{TEMP_DIR_VOLUME_NAME}'
loaded_model = mlflow.spark.load_model(model_uri, dfs_tmpdir=transform_tmp_dir)

# Use .transform() to run predictions, then persist them (the write triggers the computation)
transform_predictions = loaded_model.transform(input_features)
transform_predictions.write.mode("overwrite").saveAsTable(
    f"{CATALOG_NAME}.{SCHEMA_NAME}.output_table_transform"
)

end_time = time.time()

print('Inference took %.2f seconds' % (end_time - start_time))

In [0]:
# Approach 2: PyFunc UDF (can be advantageous over regular load_model + predict for other frameworks, since inferences are distributed across all worker nodes)

In [0]:
# Time the whole flow: UDF creation + scoring + table write
start_time = time.time()

# Create UDF wrapping the registered model as a PyFunc
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri)

# Called without arguments, the UDF takes its input columns from the model signature.
udf_predictions = input_features.withColumn("prediction", loaded_model_udf())
udf_predictions.write.mode("overwrite").saveAsTable(
    f"{CATALOG_NAME}.{SCHEMA_NAME}.output_table_udf"
)

end_time = time.time()

print('Running the predictions with a UDF took %.2f seconds' % (end_time - start_time))

In [0]:
# For Spark MLlib, in particular, running inferences using the PyFunc UDF approach is not advantageous, because Spark MLlib natively runs the inferences in a distributed manner, and when you run the PyFunc UDF approach, you first need to convert the Spark binaries into a PyFunc to then distribute it. 