
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px">
</div>

# Capstone Project: 3 Production Deployments

The goal of this project is to deploy a trained machine learning model into production using all three deployment paradigms: batch, streaming, and REST.  An optional exercise entails creating a monitoring and alerting infrastructure.

## ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) In this capstone you:<br>
 - Apply a model trained in `sklearn` across a Spark DataFrame
 - Perform predictions on an incoming stream of data
 - Deploy a REST endpoint
 - _Optional:_ Create a monitoring and alerting solution
 
<img alt="Caution" title="Caution" style="vertical-align: text-bottom; position: relative; height:1.3em; top:0.0em" src="https://files.training.databricks.com/static/images/icon-warning.svg"/> **Please be sure to delete any infrastructure you build after the course so you don't incur unexpected expenses.**

In [0]:
%run "./Includes/Classroom-Setup"

Define a name for the stream to be used later in this lesson.

In [0]:
myStreamName = "capstone_pi"

Import the AirBnB dataset.  Create the following objects:<br><br>

* `pandasDF`: a Pandas DataFrame of all the data
* `pandasX`: a Pandas DataFrame of the `X` values (every column except `price`)
* `pandasy`: a Pandas Series of the `y` values (the `price` column)
* `sparkDF`: a Spark DataFrame of all the data

In [0]:
import pandas as pd

# Alternative source: a file with the same name on the Databricks training mount
# pandasDF = pd.read_csv("/dbfs/mnt/training/airbnb/sf-listings/airbnb-cleaned-mlflow.csv")
pandasDF = pd.read_csv("https://raw.githubusercontent.com/bernhard-42/mlflow-experiments/master/data/airbnb-cleaned-mlflow.csv")
pandasX = pandasDF.drop(["price"], axis=1)  # features: every column except the label
pandasy = pandasDF["price"]                  # label: the listing price

sparkDF = spark.createDataFrame(pandasDF)
display(sparkDF)



host_total_listings_count,neighbourhood_cleansed,zipcode,latitude,longitude,property_type,room_type,accommodates,bathrooms,bedrooms,beds,bed_type,minimum_nights,number_of_reviews,review_scores_rating,review_scores_accuracy,review_scores_cleanliness,review_scores_checkin,review_scores_communication,review_scores_location,review_scores_value,price
1.0,0,0,37.769310377340766,-122.43385634489,0,0,3.0,1.0,1.0,2.0,0,1.0,127.0,97.0,10.0,10.0,10.0,10.0,10.0,10.0,170.0
2.0,1,1,37.745112331410034,-122.42101788836888,0,0,5.0,1.0,2.0,3.0,0,30.0,112.0,98.0,10.0,10.0,10.0,10.0,10.0,9.0,235.0
10.0,2,0,37.766689597862175,-122.45250461761628,0,1,2.0,4.0,1.0,1.0,0,32.0,17.0,85.0,8.0,8.0,9.0,9.0,9.0,8.0,65.0
4.0,3,2,37.73074592978503,-122.44840862635226,1,1,1.0,2.0,1.0,1.0,0,3.0,76.0,95.0,9.0,9.0,10.0,10.0,9.0,9.0,60.0
10.0,2,0,37.76487219421756,-122.45182799146508,1,1,2.0,4.0,1.0,1.0,0,32.0,7.0,91.0,9.0,9.0,9.0,9.0,9.0,9.0,65.0
2.0,0,0,37.77524858589268,-122.43637374831292,1,0,5.0,1.5,2.0,2.0,0,5.0,26.0,97.0,10.0,10.0,10.0,10.0,10.0,10.0,575.0
1.0,0,3,37.78470745496073,-122.44555431261594,0,0,7.0,1.0,2.0,1.0,0,2.0,27.0,88.0,9.0,7.0,10.0,10.0,9.0,9.0,255.0
2.0,4,1,37.75918889708064,-122.42236687240562,0,1,3.0,1.0,1.0,2.0,0,1.0,559.0,98.0,10.0,10.0,10.0,10.0,10.0,9.0,139.0
1.0,4,1,37.75174004606522,-122.4094205953428,0,0,4.0,2.5,3.0,3.0,0,3.0,24.0,95.0,9.0,9.0,10.0,10.0,9.0,9.0,285.0
1.0,5,4,37.76258885144137,-122.40543055237004,1,1,2.0,1.0,1.0,1.0,0,1.0,386.0,93.0,9.0,9.0,10.0,10.0,9.0,9.0,135.0


Train an AdaBoost regressor.  AdaBoost is a meta-estimator that first fits a regressor (here, a random forest) on the dataset and then fits additional copies of that regressor on the same dataset, adjusting the sample weights according to the error of the current prediction so that later regressors focus on the harder cases.

<img alt="Side Note" title="Side Note" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.05em; transform:rotate(15deg)" src="https://files.training.databricks.com/static/images/icon-note.webp"/> Knowing how this algorithm works is not necessary for this capstone.  To deploy a model, we just need to know its inputs and outputs.  To read more about AdaBoost, <a href="https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.AdaBoostRegressor.html#sklearn.ensemble.AdaBoostRegressor" target="_blank">see the `sklearn` documentation.</a>
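To make the reweighting idea concrete, here is a minimal pure-Python sketch of one boosting round of AdaBoost.R2 with a linear loss, the algorithm the `AdaBoostRegressor` documentation says it implements. The function name `adaboost_r2_step` is hypothetical, and the sketch simplifies things (weights are assumed to already sum to 1, and there is no early-stopping bookkeeping), so treat it as an illustration rather than scikit-learn's code.

```python
import math

def adaboost_r2_step(weights, y_true, y_pred, learning_rate=1.0):
    """One AdaBoost.R2 reweighting round with linear loss (hypothetical helper).

    Returns (new_weights, estimator_weight). Assumes `weights` sums to 1.
    """
    # Absolute error of the current regressor on every sample.
    errors = [abs(p - t) for p, t in zip(y_pred, y_true)]
    max_err = max(errors)
    # Linear loss: scale errors into [0, 1] by the largest error.
    loss = [e / max_err for e in errors] if max_err > 0 else [0.0] * len(errors)
    # Weighted average loss of this regressor.
    avg_loss = sum(w * l for w, l in zip(weights, loss))
    if avg_loss <= 0:
        return list(weights), 1.0  # perfect fit: keep weights unchanged
    if avg_loss >= 0.5:
        raise ValueError("weak learner too weak: weighted average loss >= 0.5")
    beta = avg_loss / (1.0 - avg_loss)  # in (0, 1) because avg_loss < 0.5
    # Vote weight of this regressor in the final weighted-median prediction.
    estimator_weight = learning_rate * math.log(1.0 / beta)
    # Each weight is multiplied by beta ** (1 - loss): perfectly predicted
    # samples shrink by a factor of beta, the worst-predicted keep their weight.
    new_weights = [w * beta ** ((1.0 - l) * learning_rate) for w, l in zip(weights, loss)]
    total = sum(new_weights)
    return [w / total for w in new_weights], estimator_weight


# Four equally weighted samples; the regressor badly misses the last one.
weights, vote = adaboost_r2_step([0.25] * 4, [1, 2, 3, 4], [1, 2, 3, 8])
print(weights, vote)
```

In this toy round the three well-predicted samples drop to a weight of about 1/6 each while the missed sample rises to 1/2, so the next regressor concentrates on it; the regressor's vote weight is ln 3, about 1.10.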

In [0]:
from sklearn.ensemble import AdaBoostRegressor, RandomForestRegressor
from sklearn.metrics import mean_squared_error

seed = 42

rf = RandomForestRegressor(n_estimators=100, max_depth=4, random_state=seed)
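# `estimator` replaced the deprecated `base_estimator` in scikit-learn 1.2; use `base_estimator` on older versions
ada = AdaBoostRegressor(estimator=rf, n_estimators=400)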
ada.fit(pandasX, pandasy)

predictions = pandasX.copy()
predictions["prediction"] = ada.predict(pandasX)

mse = mean_squared_error(pandasy, predictions["prediction"])  # Training MSE: computed on the data the model was fit on, so it is optimistic

In [0]:
print(mse)

### Apply a model trained in `sklearn` across a Spark DataFrame

Perform the following steps to apply the AdaBoost model to a Spark DataFrame:<br><br>

1. Log the model `ada` and evaluation metric `mse` using `mlflow`
2. Create a Spark UDF from this logged model
3. Apply the UDF to `sparkDF` (you may have to drop columns)
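Conceptually, the UDF returned by `mlflow.pyfunc.spark_udf` hands the selected columns to the logged model's `predict` method one batch of rows at a time and returns one prediction per row. The pure-Python sketch below mimics that flow without Spark or MLflow; `batched_predict`, `toy_model`, and the batch size are hypothetical stand-ins, not MLflow internals. It also shows why `price` is dropped: the model should receive only the training features, in the training order.

```python
from typing import Callable, Dict, Iterator, List, Sequence

Row = Dict[str, float]

def batched_predict(rows: Sequence[Row], feature_cols: List[str],
                    predict_fn: Callable[[List[List[float]]], List[float]],
                    batch_size: int = 2) -> Iterator[float]:
    """Yield one prediction per row, calling predict_fn once per batch (hypothetical helper)."""
    for start in range(0, len(rows), batch_size):
        batch = rows[start:start + batch_size]
        # Build the feature matrix in the column order used for training;
        # the label column ("price") is simply not part of feature_cols.
        features = [[row[c] for c in feature_cols] for row in batch]
        yield from predict_fn(features)

def toy_model(features: List[List[float]]) -> List[float]:
    # Hypothetical stand-in for the trained regressor: 50 per bedroom + 20 per bathroom.
    return [50.0 * bedrooms + 20.0 * bathrooms for bedrooms, bathrooms in features]

# bedrooms, bathrooms and price taken from the first three rows of the dataset
rows = [
    {"bedrooms": 1.0, "bathrooms": 1.0, "price": 170.0},
    {"bedrooms": 2.0, "bathrooms": 1.0, "price": 235.0},
    {"bedrooms": 1.0, "bathrooms": 4.0, "price": 65.0},
]
feature_cols = [c for c in rows[0] if c != "price"]  # analogous to sparkDF.drop("price").columns
print(list(batched_predict(rows, feature_cols, toy_model)))
```

Batching amortizes the per-call overhead of `predict` across many rows, which is why a vectorized model like an `sklearn` regressor suits this kind of UDF.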

In [0]:
# ANSWER
import mlflow.sklearn

with mlflow.start_run(run_name="Final AdaBoost Model") as run:
  mlflow.sklearn.log_model(ada, "adaboost-model")
  mlflow.log_metric("Train data MSE", mse)
  
  run_info = run.info



In [0]:
# ANSWER
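import mlflow.pyfunc

# Wrap the logged model as a Spark UDF that returns one prediction (a double) per row
predict = mlflow.pyfunc.spark_udf(spark, run_info.artifact_uri + "/adaboost-model")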

2023/07/26 12:21:41 INFO mlflow.models.flavor_backend_registry: Selected backend for flavor 'python_function'


In [0]:
# ANSWER
predictionDF = (sparkDF
  .withColumn("prediction", predict(*sparkDF.drop("price").columns))
)

display(predictionDF)

host_total_listings_count,neighbourhood_cleansed,zipcode,latitude,longitude,property_type,room_type,accommodates,bathrooms,bedrooms,beds,bed_type,minimum_nights,number_of_reviews,review_scores_rating,review_scores_accuracy,review_scores_cleanliness,review_scores_checkin,review_scores_communication,review_scores_location,review_scores_value,price,prediction
1.0,0,0,37.769310377340766,-122.43385634489,0,0,3.0,1.0,1.0,2.0,0,1.0,127.0,97.0,10.0,10.0,10.0,10.0,10.0,10.0,170.0,185.3146423334348
2.0,1,1,37.745112331410034,-122.42101788836888,0,0,5.0,1.0,2.0,3.0,0,30.0,112.0,98.0,10.0,10.0,10.0,10.0,10.0,9.0,235.0,259.93110583884925
10.0,2,0,37.766689597862175,-122.45250461761628,0,1,2.0,4.0,1.0,1.0,0,32.0,17.0,85.0,8.0,8.0,9.0,9.0,9.0,8.0,65.0,119.74435827287392
4.0,3,2,37.73074592978503,-122.44840862635226,1,1,1.0,2.0,1.0,1.0,0,3.0,76.0,95.0,9.0,9.0,10.0,10.0,9.0,9.0,60.0,117.90281418587124
10.0,2,0,37.76487219421756,-122.45182799146508,1,1,2.0,4.0,1.0,1.0,0,32.0,7.0,91.0,9.0,9.0,9.0,9.0,9.0,9.0,65.0,125.75259032365923
2.0,0,0,37.77524858589268,-122.43637374831292,1,0,5.0,1.5,2.0,2.0,0,5.0,26.0,97.0,10.0,10.0,10.0,10.0,10.0,10.0,575.0,389.96882590902607
1.0,0,3,37.78470745496073,-122.44555431261594,0,0,7.0,1.0,2.0,1.0,0,2.0,27.0,88.0,9.0,7.0,10.0,10.0,9.0,9.0,255.0,316.455310339529
2.0,4,1,37.75918889708064,-122.42236687240562,0,1,3.0,1.0,1.0,2.0,0,1.0,559.0,98.0,10.0,10.0,10.0,10.0,10.0,9.0,139.0,126.67749673566634
1.0,4,1,37.75174004606522,-122.4094205953428,0,0,4.0,2.5,3.0,3.0,0,3.0,24.0,95.0,9.0,9.0,10.0,10.0,9.0,9.0,285.0,412.7334329619083
1.0,5,4,37.76258885144137,-122.40543055237004,1,1,2.0,1.0,1.0,1.0,0,1.0,386.0,93.0,9.0,9.0,10.0,10.0,9.0,9.0,135.0,125.64278640209062
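As a quick sanity check, the quantity behind `mse` (the average of the squared differences between `price` and `prediction`) can be computed by hand for the ten rows displayed above. This is only a spot check on a preview, so it will not equal the full-data `mse`; `mse_by_hand` is a hypothetical helper, not the `sklearn.metrics` function.

```python
def mse_by_hand(y_true, y_pred):
    """Mean of squared residuals, the quantity mean_squared_error reports (hypothetical helper)."""
    if len(y_true) != len(y_pred) or not y_true:
        raise ValueError("inputs must be non-empty and of equal length")
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)

# price and prediction columns of the ten rows displayed above
price = [170.0, 235.0, 65.0, 60.0, 65.0, 575.0, 255.0, 139.0, 285.0, 135.0]
prediction = [185.3146423334348, 259.93110583884925, 119.74435827287392,
              117.90281418587124, 125.75259032365923, 389.96882590902607,
              316.455310339529, 126.67749673566634, 412.7334329619083,
              125.64278640209062]
print(mse_by_hand(price, prediction))
```

Because errors are squared, the 575 listing (under-predicted by about 185) dominates this small sample.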


### Perform predictions on an incoming stream of data

Perform the following steps to apply the AdaBoost model to a stream of incoming data:<br><br>

1. Run the logic defined for you to create the schema and stream
2. Apply the UDF defined above to the stream
3. Write the results to a Delta table
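The stream in the next cell sets `maxFilesPerTrigger` to 1, so each micro-batch considers at most one new Parquet file from the source directory. The stdlib sketch below imitates that planning step; `plan_micro_batches` and the file names are hypothetical, and real Structured Streaming also tracks offsets and file metadata that this ignores.

```python
from typing import Iterable, Iterator, List, Set

def plan_micro_batches(listed_files: Iterable[str], already_seen: Set[str],
                       max_files_per_trigger: int = 1) -> Iterator[List[str]]:
    """Yield batches of at most max_files_per_trigger unprocessed files (hypothetical helper)."""
    # Sorted by name for simplicity; this ordering is an assumption of the sketch.
    pending = sorted(f for f in listed_files if f not in already_seen)
    for start in range(0, len(pending), max_files_per_trigger):
        batch = pending[start:start + max_files_per_trigger]
        yield batch                  # caller scores and writes this micro-batch
        already_seen.update(batch)   # only then is it marked as processed

seen: Set[str] = set()
files = ["part-00000.parquet", "part-00001.parquet", "part-00002.parquet"]
for batch_id, batch in enumerate(plan_micro_batches(files, seen)):
    print(batch_id, batch)
print(list(plan_micro_batches(files, seen)))  # nothing left to do
```

Throttling to one file per trigger is what makes the static Parquet directory behave like a slowly arriving stream in this exercise.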

In [0]:
from pyspark.sql.types import DoubleType, IntegerType, StructType

schema = (StructType()
  .add("host_total_listings_count", DoubleType())
  .add("neighbourhood_cleansed", IntegerType())
  .add("zipcode", IntegerType())
  .add("latitude", DoubleType())
  .add("longitude", DoubleType())
  .add("property_type", IntegerType())
  .add("room_type", IntegerType())
  .add("accommodates", DoubleType())
  .add("bathrooms", DoubleType())
  .add("bedrooms", DoubleType())
  .add("beds", DoubleType())
  .add("bed_type", IntegerType())
  .add("minimum_nights", DoubleType())
  .add("number_of_reviews", DoubleType())
  .add("review_scores_rating", DoubleType())
  .add("review_scores_accuracy", DoubleType())
  .add("review_scores_cleanliness", DoubleType())
  .add("review_scores_checkin", DoubleType())
  .add("review_scores_communication", DoubleType())
  .add("review_scores_location", DoubleType())
  .add("review_scores_value", DoubleType())
  .add("price", DoubleType())
)

spark.conf.set("spark.sql.shuffle.partitions", "8")
streamingData = (spark
  .readStream
  .schema(schema)
  .option("maxFilesPerTrigger", 1)
  .parquet("/mnt/training/airbnb/sf-listings/airbnb-cleaned-mlflow.parquet/")
)

In [0]:
# ANSWER
predictionsDF = streamingData.withColumn("prediction", predict(*streamingData.drop("price").columns))

# Preview our stream - you must stop the stream before resuming
display(predictionsDF, streamName = myStreamName)

---------------------------------------------------------------------------
IllegalArgumentException                  Traceback (most recent call last)
File <command-3761410829409792>:5
      2 predictionsDF = streamingData.withColumn("prediction", predict(*streamingData.drop("price").columns))
      4 # Preview our stream - you must stop the stream before resuming
----> 5 display(predictionsDF, streamName = myStreamName)

File /databricks/python_shell/dbruntime/display.py:76, in Display.display(self, input, *args, **kwargs)
     74 elif isinstance(input, DataFrame):
     75     if

In [0]:
# Wait until the stream is ready for processing
untilStreamIsReady(myStreamName)

The stream is active and ready.


In [0]:
# Stop the stream once you are done previewing it
# and before moving on to the next step...
stopAllStreams()

Stopping capstone_pi


In [0]:
# Delete any files that might exist from previous runs
dbutils.fs.rm(workingDir + "/ml-deployment-capstone", True)

# Write the results to a Delta table at this location
writePath = workingDir + "/ml-deployment-capstone/predictions"

# Store the stream's checkpoint (progress metadata) at this location
checkpointLocation = workingDir + "/ml-deployment-capstone/stream.checkpoint"

In [0]:
workingDir

Out[17]: 'dbfs:/user/laurent.raffalli@laposte.net/ml_deployment/09_capstone_project'
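The `checkpointLocation` defined above is what lets a restarted query pick up where it left off instead of reprocessing every file. The sketch below imitates that idea with a small JSON file; `load_checkpoint`, `save_checkpoint`, `run_query`, and the JSON layout are hypothetical simplifications, since Spark's real checkpoint directory holds offset and commit logs in its own format.

```python
import json
import os
import tempfile

def load_checkpoint(path):
    """Return the set of already-processed files recorded at `path` (hypothetical helper)."""
    if not os.path.exists(path):
        return set()
    with open(path) as f:
        return set(json.load(f)["processed"])

def save_checkpoint(path, processed):
    # Write to a temp file first, then rename, so a crash never leaves a half-written file.
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        json.dump({"processed": sorted(processed)}, f)
    os.replace(tmp, path)

def run_query(source_files, checkpoint_path, sink):
    """Append results only for files not yet recorded in the checkpoint (hypothetical helper)."""
    processed = load_checkpoint(checkpoint_path)
    for name in sorted(source_files):
        if name in processed:
            continue
        sink.append(f"predictions for {name}")  # stand-in for scoring and writing
        processed.add(name)
        save_checkpoint(checkpoint_path, processed)

with tempfile.TemporaryDirectory() as d:
    ckpt = os.path.join(d, "stream.checkpoint.json")
    sink = []
    run_query(["part-0", "part-1"], ckpt, sink)            # first run
    run_query(["part-0", "part-1", "part-2"], ckpt, sink)  # restart with one new file
    print(sink)
```

This toy version could re-append a file if it crashed between writing and saving the checkpoint; Spark's Delta sink combines the checkpoint with the Delta transaction log to avoid such duplicates. Deleting the old output and checkpoint, as the cell above does, is what forces a clean rerun.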

In [0]:
# ANSWER
(predictionsDF
  .writeStream                                      # Write the stream
  .queryName(myStreamName)                          # Name the stream
  .format("delta")                                  # Use the delta format
  .partitionBy("zipcode")                           # Specify a feature to partition on
  .option("checkpointLocation", checkpointLocation) # Specify where to log metadata
  .option("path", writePath)                        # Specify the output path
  .outputMode("append")                             # Append new records to the output path
  .start()                                          # Start the operation
)

Out[18]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f2e4c6f4760>
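`.partitionBy("zipcode")` makes the writer place rows in one subdirectory per distinct `zipcode` value, named in the Hive style `zipcode=<value>`. The sketch below groups a few rows the same way; `partition_paths` is a hypothetical helper, the prediction values are rounded from the first rows displayed earlier, and the real writer also produces Parquet part files and Delta log entries.

```python
from collections import defaultdict
from typing import Dict, List

def partition_paths(rows: List[dict], base: str, column: str) -> Dict[str, List[dict]]:
    """Group rows by the directory a Hive-style partitioned write would use (hypothetical helper)."""
    groups: Dict[str, List[dict]] = defaultdict(list)
    for row in rows:
        # The partition value is encoded in the path, so it is dropped from the row here.
        values = {k: v for k, v in row.items() if k != column}
        groups[f"{base}/{column}={row[column]}"].append(values)
    return dict(groups)

rows = [
    {"zipcode": 0, "price": 170.0, "prediction": 185.3},
    {"zipcode": 1, "price": 235.0, "prediction": 259.9},
    {"zipcode": 0, "price": 65.0, "prediction": 119.7},
]
for path, part in partition_paths(rows, "predictions", "zipcode").items():
    print(path, part)
```

Partitioning tends to pay off for a column with a modest number of distinct values that queries filter on; a high-cardinality column would scatter the stream's output into many small files.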

In [0]:
# Wait until the stream is ready for processing
untilStreamIsReady(myStreamName)

Check whether the stream has written any output yet.

In [0]:
from pyspark.sql.utils import AnalysisException

try:
  print(spark.read.format("delta").load(writePath).count())
except AnalysisException:
  print("Files not found.  This could be because the stream hasn't initialized.  Try again in a moment.")

Files not found.  This could be because the stream hasn't initialized.  Try again in a moment.
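Rather than re-running the check by hand until the stream has written its first batch, the read can be retried with a timeout. `wait_for` below is a hypothetical stdlib helper, not part of the classroom setup, and `flaky_count` is a toy stand-in for the Delta read.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")

def wait_for(check: Callable[[], T], retry_on: Tuple[Type[BaseException], ...],
             timeout_s: float = 60.0, interval_s: float = 5.0) -> T:
    """Call `check` until it succeeds or `timeout_s` elapses (hypothetical helper)."""
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            return check()
        except retry_on:
            if time.monotonic() >= deadline:
                raise  # give up and surface the last error
            time.sleep(interval_s)

# Toy check that fails twice before succeeding, standing in for the Delta read.
attempts = {"n": 0}
def flaky_count() -> int:
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise FileNotFoundError("no files yet")
    return 42

print(wait_for(flaky_count, (FileNotFoundError,), timeout_s=1.0, interval_s=0.01))
```

In the notebook, the call might look like `wait_for(lambda: spark.read.format("delta").load(writePath).count(), (AnalysisException,))`, retrying on the same exception the cell above catches.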


In [0]:
display(spark.read.format("delta").load(writePath))

host_total_listings_count,neighbourhood_cleansed,zipcode,latitude,longitude,property_type,room_type,accommodates,bathrooms,bedrooms,beds,bed_type,minimum_nights,number_of_reviews,review_scores_rating,review_scores_accuracy,review_scores_cleanliness,review_scores_checkin,review_scores_communication,review_scores_location,review_scores_value,price,prediction
3.0,8,1,37.747362709305264,-122.42489529846124,6,1,2.0,1.5,1.0,1.0,0,1.0,24.0,100.0,10.0,10.0,10.0,10.0,10.0,10.0,125.0,124.57084245655116
1.0,1,1,37.73330121656639,-122.41979720416168,1,0,6.0,2.0,3.0,3.0,0,3.0,3.0,100.0,10.0,10.0,10.0,10.0,9.0,10.0,357.0,399.18438907341806
1.0,4,1,37.75013920397615,-122.40964224773604,1,1,2.0,1.0,1.0,1.0,0,5.0,5.0,100.0,10.0,10.0,10.0,10.0,9.0,10.0,59.0,122.886043988619
2.0,1,1,37.7453003422857,-122.42133197701132,6,0,4.0,1.0,2.0,2.0,0,3.0,2.0,100.0,10.0,10.0,10.0,10.0,10.0,9.0,250.0,321.42086693936955
1.0,4,1,37.7546554335168,-122.41197151919526,1,0,8.0,1.0,4.0,4.0,0,3.0,28.0,99.0,10.0,10.0,10.0,10.0,10.0,10.0,445.0,393.6415648157287
1.0,4,1,37.75488403376368,-122.40769956788748,13,0,2.0,1.0,0.0,1.0,0,3.0,80.0,95.0,10.0,10.0,10.0,10.0,10.0,9.0,125.0,185.36494409302435
4.0,1,1,37.74181683002036,-122.40895657437218,1,1,2.0,2.0,1.0,2.0,3,30.0,4.0,100.0,10.0,10.0,10.0,10.0,10.0,10.0,120.0,123.92075432219472
1.0,4,1,37.76221601240075,-122.42332215339168,0,0,4.0,1.0,1.0,3.0,0,2.0,84.0,97.0,10.0,10.0,10.0,10.0,10.0,10.0,240.0,243.1750587200811
1.0,4,1,37.7614820537443,-122.40925392305104,0,0,2.0,1.0,1.0,1.0,0,1.0,39.0,98.0,10.0,10.0,10.0,10.0,10.0,9.0,200.0,185.3146423334348
3.0,4,1,37.76171886662949,-122.42348307335553,0,1,2.0,1.0,1.0,1.0,0,1.0,31.0,99.0,10.0,10.0,10.0,10.0,10.0,10.0,155.0,126.67749673566634


Stop the streams.

In [0]:
for q in spark.streams.active:
  q.stop()

<img alt="Caution" title="Caution" style="vertical-align: text-bottom; position: relative; height:1.3em; top:0.0em" src="https://files.training.databricks.com/static/images/icon-warning.svg"/> **Please be sure to delete any infrastructure you build after the course so you don't incur unexpected expenses.**

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> All done!</h2>

Thank you for your participation!

&copy; 2021 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="http://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="http://help.databricks.com/">Support</a>