Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License.

In [2]:
import os
import json
import shutil

In [3]:
from reco_utils.dataset.criteo import get_spark_schema, load_spark_df

from azureml.core import Workspace
from azureml.core import VERSION as azureml_version

from azureml.core.model import Model
from azureml.core.conda_dependencies import CondaDependencies 
from azureml.core.webservice import Webservice, AksWebservice
from azureml.core.image import ContainerImage
from azureml.core.compute import AksCompute, ComputeTarget

# Report which Azure ML SDK version this notebook is running against
sdk_version_message = "Azure ML SDK version: {}".format(azureml_version)
print(sdk_version_message)

## Configure Scoring Service Variables

In [5]:
# Name passed to Model.get_model_path() by the scoring scripts; it must exactly
# match the name used to save the pipeline model in the estimation notebook.
MODEL_NAME = "criteo-lgbm.model"

# File that the serialized conda dependencies are written to.
CONDA_FILE = "deploy_conda.yaml"

## Create the conda dependencies file

In [7]:
# pip packages for the scoring environment:
#   - azureml-sdk is required to load the registered model
#   - requests is imported by the streaming scoring scripts
conda_deps = CondaDependencies.create(pip_packages=['azureml-sdk', 'requests'])
conda_file = conda_deps.serialize_to_string()

# Persist the serialized dependencies to CONDA_FILE
with open(CONDA_FILE, "w") as f:
    f.write(conda_file)

## Define the Scoring Script

Next, we need to create the driver script that will be executed when the service is called. The functions that need to be defined for scoring are `init()` and `run()`. The `init()` function is run when the service is created, and the `run()` function is run each time the service is called.

In our example, we use the `init()` function to load all the libraries, initialize the spark session, start the spark streaming service and load the model pipeline. We use the `run()` method to route the input to the spark streaming service to generate predictions (in this case the probability of an interaction) then return the output.

## Version 1 - LightGBM with streaming

In [10]:
# Output file name for the "LightGBM with streaming" scoring script.
DRIVER_FILE = "mmlspark_streamscore.py"

# Source of the scoring script, kept as a str.format template: the model_name
# placeholder is filled in from MODEL_NAME, so literal braces are doubled.
# The script must be self-contained: it is written to DRIVER_FILE and runs
# outside this notebook, so every name it uses (including azureml's Model)
# has to be imported inside the script itself.
driver_file = '''
import os
import json
from time import sleep
from uuid import uuid4
from zipfile import ZipFile

from azureml.core.model import Model
from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
import requests


def init():
    """One time initialization of pyspark and model server.

    Loads the zipped pipeline model and its saved schema, then starts a
    continuous serving query on localhost:8089 under the "predict" API name.
    """

    spark = SparkSession.builder.appName("Model Server").getOrCreate()
    import mmlspark  # this is needed to load mmlspark libraries

    # extract and load model
    model_path = Model.get_model_path('{model_name}')
    with ZipFile(model_path, 'r') as f:
        f.extractall('model')
    model = PipelineModel.load('model')

    # load data schema saved with model
    with open(os.path.join('model', 'schema.json'), 'r') as f:
        schema = StructType.fromJson(json.load(f))

    # incoming HTTP requests are parsed into rows using the saved schema
    input_df = (
        spark.readStream.continuousServer()
        .address("localhost", 8089, "predict")
        .load()
        .parseRequest(schema)
    )

    # score each request and reply with the "probability" column
    output_df = (
        model.transform(input_df)
        .makeReply("probability")
    )

    # a fresh checkpoint directory is used on every start
    checkpoint = os.path.join('/tmp', 'checkpoints', uuid4().hex)
    server = (
        output_df.writeStream.continuousServer()
        .trigger(continuous="1 second")
        .replyTo("predict")
        .queryName("prediction")
        .option("checkpointLocation", checkpoint)
        .start()
    )

    # let the server finish starting
    sleep(1)


def run(input_json):
    """Forward the request body to the local serving endpoint.

    Returns a JSON string with a single "result" key holding either the
    element at index 1 of the returned probability values (presumably the
    positive class - TODO confirm) or, on any failure, the error message.
    """
    try:
        response = requests.post(data=input_json, url='http://localhost:8089/predict')
        result = response.json()['probability']['values'][1]
    except Exception as e:
        # best effort: report the failure to the caller instead of raising
        result = str(e)

    return json.dumps({{"result": result}})

'''.format(model_name=MODEL_NAME)

