Import libraries

In [0]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
#
from pyspark.sql.functions import *
#
import mlflow
import logging
#
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
#
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
#
from databricks import feature_store

In [0]:
# Keep MLflow's own log messages out of the cell output; only the most severe ones
# get through. logging.CRITICAL and logging.FATAL are the same level (50).
mlflow_logger = logging.getLogger("mlflow")
mlflow_logger.setLevel(logging.CRITICAL)

In [0]:
# Load seaborn's diamonds dataset without its categorical text columns and preview a few rows.
text_columns = ['cut', 'clarity', 'color']
diamonds_df = sns.load_dataset('diamonds').drop(columns=text_columns)
diamonds_df.sample(3)

Drop duplicates and split the data into train and test sets

In [0]:
# Fixed seed so the train/test split is reproducible across notebook runs.
SPLIT_SEED = 42
#
# De-duplicate once and cache: the split below reads this frame several times.
diamonds_sdf = spark.createDataFrame(diamonds_df).dropDuplicates().cache()
n_total = diamonds_sdf.count()
#
# Spark Dataframes
# Spark re-evaluates lazy plans on every action. With an unseeded orderBy(rand()) and no
# cache, the sample used by `subtract` (train set) could differ from the one collected
# into test_df, leaking rows between train and test. Seed + cache pins a single sample.
test_sdf = (diamonds_sdf.orderBy(rand(seed=SPLIT_SEED))
                        .limit(int(33 * n_total / 100))  # ~33% of the rows form the test set
                        .cache())
n_test = test_sdf.count()  # also materializes the cached sample
train_sdf = diamonds_sdf.subtract(test_sdf).cache()
n_train = train_sdf.count()
#
# Pandas Dataframes
test_df = test_sdf.toPandas()
train_df = train_sdf.toPandas()
#
print(f"Number of rows test set: {n_test}")
print(f"Number of rows train set: {n_train}")
print(f"Sum of count rows of train and test set: {n_train + n_test}")
print(f"Total number of rows of initial dataframe: {n_total}")
#
# The two sets must partition the de-duplicated data exactly.
assert n_train + n_test == n_total, "train/test split is not a partition of the data"

Train using the scikit-learn library

In [0]:
# Prepare features and target dataframes
X = train_df.drop('price', axis=1)
y = train_df['price']
#
# Train the model (logged to MLflow automatically).
# NOTE(review): relies on MLflow autologging being enabled for this workspace/cluster.
# random_state pins the forest so reruns produce the same model.
rf = RandomForestRegressor(n_estimators=100, max_depth=5, random_state=42)
rf.fit(X, y)
#
# Get the run_id of the most recently finished run in the active experiment.
# .iloc[0] is positional: after sort_values the index labels are reordered, so a label
# lookup such as ['run_id'][0] returns/raises on the wrong row whenever the newest run
# is not label 0. Runs without an end_time (still running) sort last.
runs = mlflow.search_runs()
if runs.empty:
    raise RuntimeError("No MLflow runs found in the active experiment; is autologging enabled?")
latest_run_id = runs.sort_values(by="end_time", ascending=False).iloc[0]["run_id"]
#
# uri to latest run (by default, artifact_path is 'model')
uri_scikit_learn = f"runs:/{latest_run_id}/model"
#
# register latest logged model
mlflow.register_model(uri_scikit_learn, name="scikit-learn_model")
#
# load the model from the run artifact URI (the same artifact that was just registered)
scikit_learn_model = mlflow.pyfunc.load_model(uri_scikit_learn)
#
# prediction of test set using loaded model
pd.DataFrame(scikit_learn_model.predict(test_df.drop('price', axis=1)), columns=['predictions']).head(5)

Train using the MLlib library

