# Scalable scoring with a Databricks pipeline

After training and selecting the best model, it's time to run inference on large datasets. This notebook introduces a way to set up a scoring pipeline that runs your model against a large dataset using Spark's distributed processing framework.

In [3]:
# Connect to the Azure ML workspace and report the installed azureml SDK version.
# Workspace.create with exist_ok=True returns the existing workspace when it is
# already there and only creates it when it is missing.
import azureml.core
from azureml.core import Workspace
from azureml.core.authentication import InteractiveLoginAuthentication

print("Azure ML SDK version:", azureml.core.VERSION)

# The signed-in identity must be Owner or Contributor on the subscription /
# resource group for the create-or-get call to succeed.
subscription_id = dbutils.secrets.get("commonakv", "subscriptionid")
resource_group = "hcnDthCommon"
workspace_name = "amlcommonws"   # your workspace name
workspace_region = "westus2"     # your region

# force=True triggers a fresh interactive login even if a cached login is still valid.
ws = Workspace.create(name=workspace_name,
                      subscription_id=subscription_id,
                      auth=InteractiveLoginAuthentication(force=True, tenant_id=dbutils.secrets.get("commonakv", "tenantid")),
                      resource_group=resource_group,
                      location=workspace_region,
                      exist_ok=True)




## 1. Load a Spark MLlib model and score

Scoring with a model that was trained and saved with Spark ML is straightforward: load the saved `PipelineModel` and call `transform` on the DataFrame.

In [6]:
# Download the registered Spark ML pipeline (latest version) from Azure ML and load it.

import os
from azureml.core.model import Model
import shutil
from pyspark.ml import Pipeline, PipelineModel

model_name = "crime_prediction_RDF.mml"
# Spark ML loads models from a DBFS/HDFS path (dbfs:/...), not from the driver's
# local disk. /dbfs/... is the driver-side view used by the Python SDK download.
model_name_dbfs_client_path = "/dbfs/mnt/models/"
model_name_dbfs_server_path = "dbfs:/mnt/models/"

# No version argument -> the latest registered version of the model.
model = Model(name=model_name, workspace=ws)

# Clear a stale copy in the working directory. shutil.rmtree only accepts
# directories, so a plain file must be removed with os.remove instead.
if os.path.isdir(model_name):
    shutil.rmtree(model_name)
elif os.path.isfile(model_name):
    os.remove(model_name)

model.download(model_name_dbfs_client_path, exist_ok=True)
# NOTE(review): when the DBFS FUSE mount is available, file:/dbfs/mnt/models/ and
# dbfs:/mnt/models/ point at the same storage, so this copy looks redundant — confirm
# before removing it.
dbutils.fs.cp("file:" + model_name_dbfs_client_path, model_name_dbfs_server_path, True)

# Join without doubling the separator (the directory constant already ends in "/").
model_p_best = PipelineModel.load(model_name_dbfs_server_path.rstrip("/") + "/" + model_name)

In [7]:
# Run the fitted Spark ML pipeline over every row of the crime dataset that has no
# nulls, persist the scored rows as a Delta table, then show that table.
crime_df = spark.sql("select * from crime_dataset").na.drop()
result = model_p_best.transform(crime_df)
(result.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("spark_mlprediction"))
display(spark.sql("select * from spark_mlprediction"))

## 2. Load a scikit-learn (or TensorFlow, etc.) model and score

If you trained your model with scikit-learn or TensorFlow (i.e., not with PySpark), you can still run scalable scoring against a large dataset. The approach below uses a Pandas UDF to score batches of data with the scikit-learn model you trained in the prediction notebook.

In [10]:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
import pandas as pd

try:
    import joblib
except ImportError:
    # Older scikit-learn releases shipped a vendored copy; sklearn.externals.joblib
    # was removed in scikit-learn 0.23, so it is only a fallback.
    from sklearn.externals import joblib


def make_predictions(sc, df, feature_cols, model_path, tf_path):
    """
    Score a Spark DataFrame with a scikit-learn model through a scalar Pandas UDF.

    The fitted feature transformer and the model are loaded once on the driver with
    joblib and broadcast to the executors. Each batch of rows is rebuilt as a pandas
    DataFrame, passed through ``transform`` and then ``predict``.

    Arguments:
        sc: SparkContext, used to broadcast the loaded objects.
        df (pyspark.sql.DataFrame): Input data frame containing feature_cols.
        feature_cols (list[str]): Feature columns in df, in the order (and with the
            names) the transformer was fitted on.
        model_path (str): Path to the joblib-serialized model on the Spark driver.
        tf_path (str): Path to the joblib-serialized feature transformer (an object
            with a ``transform`` method) on the Spark driver.

    Returns:
        df (pyspark.sql.DataFrame): ``df`` with an added double column
        ``predictions`` holding the output of ``model.predict``.
    """
    # joblib.load unpickles the files: only load artifacts from a trusted source.
    bc_model = sc.broadcast(joblib.load(model_path))
    bc_transformer = sc.broadcast(joblib.load(tf_path))

    # NOTE(review): DoubleType assumes predict() returns numeric values; a classifier
    # with string labels would need a different return type — confirm.
    @F.pandas_udf(returnType=DoubleType(), functionType=F.PandasUDFType.SCALAR)
    def predict(*cols):
        # Spark passes each selected column of the batch as a separate pandas Series;
        # reassemble them and restore the column names the transformer expects.
        batch = pd.concat(cols, axis=1)
        batch.columns = feature_cols
        features = bc_transformer.value.transform(batch)
        predictions = bc_model.value.predict(features)
        # A scalar Pandas UDF must return one value per input row.
        return pd.Series(predictions)

    # Make predictions on the Spark DataFrame.
    df = df.withColumn("predictions", predict(*feature_cols))

    return df