# check syntax
# (exec runs the script's top-level code in this notebook's namespace: its
# imports execute and init()/run() are defined here, but neither is called)
exec(driver_file)

with open(DRIVER_FILE, "w") as f:
    f.write(driver_file)

## Version 2 - LightGBM - base

In [12]:
# Output file name for the "LightGBM - base" (non-streaming) scoring script.
DRIVER_FILE = "mmlspark_basescore.py"

# Source of the scoring script, kept as a str.format template: the model_name
# placeholder is filled in from MODEL_NAME, so literal braces are doubled.
score_sparkml = """

import json
from zipfile import ZipFile


def init():
    '''One-time initialization of PySpark and predictive model.

    Sets the module-level globals spark and trainedModel. If starting Spark
    or loading the model fails, the exception is stored in trainedModel so
    that run() can report it to the caller.
    '''
    import pyspark
    # NOTE(review): not referenced directly; presumably needed so the mmlspark
    # stages of the saved pipeline can be loaded - confirm before removing
    from mmlspark import LightGBMClassifier
    from azureml.core.model import Model
    from pyspark.ml import PipelineModel
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType

    global trainedModel
    global spark
    global schema

    try:
        spark = pyspark.sql.SparkSession.builder.appName("LightGBM Criteo Predictions").getOrCreate()
        model_path = Model.get_model_path('{model_name}')
        with ZipFile(model_path, 'r') as f:
            f.extractall('model')
        trainedModel = PipelineModel.load('model')
    except Exception as e:
        # keep the failure so run() can surface it through its isinstance guard
        trainedModel = e


def run(input_json):
    '''Score the records in input_json with the loaded pipeline model.

    input_json is a JSON-encoded list; its elements are parallelized and read
    with spark.read.json. On success returns a one-element list holding the
    comma-joined string of probability[1] for each row. If initialization
    failed, returns a JSON string with the error under "trainedModel"; if
    scoring fails, returns the error message as a plain string.
    '''
    if isinstance(trainedModel, Exception):
        return json.dumps({{"trainedModel":str(trainedModel)}})

    try:
        sc = spark.sparkContext
        input_list = json.loads(input_json)
        input_rdd = sc.parallelize(input_list)
        input_df = spark.read.json(input_rdd)

        # Compute prediction
        predictions = trainedModel.transform(input_df).collect()
        # Get probability of a click for each row and convert to a str
        click_prob = [str(x.probability[1]) for x in predictions]

        # you can return any data type as long as it is JSON-serializable
        result = ",".join(click_prob)
        return [result]
    except Exception as e:
        result = str(e)
        return result
""".format(model_name=MODEL_NAME)

# exec runs the script's top-level code in this notebook's namespace: its
# imports execute and init()/run() are defined here, but neither is called
exec(score_sparkml)

with open(DRIVER_FILE, "w") as file:
    file.write(score_sparkml)

## Version 3 - streaming select

In [14]:
# Output file name for the "streaming select" variant of the scoring script.
DRIVER_FILE = "mmlspark_streamselect.py"