In [0]:
# set vector assembler parameters: every column except the target becomes a feature
assembler_inputs = [c for c in train_sdf.columns if c not in ['price']]
vec_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
#
# instantiate model
# NOTE: despite the `_rfr` suffix, this is a linear regression, not a random forest.
mllib_rfr = LinearRegression(featuresCol="features", labelCol='price')
#
# define pipeline stages
stages = [vec_assembler, mllib_rfr]
#
# set pipeline
pipeline = Pipeline(stages=stages)
#
# fit pipeline to train set (logged to MLflow automatically -- NOTE(review): relies on autologging)
model_mllib = pipeline.fit(train_sdf)
#
# Get the run_id of the most recently finished run in the active experiment.
# .iloc[0] is positional; a label lookup like ['run_id'][0] picks/raises on the wrong
# row whenever the newest run is not label 0 after sorting.
runs = mlflow.search_runs()
if runs.empty:
    raise RuntimeError("No MLflow runs found in the active experiment; is autologging enabled?")
latest_run_id = runs.sort_values(by="end_time", ascending=False).iloc[0]["run_id"]
#
# uri to latest run (by default, artifact_path is 'model')
uri_mllib = f"runs:/{latest_run_id}/model"
#
# register latest logged model
mlflow.register_model(uri_mllib, name="mllib_model")
#
# load the model from the run artifact URI (the same artifact that was just registered)
mllib_model = mlflow.pyfunc.load_model(uri_mllib)
#
# Here predictions can be done using same input as for model trained using scikit learn library
pd.DataFrame(mllib_model.predict(test_df.drop('price', axis=1)), columns=['predictions']).head(5)

In [0]:
# load model into a spark udf
# Wrap the logged scikit-learn model as a Spark UDF so it can score a Spark DataFrame.
predict_scikit_learn = mlflow.pyfunc.spark_udf(spark, uri_scikit_learn)
#
# Feed every non-target column of the Spark test set to the UDF and preview 5 scored rows.
udf_feature_columns = [name for name in test_sdf.columns if name != 'price']
scored_test_sdf = test_sdf.withColumn("prediction", predict_scikit_learn(*udf_feature_columns))
display(scored_test_sdf.select("price", "prediction").limit(5))

Z-ordering  

Z-Ordering: colocates related information in the same set of files

Z-Ordering is a form of multi-dimensional clustering that colocates related information in the same set of files. It reduces the amount of data that needs to be read. See the Delta Lake documentation on Z-ordering for more details.

Below is an example of using Z-ordering.

First, let's write the training DataFrame as a Delta table:

In [0]:
# Save the training set as a Delta table, replacing both its data and schema if it already exists.
train_set_writer = (train_sdf.write
                             .format("delta")
                             .mode("overwrite")
                             .option("overwriteSchema", "true"))
train_set_writer.saveAsTable("train_set_diamonds")

In [0]:
# Show the full metastore description of the table (columns plus extended metadata).
train_set_description = spark.sql("describe table extended train_set_diamonds")
display(train_set_description)

In [0]:
# Keep only the row of the extended description that holds the table's storage location.
train_set_location_row = (spark.sql("describe table extended train_set_diamonds")
                               .where("col_name in ('Location')"))
display(train_set_location_row)

Z-Ordering reorganizes the data to cluster it on certain columns. For queries that frequently filter on a particular column (e.g., customer_id or date), Z-Ordering reduces I/O by enabling efficient data skipping.

In [0]:
# Resolve the table's storage location from the metastore instead of assuming the default
# Hive warehouse path, which breaks if the table lives in another database or location.
# NOTE: the variable name is kept for compatibility; this table is not partitioned.
delta_partitioned_path = spark.sql("DESCRIBE DETAIL train_set_diamonds").first()["location"]
#
# Z-order the table's files by `carat` so queries filtering on carat can skip files.
spark.sql(f"OPTIMIZE delta.`{delta_partitioned_path}` ZORDER BY (carat)");

Partitioning  

In [0]:
# Store the complete diamonds dataset (all columns) as a Delta table with no partitioning.
all_diamonds_sdf = spark.createDataFrame(sns.load_dataset('diamonds'))
not_partitioned_writer = (all_diamonds_sdf.write
                                          .format("delta")
                                          .mode("overwrite")
                                          .option("overwriteSchema", "true"))
not_partitioned_writer.saveAsTable("diamonds_df_not_partitioned")

In [0]:
# Show where the non-partitioned table is stored.
not_partitioned_location_row = (spark.sql("describe table extended diamonds_df_not_partitioned")
                                     .where("col_name in ('Location')"))
display(not_partitioned_location_row)