In [11]:
# Download the scikit-learn artifacts (fitted feature transformer + model) registered
# in Azure ML, then score the crime dataset with them on the Spark executors.

import os
from azureml.core.model import Model
import shutil
try:
    import joblib
except ImportError:
    # sklearn.externals.joblib was removed in scikit-learn 0.23; keep it as a fallback only.
    from sklearn.externals import joblib

from pyspark.ml import Pipeline, PipelineModel

model_name = "ml.joblib"
# joblib.load runs in plain Python on the driver (inside make_predictions), so the
# artifacts are read through the /dbfs driver path.
model_name_dbfs_client_path = "/dbfs/mnt/models/" + model_name

ft_file = 'ft.joblib'
ml_file = 'ml.joblib'

# NOTE(review): the extra model_name level assumes the registered model is a folder
# named "ml.joblib" holding both files, recreated under the download directory —
# confirm against how the model was registered.
ft_path = os.path.join(model_name_dbfs_client_path, model_name, ft_file)
ml_path = os.path.join(model_name_dbfs_client_path, model_name, ml_file)

# No version argument -> the latest registered version of the model.
model = Model(name=model_name, workspace=ws)
# Clear a stale copy in the working directory; shutil.rmtree only handles directories.
if os.path.isdir(model_name):
    shutil.rmtree(model_name)
elif os.path.isfile(model_name):
    os.remove(model_name)

model.download(model_name_dbfs_client_path, exist_ok=True)

crime_df = spark.sql("select * from crime_dataset").na.drop()
feature_cols = ['week', 'day', 'district', 'primary_type', 'school_test_performance',
                'population', 'Unemployment_Rte', 'Median_Household_Income',
                'Average_Commute_Time', 'Area', 'PRCP', 'SNOW', 'SNWD', 'TMAX', 'TMIN',
                'TOBS', 'WT01', 'WT03', 'WT04', 'WT05', 'WT06', 'WT11']
sc = spark.sparkContext

pred = make_predictions(sc, crime_df, feature_cols, ml_path, ft_path)
pred.write.format("delta").mode("overwrite").saveAsTable("sk_learn_prediction")
display(spark.sql("select * from sk_learn_prediction"))

## 3. Score with a model service deployed to AKS

In [13]:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
import pandas as pd
import json
import requests
import ast
import time
def make_predictions_aks(sc, df, feature_cols, scoring_uri=None, api_key=None,
                         max_retries=5, request_timeout=60):
    """
    Make predictions by calling the model web service deployed on AKS.

    Every batch of rows that Spark hands to the scalar Pandas UDF is serialized and
    POSTed to the scoring endpoint, so the HTTP calls run in parallel on the executors.

    Arguments:
        sc: SparkContext. Not used; kept so the signature mirrors make_predictions.
        df (pyspark.sql.DataFrame): Input data frame containing feature_cols.
        feature_cols (list[str]): List of feature columns in df, sent in this order.
        scoring_uri (str, optional): Scoring endpoint. Defaults to the
            crime-pred-service-1 endpoint this notebook was written against.
        api_key (str, optional): Key sent as a Bearer token. Defaults to the key
            embedded below (see FIXME).
        max_retries (int): Extra attempts per batch after a transient failure
            (connection error, timeout, HTTP 408/429/5xx) before the task fails.
        request_timeout (float): Seconds to wait for each HTTP response.

    Returns:
        df (pyspark.sql.DataFrame): Output data frame with an added double column
        "predictions".

    Raises:
        RuntimeError (inside the Spark task, surfaced when the result is computed):
            if a batch still fails after the retries, the service returns a
            non-retryable status, or the number of predictions returned differs from
            the number of rows sent.
    """
    if scoring_uri is None:
        scoring_uri = 'http://13.66.160.122/api/v1/service/crime-pred-service-1/score'
    if api_key is None:
        # FIXME(security): this key is committed in plain text. Rotate it and pass
        # api_key in from a secret scope (dbutils.secrets.get) instead.
        api_key = 'i4J6SsvDPI3XuifhtI9NIhxrYGtt2Caz'

    # Built once on the driver and captured by the UDF closure.
    headers = {'Content-Type': 'application/json', 'Authorization': ('Bearer ' + api_key)}
    retryable_statuses = {408, 429, 500, 502, 503, 504}

    def post_with_retry(payload):
        """POST one batch, retrying transient failures with capped exponential backoff."""
        last_error = None
        for attempt in range(max(max_retries, 0) + 1):
            if attempt:
                time.sleep(min(2 ** (attempt - 1), 10))
            try:
                response = requests.post(scoring_uri, data=payload, headers=headers,
                                         timeout=request_timeout)
            except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as exc:
                last_error = repr(exc)
                continue
            if response.status_code == 200:
                return response
            last_error = "HTTP {}: {}".format(response.status_code, response.text[:500])
            if response.status_code not in retryable_statuses:
                # e.g. 400/401/403/404: retrying cannot succeed.
                break
        raise RuntimeError("Scoring request to {} failed: {}".format(scoring_uri, last_error))

    # Define Pandas UDF
    @F.pandas_udf(returnType=DoubleType(), functionType=F.PandasUDFType.SCALAR)
    def predict(*cols):
        # Columns arrive as a tuple of pandas Series; combine them into one frame.
        batch = pd.concat(cols, axis=1)
        batch.columns = feature_cols
        # The wire format is double-encoded: the split-oriented JSON string from
        # to_json is itself sent as a JSON string literal, and the response body is a
        # JSON string that ast.literal_eval turns into the predictions. Keep both
        # sides in sync with the deployed scoring script.
        payload = json.dumps(batch.to_json(orient='split'))
        response = post_with_retry(payload)

        predictions_json = json.loads(response.content.decode("utf-8"))
        predictions = pd.Series(ast.literal_eval(predictions_json))
        if len(predictions) != len(batch):
            raise RuntimeError("Scoring service returned {} predictions for {} rows".format(
                len(predictions), len(batch)))
        return predictions

    # Make predictions on Spark DataFrame.
    df = df.withColumn("predictions", predict(*feature_cols))

    return df