# Source of the scoring script, kept as a str.format template: the model_name
# placeholder is filled in from MODEL_NAME, so literal braces are doubled.
# In this variant init() still loads the model, but the reply is built from
# input_df.select("cat00") instead of model.transform(...), so the model is
# not applied to the requests; run() returns the 'cat00' field of the reply.
driver_file = '''
import os
import json
from time import sleep
from uuid import uuid4
from zipfile import ZipFile

from azureml.core.model import Model
from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
import requests


def init():
    """One time initialization of pyspark and model server"""

    spark = SparkSession.builder.appName("Model Server").getOrCreate()
    import mmlspark  # this is needed to load mmlspark libraries

    # extract and load model
    model_path = Model.get_model_path('{model_name}')
    with ZipFile(model_path, 'r') as f:
        f.extractall('model')
    model = PipelineModel.load('model')

    # load data schema saved with model
    with open(os.path.join('model', 'schema.json'), 'r') as f:
        schema = StructType.fromJson(json.load(f))

    input_df = (
        spark.readStream.continuousServer()
        .address("localhost", 8089, "predict")
        .load()
        .parseRequest(schema)
    )

    output_df = (
        input_df.select("cat00")
        .makeReply("probability")
    )

    checkpoint = os.path.join('/tmp', 'checkpoints', uuid4().hex)
    server = (
        output_df.writeStream.continuousServer()
        .trigger(continuous="1 second")
        .replyTo("predict")
        .queryName("prediction")
        .option("checkpointLocation", checkpoint)
        .start()
    )

    # let the server finish starting
    sleep(1)


def run(input_json):
    try:
        response = requests.post(data=input_json, url='http://localhost:8089/predict')
        result = response.json()['cat00']
    except Exception as e:
        result = str(e)
    
    return json.dumps({{"result": result}})
    
'''.format(model_name=MODEL_NAME)

# check syntax
# (exec runs the script's top-level code in this notebook's namespace: its
# imports execute and init()/run() are defined here, but neither is called)
exec(driver_file)

# Write the rendered script to DRIVER_FILE in the current working directory
with open(DRIVER_FILE, "w") as f:
    f.write(driver_file)

## Version 4 - base select

In [16]:
# Output file name for the "base select" (non-streaming, no model scoring) script.
DRIVER_FILE = "mmlspark_baseselect.py"

# Source of the scoring script, kept as a str.format template: the model_name
# placeholder is filled in from MODEL_NAME, so literal braces are doubled.
score_sparkml = """

import json
from zipfile import ZipFile


def init():
    '''One-time initialization of PySpark and predictive model.

    Sets the module-level globals spark and trainedModel. If starting Spark
    or loading the model fails, the exception is stored in trainedModel so
    that run() can report it to the caller.
    '''
    import pyspark
    # NOTE(review): not referenced directly; presumably needed so the mmlspark
    # stages of the saved pipeline can be loaded - confirm before removing
    from mmlspark import LightGBMClassifier
    from azureml.core.model import Model
    from pyspark.ml import PipelineModel
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType

    global trainedModel
    global spark
    global schema

    try:
        spark = pyspark.sql.SparkSession.builder.appName("LightGBM Criteo Predictions").getOrCreate()
        model_path = Model.get_model_path('{model_name}')
        with ZipFile(model_path, 'r') as f:
            f.extractall('model')
        trainedModel = PipelineModel.load('model')
    except Exception as e:
        # keep the failure so run() can surface it through its isinstance guard
        trainedModel = e


def run(input_json):
    '''Return the cat00 value of every record in input_json.

    input_json is a JSON-encoded list; its elements are parallelized and read
    with spark.read.json. The loaded model is not applied here: the rows are
    only passed through a select on cat00. On success returns a one-element
    list holding the comma-joined cat00 values. If initialization failed,
    returns a JSON string with the error under "trainedModel"; if the select
    fails, returns the error message as a plain string.
    '''
    if isinstance(trainedModel, Exception):
        return json.dumps({{"trainedModel":str(trainedModel)}})

    try:
        sc = spark.sparkContext
        input_list = json.loads(input_json)
        input_rdd = sc.parallelize(input_list)
        input_df = spark.read.json(input_rdd)

        # Compute prediction - in this case, just a select
        predictions = input_df.select('cat00').collect()
        # Get the cat00 value of each row; convert to str so that join also
        # works when the inferred column type is not a string or a value is null
        selected_values = [str(x.cat00) for x in predictions]

        # you can return any data type as long as it is JSON-serializable
        result = ",".join(selected_values)
        return [result]
    except Exception as e:
        result = str(e)
        return result
""".format(model_name=MODEL_NAME)

# exec runs the script's top-level code in this notebook's namespace: its
# imports execute and init()/run() are defined here, but neither is called
exec(score_sparkml)

with open(DRIVER_FILE, "w") as file:
    file.write(score_sparkml)

In [17]:
%sh

# List the files matching mmlspark*py in the working directory (the scoring scripts written by the cells above)
ls mmlspark*py