In [0]:
# List the table's data files. The location comes from the metastore rather than
# a hard-coded default-warehouse path.
not_partitioned_location = spark.sql("DESCRIBE DETAIL diamonds_df_not_partitioned").first()["location"]
for file in dbutils.fs.ls(not_partitioned_location):
    print(file.path)

In [0]:
# Row count per `cut` value, most frequent first (this is the column partitioned on next).
rows_per_cut = spark.table("diamonds_df_not_partitioned").groupBy("cut").count()
display(rows_per_cut.orderBy(desc("count")))

In [0]:
# Rewrite the same data as a second Delta table, physically partitioned by `cut`.
partitioned_writer = (spark.table("diamonds_df_not_partitioned").write
                           .format("delta")
                           .partitionBy("cut")
                           .mode("overwrite")
                           .option("overwriteSchema", "true"))
partitioned_writer.saveAsTable("diamonds_df_partitioned")

In [0]:
# List the partitioned table's top-level entries (one per `cut` value). The location
# comes from the metastore rather than a hard-coded default-warehouse path.
partitioned_location = spark.sql("DESCRIBE DETAIL diamonds_df_partitioned").first()["location"]
for file in dbutils.fs.ls(partitioned_location):
    print(file.path)

Batch scoring with the Feature Store (`score_batch`)

In [0]:
import seaborn as sns

In [0]:
# Reload diamonds with the row position exposed as an explicit `index` column
# (the primary key used for the new-diamond lookups below).
pd_diamonds = sns.load_dataset('diamonds').reset_index()
#
# Move to Spark and rename column x to x_r.
raw_spark_diamonds = spark.createDataFrame(pd_diamonds)
diamonds_full = raw_spark_diamonds.withColumnRenamed('x', 'x_r')
#
diamonds_full_preview = diamonds_full.limit(5)
display(diamonds_full_preview)

In [0]:
from pyspark.sql.functions import rand
#
# Split the (price, index) labels ~67/33. The sample is seeded and cached so every
# action (both displays below and any later join on it) sees the same rows. An unseeded
# orderBy(rand()) is re-drawn each time Spark re-evaluates the plan.
n_diamonds = diamonds_full.count()
y_test = (diamonds_full.select("price", "index")
                       .orderBy(rand(seed=42))
                       .limit(int(33 * n_diamonds / 100))
                       .cache())
y_test.count()  # materialize the cached sample before it is used by subtract
y_train = diamonds_full.select("price", "index").subtract(y_test)
#
display(y_test.limit(5))
display(y_train.limit(5))

In [0]:
from pyspark.sql.functions import lit
import pandas as pd
#
# Attributes of a hypothetical new diamond: column name -> (literal value, Spark type).
# Each name already exists in diamonds_full, so withColumn overwrites it in place and the
# column order is unchanged. `price` is not overwritten; it keeps the sampled row's value.
new_diamond_values = {
    'index':   (88887777, 'long'),
    'carat':   (2,        'double'),
    'cut':     ('Good',   'string'),
    'color':   ('E',      'string'),
    'clarity': ('VS1',    'string'),
    'depth':   (40,       'double'),
    'table':   (64,       'double'),
    'x_r':     (4.14,     'double'),
    'y':       (3.5,      'double'),
    'z':       (2.1,      'double'),
}
new_diamond = diamonds_full.limit(1)
for column_name, (literal_value, spark_type) in new_diamond_values.items():
    new_diamond = new_diamond.withColumn(column_name, lit(literal_value).cast(spark_type))
#
# Key-only inputs for score_batch: with a price, without a price, and with a key not
# present in the data.
new_diamond_with_price = spark.createDataFrame(pd.DataFrame([{'index': 88887777, 'price': 4500}]))
#
new_diamond_without_price = spark.createDataFrame(pd.DataFrame([{'index': 88887777}]))
#
diamond_unknown = spark.createDataFrame(pd.DataFrame([{'index': 98989898}]))
#
display(new_diamond)
display(new_diamond_with_price)
display(new_diamond_without_price)

We verify that `score_batch` produces predictions whether or not the new data includes the price. The only requirement is that the new diamond's primary key (here, the `index` column) exists in the Feature Store:

We also verify that a primary key that is not found in the Feature Store results in an error: