
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px; height: 163px">
</div>

# Lab: Post-Processing on a Data Stream

## ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) In this lab you:<br>
 - Create a data stream and train a random forest model
 - Define post-processing logic
 - Apply logic to a data stream
 - Write a DataFrame to a scalable Delta format

In [3]:
%run "./../Includes/Classroom-Setup"

## Creating Data Stream and Training Model

Import the same Airbnb dataset.

In [5]:
airbnbDF = spark.read.parquet("/mnt/training/airbnb/sf-listings/airbnb-cleaned-mlflow.parquet/")

display(airbnbDF)

Create a data stream to generate predictions on. Fill in the schema field with the `airbnbDF` schema; by default, streaming file sources require the schema to be specified up front.

In [7]:
spark.conf.set("spark.sql.shuffle.partitions", "8")

In [8]:
# ANSWER
streamingData = (spark
                 .readStream
                 .schema(airbnbDF.schema)
                 .option("maxFilesPerTrigger", 1)
                 .parquet("/mnt/training/airbnb/sf-listings/airbnb-cleaned-mlflow.parquet/")
                 .drop("price"))

Run the following cell to train a random forest model `rf` for making price predictions.

In [10]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor

df = airbnbDF.toPandas()
X_train, X_test, y_train, y_test = train_test_split(df.drop(["price"], axis=1), df[["price"]].values.ravel(), random_state=42)

# Instantiate a new random forest model
rf = RandomForestRegressor(n_estimators=100, max_depth=25)

# Fit the model on the training split
rf.fit(X_train, y_train)

## Define Post-Processing Logic

When processing our data stream, we are interested in seeing, for each data point, whether the predicted price is "High", "Medium", or "Low". To accomplish this, we are going to define a model class which will apply the desired post-processing step to our random forest `rf`'s results with a `.predict()` call.

Complete the `postprocess_result()` function to change the predicted value from a number to one of 3 categorical labels, "High", "Medium", or "Low". Then finish the line in `predict()` to return the desired output.

<img alt="Side Note" title="Side Note" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.05em; transform:rotate(15deg)" src="https://files.training.databricks.com/static/images/icon-note.webp"/> This can be done in pure Python or, for a more performant solution, using broadcasting on a `pandas` series or DataFrame.
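As a rough illustration of the vectorized approach mentioned in the note, the sketch below labels a whole array of predictions at once with NumPy's `np.select`. The function name `postprocess_vectorized` is hypothetical and not part of the lab; the thresholds (120 and 70) are the ones used in this lab's `postprocess_result()`.

```python
import numpy as np

def postprocess_vectorized(results):
    """Map predicted prices to "High" / "Medium" / "Low" without a Python loop.

    Hypothetical helper for illustration; thresholds follow the lab's definition.
    """
    prices = np.asarray(results, dtype=float)
    # Conditions are evaluated in order; the first one that matches wins,
    # so a price >= 120 is labeled "High" even though it is also >= 70.
    labels = np.select(
        [prices >= 120, prices >= 70],
        ["High", "Medium"],
        default="Low",
    )
    # Return plain Python strings, like the loop-based version does
    return labels.tolist()

example = postprocess_vectorized([150.0, 120.0, 119.99, 70.0, 69.99])
print(example)
```

Because `np.asarray` accepts lists, NumPy arrays, and `pandas` Series alike, a function like this could be called directly on the output of `rf.predict()`.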

In [13]:
# ANSWER
import mlflow
from mlflow.pyfunc import PythonModel

# Define the model class
class streaming_model(PythonModel):

    def __init__(self, trained_rf):
        self.rf = trained_rf

    def postprocess_result(self, results):
        """Return post-processed results.

        High: predicted price >= 120
        Medium: predicted price < 120 and >= 70
        Low: predicted price < 70
        """
        output = []
        for result in results:
            if result >= 120:
                output.append("High")
            elif result >= 70:
                output.append("Medium")
            else:
                output.append("Low")
        return output
    
    def predict(self, context, model_input):
        results = self.rf.predict(model_input)
        return self.postprocess_result(results)

Run the following cell to create and save your model at `model_path`.

In [15]:
# Construct and save the model
model_path = userhome + "/ml-production/07_streaming_model/"
dbutils.fs.rm(model_path.replace("/dbfs", ""), True) # remove folder if already exists

model = streaming_model(trained_rf = rf)
mlflow.pyfunc.save_model(dst_path=model_path, python_model=model)

The next cell will test your `streaming_model`'s `.predict()` function on fixed data `X_test` (not a data stream). You should see a list of price labels output underneath the cell.

In [17]:
# Load the model in `python_function` format
loaded_model = mlflow.pyfunc.load_model(model_path)  # load_pyfunc is deprecated in favor of load_model

# Apply the model
loaded_model.predict(X_test)
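Rather than checking the output by eye, one option is to summarize it. The sketch below counts how often each label appears and raises an error if anything other than the three expected labels shows up. The names `summarize_labels` and `VALID_LABELS` are hypothetical helpers, not part of the lab or of MLflow.

```python
from collections import Counter

# The three labels the post-processing step is expected to emit
VALID_LABELS = {"High", "Medium", "Low"}

def summarize_labels(predictions):
    """Count each label and fail loudly on anything outside VALID_LABELS."""
    counts = Counter(predictions)
    unexpected = set(counts) - VALID_LABELS
    if unexpected:
        raise ValueError(f"Unexpected labels: {sorted(unexpected)}")
    return dict(counts)

# Stand-in predictions; in the notebook you could pass loaded_model.predict(X_test)
print(summarize_labels(["High", "Low", "Medium", "High"]))
```

A heavily skewed count (for example, almost everything labeled "High") would be a hint to revisit the thresholds or the model rather than proof of a bug.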

## Apply Post-Processing Step to Data Stream

Finally, after confirming that your model works properly, apply it in parallel on all rows of `streamingData`.

In [20]:
# ANSWER
import mlflow.pyfunc

# Load the model in as a spark UDF
pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_path, result_type="string")

# Apply UDF to data stream
predictionsDF = streamingData.withColumn("prediction", pyfunc_udf(*streamingData.columns))

display(predictionsDF.select("prediction"))

## Write DataFrame to Delta

Now continuously write `predictionsDF` to a Delta table as the model produces new predictions.

In [23]:
checkpointLocation = userhome + "/academy/stream.checkpoint"
writePath = userhome + "/academy/predictions"

(predictionsDF
  .writeStream                                           # Write the stream
  .format("delta")                                       # Use the delta format
  .partitionBy("zipcode")                                # Specify a feature to partition on
  .option("checkpointLocation", checkpointLocation)      # Specify where to log metadata
  .option("path", writePath)                             # Specify the output path
  .outputMode("append")                                  # Append new records to the output path
  .start()                                               # Start the operation
)

Check that your predictions are indeed being written out to `writePath`.

In [25]:
dbutils.fs.ls(writePath)

Run the following cell to terminate all active streams.

In [27]:
# stop streams
for q in spark.streams.active:
    q.stop()


&copy; 2019 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="http://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="http://help.databricks.com/">Support</a>