In [1]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [2]:
import mlflow
import mlflow.spark

Loading the dataset

In [3]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext.getOrCreate()
sc.stop()
sc = SparkContext('local')
spark = SparkSession(sc)



In [4]:
file_path = "../../datasets/data_sf-airbnb/sf-airbnb-clean.parquet/"

In [5]:
airbnb_df = spark.read.parquet(file_path)
(train_df, test_df) = airbnb_df.randomSplit([0.8, 0.2], seed=42)

In [6]:
categorical_cols = [field for (field, dataType) in train_df.dtypes if dataType == "string"]

index_output_cols = [x + "Index" for x in categorical_cols]

string_indexer = StringIndexer(inputCols=categorical_cols,
                              outputCols=index_output_cols,
                              handleInvalid="skip")

numeric_cols = [field for (field, dataType) in train_df.dtypes
               if dataType == "double" and field != "price"]

assembler_inputs = index_output_cols + numeric_cols

vec_assembler = VectorAssembler(inputCols=assembler_inputs,
                               outputCol="features")

rf = RandomForestRegressor(labelCol="price", maxBins=40, maxDepth=5,
                          numTrees=100, seed=42)

pipeline = Pipeline(stages=[string_indexer, vec_assembler, rf])

**To start logging with MLflow**, we start a run using mlflow.start_run(). We don't need to call mlflow.end_run() explicitly: the **with** clause ends the run automatically when its block exits.
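
To make the automatic end of the run concrete, here is a minimal plain-Python sketch of the context-manager mechanism. ToyRun is a hypothetical stand-in, not MLflow's actual class; it only illustrates that the exit hook runs whenever the block ends, even if the body raises.

```python
class ToyRun:
    """Hypothetical stand-in for a tracked run (not part of MLflow)."""

    def __init__(self, name):
        self.name = name
        self.status = "CREATED"

    def __enter__(self):
        # Entering the with block marks the run as active.
        self.status = "RUNNING"
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Called automatically when the block exits, with or without an error.
        self.status = "FINISHED" if exc_type is None else "FAILED"
        return False  # do not swallow exceptions


with ToyRun("random-forest") as ok_run:
    pass  # training and logging would happen here

try:
    with ToyRun("broken-run") as bad_run:
        raise ValueError("simulated training failure")
except ValueError:
    pass

print(ok_run.status, bad_run.status)  # prints: FINISHED FAILED
```

No explicit "end" call appears anywhere in the sketch, which is the same reason the real example needs no mlflow.end_run().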

In the example below, **pipeline** is the Spark Pipeline we created above.

In [7]:
import pandas as pd

with mlflow.start_run(run_name="random-forest") as run:
    # log params: num_trees and max_depth
    mlflow.log_param("num_trees", rf.getNumTrees())
    mlflow.log_param("max_depth", rf.getMaxDepth())
    
    # Fit the pipeline and log the fitted model
    pipeline_model = pipeline.fit(train_df)
    mlflow.spark.log_model(pipeline_model, "model")
    
    # Log metrics: RMSE and R2
    # First generate predictions on the test set
    pred_df = pipeline_model.transform(test_df)
    
    # create a regression evaluator R2 and RMSE
    regression_evaluator = RegressionEvaluator(predictionCol="prediction",
                                              labelCol="price")
    rmse = regression_evaluator.setMetricName("rmse").evaluate(pred_df)
    r2 = regression_evaluator.setMetricName("r2").evaluate(pred_df)
    # Log both metrics to MLflow in a single call
    mlflow.log_metrics({"rmse": rmse, "r2": r2})
    
    # Log artefact: feature importance scores
    rf_model = pipeline_model.stages[-1]
    pandas_df = (pd.DataFrame(list(zip(vec_assembler.getInputCols(), 
                                    rf_model.featureImportances)), 
                           columns=["feature", "importance"])
              .sort_values(by="importance", ascending=False))
    
    # First write to local filesystem then tell MLFlow where to 
    # find that file
    pandas_df.to_csv("feature-importance.csv", index=False)
    mlflow.log_artifact("feature-importance.csv")

Now we can examine the MLflow UI, which can be accessed by running the **mlflow ui** command in the terminal and navigating to **localhost:5000**.

The UI stores all the runs for a given experiment. **You can search across all the runs, filter for those that meet particular criteria, compare runs side by side, etc. If you wish, you can also export the contents as a CSV file to analyze locally.** Click on the run in the UI named "random-forest".<br>
You’ll notice that it **keeps track of the source code used for this MLflow run, as well as storing all the corresponding parameters, metrics, etc. You can add notes about this run in free text, as well as tags. You cannot modify the parameters or metrics after the run has finished.**

**You can also query the tracking server using the MlflowClient or REST API:**

In [8]:
from mlflow.tracking import MlflowClient

client = MlflowClient()
runs = client.search_runs(run.info.experiment_id,
                         order_by=["attributes.start_time desc"],
                         max_results=1)
run_id = runs[0].info.run_id
runs[0].data.metrics

{'r2': 0.22794251914574226, 'rmse': 211.5096898777315}
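
To help interpret these two numbers, here is a minimal plain-Python sketch of how RMSE and R2 are defined. The four prices and predictions are made-up illustration values, not rows from the Airbnb dataset. RMSE is expressed in the units of the label (dollars here), while R2 is the fraction of the label's variance that the predictions explain.

```python
import math


def rmse(labels, predictions):
    # Root of the mean squared difference between label and prediction.
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


def r2(labels, predictions):
    # 1 - (residual sum of squares / total sum of squares around the mean).
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot


# Hypothetical prices and predictions, for illustration only.
prices = [100.0, 150.0, 200.0, 250.0]
preds = [110.0, 140.0, 210.0, 240.0]

print(rmse(prices, preds), r2(prices, preds))
```

Read this way, an R2 of about 0.23 means the model explains roughly a quarter of the variance in price, and an RMSE of about 211 means typical errors are on the order of two hundred dollars.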

##### ROLE OF YAML FILE

**NOTE** We have a YAML file that defines which libraries are required to run the code, so that it can be seamlessly deployed on another machine or environment.

Below is the conda.yaml file for running this MLflow model:

name: mlflow-project-example
channels:
  - conda-forge
dependencies:
  - python=3.7
  - pandas=0.24
  - pip=19.0.3
  - pip:
    - mlflow==1.8
    - pyspark==3.0.0

Given a project with the proper folder structure and code, we can run it as shown below:

**The statement below runs as is, but we don't need to run it right now.**

**mlflow.run(
  "https://github.com/databricks/LearningSparkV2/#mlflow-project-example", 
  parameters={"max_depth": 5, "num_trees": 100})**

**We can run MLflow from the command line as well** by issuing the following command:<br><br>
**mlflow run https://github.com/databricks/LearningSparkV2/#mlflow-project-example
-P max_depth=5 -P num_trees=100**
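
For the max_depth and num_trees parameters to reach the training code, the project's entry-point script has to accept them. Below is a minimal sketch of that parsing step, assuming a hypothetical script that takes --max_depth and --num_trees flags; the actual repository may wire its parameters differently.

```python
import argparse


def parse_params(argv=None):
    # Hypothetical entry-point arguments; the flag names are assumptions.
    parser = argparse.ArgumentParser(description="Hypothetical training entry point")
    parser.add_argument("--max_depth", type=int, default=5)
    parser.add_argument("--num_trees", type=int, default=100)
    return parser.parse_args(argv)


# Simulate a command line that overrides both defaults.
params = parse_params(["--max_depth", "7", "--num_trees", "50"])
print(params.max_depth, params.num_trees)
```

The parsed values would then be passed to the RandomForestRegressor and logged with mlflow.log_param, as in the notebook cell earlier.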

## Model Deployment Option With MLlib

Machine learning model deployment means something different for every organization and use case. Business constraints impose different requirements for latency, throughput, cost, etc.
The deployment options for generating predictions trade off throughput against latency. We care about both the number of concurrent requests and the size of those requests.

| | Throughput | Latency | Example application |
|---|---|---|---|
| Batch | High | High (hours to days) | Customer churn prediction |
| Streaming | Medium | Medium (seconds to minutes) | Dynamic pricing |
| Real-time | Low | Low (milliseconds) | Online ad bidding |
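
The trade-off can be illustrated with a small back-of-the-envelope model. This is a sketch under made-up assumptions: every prediction job pays a fixed overhead (2 seconds here) plus a per-record cost (1 millisecond here). Neither number is measured from Spark; they only show the shape of the curve.

```python
def batch_stats(batch_size, overhead_s=2.0, per_record_s=0.001):
    # Time until the whole batch (and therefore any single result) is ready.
    latency_s = overhead_s + batch_size * per_record_s
    # Records processed per second once the fixed overhead is amortized.
    throughput = batch_size / latency_s
    return latency_s, throughput


for size in (1, 1_000, 1_000_000):
    latency_s, throughput = batch_stats(size)
    print(f"batch={size:>9,}  latency={latency_s:10.3f} s  throughput={throughput:8.1f} rec/s")
```

Larger batches amortize the fixed overhead, so throughput rises, but every record has to wait for the whole batch, so latency rises with it.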

**Batch processing**<br>
- Batch processing generates predictions on a regular schedule and writes the result out to persistent storage to be served elsewhere. 
- Cheapest and easiest deployment option, as we only pay for compute during the scheduled run.
- Much more efficient per data point.
- **DRAWBACK** Its main drawback is latency, as it is typically scheduled with a period of hours or days to generate the next batch of predictions.

**Streaming processing**
- Streaming provides a nice trade-off between throughput and latency.
- You will continuously make predictions on micro-batches of data and get your predictions in seconds to minutes.
- If you are using Structured Streaming, almost all of your code will look identical to the batch use case, making it easy to go back and forth between these two options. 
- **DRAWBACK** With streaming, you will have to pay for the VMs or computing resources you use to continually stay up and running, and ensure that you have configured the stream properly to be fault tolerant and provide buffering if there are spikes in the incoming data.

**Real-time processing**
- Real-time deployment prioritizes latency over throughput and generates predictions in a few milliseconds. 
- Your **infrastructure will need to support load balancing** and be able to scale to many concurrent requests if there is a large spike in demand (e.g., for online retailers around the holidays).
- **Sometimes when people say “real-time deployment” they mean extracting precomputed predictions in real time, but here we’re referring to generating model predictions in real time.**
- **Real-time deployment is the only option that Spark cannot meet the latency requirements for, so to use it you will need to export your model outside of Spark.** 
- **DRAWBACK of Spark** For example, if you intend to use a REST endpoint for real-time model inference (say, computing predictions in under 50 ms), **MLlib does not meet the latency requirements necessary for this application**. You will need to get your feature preparation and model out of Spark, which can be time-consuming and difficult.

Before you begin the modeling process, you need to define your model deployment requirements. MLlib and Spark are just a few tools in your toolbox, and you need to understand when and where they should be applied. The remainder of this section discusses the deployment options for MLlib in more depth, and then we’ll consider the deployment options with Spark for non-MLlib models.

### BATCH DEPLOYMENT
- Handles the majority of use cases and is the easiest option.
- We run a regular job to generate predictions and save the results to a table, database, data lake, etc., for downstream consumption.
- We already generated batch predictions earlier by calling .transform().

In [9]:
import mlflow.spark
pipeline_model = mlflow.spark.load_model(f"runs:/{run_id}/model")

# Generate predictions
input_df = spark.read.parquet("../../datasets/data_sf-airbnb/sf-airbnb-clean.parquet/")
pred_df = pipeline_model.transform(input_df)

2020/07/16 05:22:42 INFO mlflow.spark: 'runs:/5ae37bc3587241e39835bd937f011264/model' resolved as 'file:///home/wajeeh-machine/projects/data_science_portfolio/machine_learning_pipeline/notebooks/mlruns/0/5ae37bc3587241e39835bd937f011264/artifacts/model'
2020/07/16 05:22:42 INFO mlflow.spark: File 'file:///home/wajeeh-machine/projects/data_science_portfolio/machine_learning_pipeline/notebooks/mlruns/0/5ae37bc3587241e39835bd937f011264/artifacts/model/sparkml' is already on DFS, copy is not necessary.


**Important for batch deployment**<br>
- How frequently will you generate predictions?
    - There is a trade-off between latency and throughput. 
    - You will get higher throughput batching many predictions together, but then the time it takes to receive any individual predictions will be much longer, delaying your ability to act on these predictions.
- How often will you retrain the model?
    - Unlike libraries like sklearn or TensorFlow, MLlib does not support online updates or warm starts. **If you’d like to retrain your model to incorporate the latest data, you’ll have to retrain the entire model from scratch, rather than getting to leverage the existing parameters.**
    - **In terms of the frequency of retraining, some people will set up a regular job to retrain the model (e.g., once a month), while others will actively monitor the model drift to identify when they need to retrain.**
- How will you version the model?
    - You can use the **MLflow Model Registry** to keep track of the models you are using and **control how they are transitioned to/from staging, production, and archived.** You can use the Model Registry with the other deployment options too.
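
As an illustration of the drift-monitoring approach to retraining mentioned above, here is a minimal sketch of one possible rule: retrain when the average RMSE on recent data exceeds the RMSE recorded at training time by more than a tolerance. The function name, the 10% tolerance, and the recent RMSE values are all hypothetical; real drift monitoring often also tracks changes in the input distributions.

```python
def needs_retraining(baseline_rmse, recent_rmses, tolerance=0.10):
    # Compare the average recent error against the baseline plus a margin.
    recent_avg = sum(recent_rmses) / len(recent_rmses)
    return recent_avg > baseline_rmse * (1.0 + tolerance)


baseline = 211.5  # RMSE at training time, rounded from the run logged earlier

# Hypothetical weekly RMSE measurements on newly labeled data.
print(needs_retraining(baseline, [215.0, 220.0, 218.0]))  # within tolerance
print(needs_retraining(baseline, [240.0, 250.0, 245.0]))  # degraded
```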

In addition to using the MLflow UI to manage your models, you can also manage them programmatically. For example, once you have registered your production model, it has a consistent URI that you can use to retrieve the latest version:

In [11]:
model_name="random-forest"

In [12]:
model_production_uri = f"models:/{model_name}/production"
model_production = mlflow.spark.load_model(model_production_uri)

MlflowException: Model Registry features are not supported by the store with URI: 'file:///home/wajeeh-machine/projects/data_science_portfolio/machine_learning_pipeline/notebooks/mlruns'. Stores with the following URI schemes are supported: ['databricks', 'http', 'https', 'postgresql', 'mysql', 'sqlite', 'mssql'].
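
The exception above means that the default file-based store (the local mlruns directory) cannot back the Model Registry; the tracking URI must use one of the listed schemes. The sketch below is a hypothetical helper, not an MLflow API, that checks a URI against the schemes copied from that error message.

```python
from urllib.parse import urlparse

# Schemes copied from the MlflowException message above.
REGISTRY_SCHEMES = {"databricks", "http", "https", "postgresql",
                    "mysql", "sqlite", "mssql"}


def supports_model_registry(tracking_uri):
    # A bare value such as "databricks" has no scheme part, so fall back to it.
    scheme = urlparse(tracking_uri).scheme or tracking_uri
    # SQLAlchemy-style URIs may carry a driver suffix, e.g. "postgresql+psycopg2".
    return scheme.split("+")[0] in REGISTRY_SCHEMES


print(supports_model_registry("file:///home/user/mlruns"))  # path is illustrative
print(supports_model_registry("sqlite:///mlflow.db"))       # path is illustrative
```

In practice this means pointing MLflow at a database-backed store or a tracking server before registering a model under the name used in the models:/ URI.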

## STREAMING

- Instead of waiting for an hourly or nightly job to process your data and generate predictions, Structured Streaming can continuously perform inference on incoming data.
- While this approach is more costly than a batch solution as you have to continually pay for compute time (and get lower throughput), you get the added benefit of generating predictions more frequently so you can act on them sooner.
- Streaming solutions in general are more complicated to maintain and monitor than batch solutions, but they offer lower latency.

**In Spark, we only need to use spark.readStream instead of spark.read to convert batch predictions to streaming predictions.**
We must define a schema even when reading Parquet files, because streaming sources require the schema to be specified up front. <br>
We will use the random forest model saved in the previous example and **load it using MLflow.** The source file is expected to be partitioned into one hundred small Parquet files so you can see the output changing at every trigger interval:

In [13]:
# Load the saved model with MLflow
pipeline_model = mlflow.spark.load_model(f"runs:/{run_id}/model")

2020/07/16 05:55:53 INFO mlflow.spark: 'runs:/5ae37bc3587241e39835bd937f011264/model' resolved as 'file:///home/wajeeh-machine/projects/data_science_portfolio/machine_learning_pipeline/notebooks/mlruns/0/5ae37bc3587241e39835bd937f011264/artifacts/model'
2020/07/16 05:55:53 INFO mlflow.spark: File 'file:///home/wajeeh-machine/projects/data_science_portfolio/machine_learning_pipeline/notebooks/mlruns/0/5ae37bc3587241e39835bd937f011264/artifacts/model/sparkml' is already on DFS, copy is not necessary.


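Next, we need a copy of the dataset written as 100 small Parquet partitions in a local directory to simulate streaming data (for example, by reading the original file, calling repartition(100), and writing the result).

That 100-partition copy does not exist yet, so the code cell below fails with an AnalysisException; we will revisit it later.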

In [15]:
# setup simulated streaming data
repartitioned_path = "../../datasets/data_sf-airbnb/sf-airbnb-clean-100p.parquet"
schema = spark.read.parquet(repartitioned_path).schema

streaming_data = (spark
                 .readStream
                 .schema(schema) # Can set the schema this way
                 .option("maxFilesPerTrigger", 1)
                 .parquet(repartitioned_path))

# Generate predictions
streamed_pred = pipeline_model.transform(streaming_data)

AnalysisException: Path does not exist: file:/home/wajeeh-machine/projects/data_science_portfolio/datasets/data_sf-airbnb/sf-airbnb-clean-100p.parquet;
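
Once the partitioned data exists, the maxFilesPerTrigger option caps how many new files each micro-batch picks up. The following plain-Python analogy (not Spark code, and not how Spark is implemented internally) shows the effect on a hypothetical list of five part files.

```python
def micro_batches(files, max_files_per_trigger=1):
    # Hand out at most max_files_per_trigger files per simulated trigger.
    for start in range(0, len(files), max_files_per_trigger):
        yield files[start:start + max_files_per_trigger]


# Hypothetical file names standing in for the Parquet part files.
files = [f"part-{i:05d}.parquet" for i in range(5)]

for trigger, batch in enumerate(micro_batches(files)):
    print(f"trigger {trigger}: {batch}")
```

With a cap of 1 and one hundred part files, the stream would produce one hundred small micro-batches, which is why the output changes at every trigger interval.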

### Model Export Patterns For Real-Time Inference

- There are some domains where real-time inference is required, including fraud detection, ad recommendation, and the like. While making predictions with a small number of records may achieve the low latency required for real-time inference, **you will need to contend with load balancing (handling many concurrent requests) as well as geolocation in latency-critical tasks**. 
- **There are popular managed solutions, such as AWS SageMaker and Azure ML, that provide low-latency model serving solutions.** In this section we’ll show you how to export your MLlib models so they can be deployed to those services.
- **One way to export your model out of Spark is to reimplement the model natively in Python, C, etc.** While it may seem simple to extract the coefficients of the model, exporting all the feature engineering and preprocessing steps along with them (OneHotEncoder, VectorAssembler, etc.) **quickly gets troublesome and is very error-prone.** 
- *There are a few open source libraries, such as MLeap and ONNX, that can help you automatically export a supported subset of the MLlib models to remove their dependency on Spark. However, as of the time of this writing the company that developed MLeap is no longer supporting it. Nor does MLeap yet support Scala 2.12/Spark 3.0.*
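
To illustrate why native reimplementation is error-prone, here is a minimal plain-Python sketch of serving a linear model outside Spark. All values are hypothetical: the coefficients, the intercept, and the category-to-index mapping stand in for what you would have to extract from a fitted pipeline.

```python
# Hypothetical values standing in for parameters extracted from a fitted pipeline.
ROOM_TYPE_INDEX = {"Entire home/apt": 0.0, "Private room": 1.0, "Shared room": 2.0}
COEFFICIENTS = [-30.0, 45.0]  # one weight per feature: [room_type_index, bedrooms]
INTERCEPT = 80.0


def predict_price(room_type, bedrooms):
    # Replicate the preprocessing step (string -> index) before the dot product.
    if room_type not in ROOM_TYPE_INDEX:
        raise ValueError(f"unseen category: {room_type!r}")
    features = [ROOM_TYPE_INDEX[room_type], float(bedrooms)]
    return INTERCEPT + sum(w * x for w, x in zip(COEFFICIENTS, features))


print(predict_price("Private room", 2))  # prints: 140.0
```

The arithmetic is the easy part. The mapping has to reproduce the indexing learned during training exactly, and the feature order has to match the assembler's input order; a mismatch in either produces wrong predictions without raising any error.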


##### ONNX
ONNX (Open Neural Network Exchange), on the other hand, has become the de facto open standard for machine learning interoperability. Some of you might recall other ML interoperability formats, like PMML (Predictive Model Markup Language), but those never gained quite the same traction as ONNX has now. ONNX is very popular in the deep learning community as a tool that allows developers to easily switch between libraries and languages, and at the time of this writing it has experimental support for MLlib.

##### Third-party libraries for exporting models
There are other third-party libraries that integrate with Spark that are convenient to deploy in real-time scenarios, such as XGBoost and H2O.ai’s Sparkling Water (whose name is derived from a combination of H2O and Spark).

## XGBoost advantages
- **XGBoost is one of the most successful algorithms in Kaggle competitions for structured data problems**, and it’s a very popular library among data scientists.
- Although XGBoost is not technically part of MLlib, the XGBoost4J-Spark library allows you to integrate distributed XGBoost into your MLlib pipelines.
- **A benefit of XGBoost is the ease of deployment: after you train your MLlib pipeline, you can extract the XGBoost model and save it as a non-Spark model for serving in Python**, as demonstrated here:

In [17]:
! pip install xgboost



##### Loading an XGBoost model is as simple as:

import xgboost as xgb
bst = xgb.Booster({'nthread': 4})
bst.load_model("xgboost_native_model")