In [14]:

# Score the crime dataset through the AKS-hosted web service and persist the result.
crime_df = spark.sql("select * from crime_dataset").na.drop()
feature_cols = [
    'week', 'day', 'district', 'primary_type', 'school_test_performance',
    'population', 'Unemployment_Rte', 'Median_Household_Income',
    'Average_Commute_Time', 'Area',
    'PRCP', 'SNOW', 'SNWD', 'TMAX', 'TMIN', 'TOBS',
    'WT01', 'WT03', 'WT04', 'WT05', 'WT06', 'WT11',
]
sc = spark.sparkContext

pred = make_predictions_aks(sc, crime_df, feature_cols)
(pred.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("aks_prediction"))
display(spark.sql("select * from aks_prediction"))

## 4. Write results to a SQL DW table

In [16]:
# Configure the Databricks SQL DW (Azure Synapse) connector.
# Step 1: give Spark the account key of the intermediary Blob storage it stages data in.
storage_account = "i360pocstorage"
container = "sqldwdatabricks"
spark.conf.set(
    "fs.azure.account.key." + storage_account + ".blob.core.windows.net",
    dbutils.secrets.get("commonakv", "i360pocstoragekey"))
# NOTE(review): 10000000 (seconds, when no unit is given) effectively disables the
# network timeout; spark.network.timeout is also normally fixed when the cluster
# starts, so setting it on a running session may have no effect — confirm it is needed.
spark.conf.set("spark.network.timeout", "10000000")

# Step 2: SQL DW credentials, read from the "commonakv" secret scope.
username = dbutils.secrets.get("commonakv", "sqldwusername")
password = dbutils.secrets.get("commonakv", "sqldwpassword")
# Expected to be the short server name; ".database.windows.net" is appended below.
servername = dbutils.secrets.get("commonakv", "sqldwserver")
database = dbutils.secrets.get("commonakv", "sqldwdb")

tempdir = "wasbs://" + container + "@" + storage_account + ".blob.core.windows.net/"
# Require an encrypted connection with server-certificate validation.
connection_string = (
    "jdbc:sqlserver://{server}.database.windows.net:1433;"
    "database={database};user={user}@{server};password={password};"
    "encrypt=true;trustServerCertificate=false;"
    "hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
).format(server=servername, database=database, user=username, password=password)


In [17]:
# Load the scikit-learn prediction table saved earlier as a DataFrame.
sql_sk_pred_df = spark.table("sk_learn_prediction")

In [18]:
# Overwrite dbo.sk_learn_prediction in SQL DW with the scikit-learn predictions,
# staging through tempdir with the storage credentials set on the Spark session.
(sql_sk_pred_df.write
    .format("com.databricks.spark.sqldw")
    .options(url=connection_string,
             forwardSparkAzureStorageCredentials="true",
             dbTable="dbo.sk_learn_prediction",
             tempDir=tempdir)
    .mode("overwrite")
    .save())

In [19]:
# Read the table back from SQL DW through the same connector and staging directory.

pred_out = (spark.read
    .format("com.databricks.spark.sqldw")
    .options(url=connection_string,
             tempDir=tempdir,
             forwardSparkAzureStorageCredentials="true",
             dbTable="dbo.sk_learn_prediction")
    .load())
display(pred_out)