In [1]:
%load_ext autoreload
%autoreload 2

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

import ads
import os 
from ads.common.model_export_util import prepare_generic_model

# Start a Spark session named 'Titanic', or reuse the one that is already active.
spark = (
    SparkSession.builder
    .appName('Titanic')
    .getOrCreate()
)

# Read a csv from object storage

The file is read directly into a Spark DataFrame.

In [2]:
# Read the Titanic CSV from OCI Object Storage; the first row supplies the column names.
# No schema is requested, so every column is loaded as a string.
df = (
    spark.read
    .format("csv")
    .option('header', 'true')
    .load("oci://hosted-ds-datasets@bigdatadatasciencelarge/titanic/titanic.csv")
)

In [3]:
# Preview the first five rows of the raw data.
df.show(5)

+--------+------+--------------------+------+---+-----------------------+-----------------------+-------+
|Survived|Pclass|                Name|   Sex|Age|Siblings/Spouses Aboard|Parents/Children Aboard|   Fare|
+--------+------+--------------------+------+---+-----------------------+-----------------------+-------+
|       0|     3|Mr. Owen Harris B...|  male| 22|                      1|                      0|   7.25|
|       1|     1|Mrs. John Bradley...|female| 38|                      1|                      0|71.2833|
|       1|     3|Miss. Laina Heikk...|female| 26|                      0|                      0|  7.925|
|       1|     1|Mrs. Jacques Heat...|female| 35|                      1|                      0|   53.1|
|       0|     3|Mr. William Henry...|  male| 35|                      0|                      0|   8.05|
+--------+------+--------------------+------+---+-----------------------+-----------------------+-------+
only showing top 5 rows



In [4]:
# Column types: the CSV was read without a schema, so each column comes back as a string.
df.dtypes

[('Survived', 'string'),
 ('Pclass', 'string'),
 ('Name', 'string'),
 ('Sex', 'string'),
 ('Age', 'string'),
 ('Siblings/Spouses Aboard', 'string'),
 ('Parents/Children Aboard', 'string'),
 ('Fare', 'string')]

## Data preparation and feature engineering

In [5]:
# Keep only the modelling columns and cast the numeric ones to float.
# 'Survived' is the target variable; 'Sex' is left as a string at this stage.
dataset = df.select(
    *[
        col(name) if name == 'Sex' else col(name).cast('float')
        for name in ('Survived', 'Pclass', 'Sex', 'Age', 'Fare')
    ]
)

dataset.show()

+--------+------+------+----+-------+
|Survived|Pclass|   Sex| Age|   Fare|
+--------+------+------+----+-------+
|     0.0|   3.0|  male|22.0|   7.25|
|     1.0|   1.0|female|38.0|71.2833|
|     1.0|   3.0|female|26.0|  7.925|
|     1.0|   1.0|female|35.0|   53.1|
|     0.0|   3.0|  male|35.0|   8.05|
|     0.0|   3.0|  male|27.0| 8.4583|
|     0.0|   1.0|  male|54.0|51.8625|
|     0.0|   3.0|  male| 2.0| 21.075|
|     1.0|   3.0|female|27.0|11.1333|
|     1.0|   2.0|female|14.0|30.0708|
|     1.0|   3.0|female| 4.0|   16.7|
|     1.0|   1.0|female|58.0|  26.55|
|     0.0|   3.0|  male|20.0|   8.05|
|     0.0|   3.0|  male|39.0| 31.275|
|     0.0|   3.0|female|14.0| 7.8542|
|     1.0|   2.0|female|55.0|   16.0|
|     0.0|   3.0|  male| 2.0| 29.125|
|     1.0|   2.0|  male|23.0|   13.0|
|     0.0|   3.0|female|31.0|   18.0|
|     1.0|   3.0|female|22.0|  7.225|
+--------+------+------+----+-------+
only showing top 20 rows



In [6]:
# Index the categorical 'Sex' column into a numeric 'Gender' column.
# handleInvalid='keep' puts invalid labels in an extra bucket instead of raising.
dataset = StringIndexer(inputCol='Sex',
                        outputCol='Gender',
                        handleInvalid='keep').fit(dataset).transform(dataset)

# Drop the original string column now that its indexed form is available.
dataset = dataset.drop('Sex')

dataset.show()

+--------+------+----+-------+------+
|Survived|Pclass| Age|   Fare|Gender|
+--------+------+----+-------+------+
|     0.0|   3.0|22.0|   7.25|   0.0|
|     1.0|   1.0|38.0|71.2833|   1.0|
|     1.0|   3.0|26.0|  7.925|   1.0|
|     1.0|   1.0|35.0|   53.1|   1.0|
|     0.0|   3.0|35.0|   8.05|   0.0|
|     0.0|   3.0|27.0| 8.4583|   0.0|
|     0.0|   1.0|54.0|51.8625|   0.0|
|     0.0|   3.0| 2.0| 21.075|   0.0|
|     1.0|   3.0|27.0|11.1333|   1.0|
|     1.0|   2.0|14.0|30.0708|   1.0|
|     1.0|   3.0| 4.0|   16.7|   1.0|
|     1.0|   1.0|58.0|  26.55|   1.0|
|     0.0|   3.0|20.0|   8.05|   0.0|
|     0.0|   3.0|39.0| 31.275|   0.0|
|     0.0|   3.0|14.0| 7.8542|   1.0|
|     1.0|   2.0|55.0|   16.0|   1.0|
|     0.0|   3.0| 2.0| 29.125|   0.0|
|     1.0|   2.0|23.0|   13.0|   0.0|
|     0.0|   3.0|31.0|   18.0|   1.0|
|     1.0|   3.0|22.0|  7.225|   1.0|
+--------+------+----+-------+------+
only showing top 20 rows



# Build a simple Pipeline Object

In [7]:
# Input features:
required_features = ['Pclass',
                     'Age',
                     'Fare',
                     'Gender']

# 80/20 data split. The fixed seed keeps the train/test membership stable
# across re-runs, so results can be reproduced.
(training_data, test_data) = dataset.randomSplit([0.8, 0.2], seed=42)

# Stage 1: pack the feature columns into the single vector column the classifier reads.
assembler = VectorAssembler(inputCols=required_features, outputCol='features')
# Stage 2: random forest that predicts 'Survived' from the assembled features.
rf = RandomForestClassifier(labelCol='Survived',
                            featuresCol='features',
                            maxDepth=5)

pipeline = Pipeline(stages=[assembler, rf])

In [8]:
# Fit both pipeline stages (feature assembly, then the random forest) on the training split.
pipeline_model = pipeline.fit(training_data)

In [9]:
# Score the full dataset (training and test rows alike) and preview ten predictions.
# NOTE(review): for a held-out view, transform `test_data` instead.
results = pipeline_model.transform(dataset)
results.limit(10).show()

+--------+------+----+-------+------+--------------------+--------------------+--------------------+----------+
|Survived|Pclass| Age|   Fare|Gender|            features|       rawPrediction|         probability|prediction|
+--------+------+----+-------+------+--------------------+--------------------+--------------------+----------+
|     0.0|   3.0|22.0|   7.25|   0.0| [3.0,22.0,7.25,0.0]|[17.8566101677140...|[0.89283050838570...|       0.0|
|     1.0|   1.0|38.0|71.2833|   1.0|[1.0,38.0,71.2833...|[0.99662422569703...|[0.04983121128485...|       1.0|
|     1.0|   3.0|26.0|  7.925|   1.0|[3.0,26.0,7.92500...|[9.43397278245809...|[0.47169863912290...|       1.0|
|     1.0|   1.0|35.0|   53.1|   1.0|[1.0,35.0,53.0999...|[1.13394024301305...|[0.05669701215065...|       1.0|
|     0.0|   3.0|35.0|   8.05|   0.0|[3.0,35.0,8.05000...|[17.8264474683963...|[0.89132237341981...|       0.0|
|     0.0|   3.0|27.0| 8.4583|   0.0|[3.0,27.0,8.45829...|[17.7496419822158...|[0.88748209911079...|    

# Saving the Model Object to disk

In [10]:
artifact_dir = "./model-artifact"

# exist_ok=True makes the directory creation safe to re-run.
os.makedirs(artifact_dir, exist_ok=True)

# Persist the fitted pipeline inside the artifact directory, replacing any earlier copy.
pipeline_model.write().overwrite().save(f"{artifact_dir}/my-model")
# Alternative: write straight to Object Storage instead of local disk.
#pipeline_model.write().overwrite().save("oci://<my-bucket>@<my-namespace>/my-model")

# Preparing a model artifact with ADS

In [11]:
# Build an ADS generic-model artifact object around the artifact directory.
# NOTE(review): force_overwrite=True presumably lets ADS replace files that are
# already in the directory — confirm against the ADS documentation.
ads_artifact = prepare_generic_model(artifact_dir, 
                                     force_overwrite=True, 
                                     function_artifacts=False, 
                                     data_science_env=True)

HBox(children=(HTML(value='loop1'), FloatProgress(value=0.0, max=4.0), HTML(value='')))

INFO:ADS:We give you the option to specify a different inference conda environment for model deployment purposes. By default it is assumed to be the same as the conda environment used to train the model. If you wish to specify a different environment for inference purposes, please assign the path of a published or data science conda environment to the optional parameter `inference_conda_env`. 


## Overwriting score.py to support spark models 

The script includes several `print` statements so that each scoring step is visible in the logs.

In [12]:
%%writefile {artifact_dir}/score.py

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.sql.functions import col


# Module-level Spark session used by predict(); getOrCreate() reuses an active session if there is one.
sc = (
    SparkSession.builder
    .appName('Titanic2')
    .getOrCreate()
)

"""
   Inference script. This script is used for prediction by scoring server when schema is known.
"""


def load_model():
    """
    Loads the Spark ML pipeline model stored next to this script.

    The model directory is resolved relative to this file instead of through a
    hard-coded absolute path, so the same script can be imported from the local
    artifact directory and from the deployed artifact without editing it.
    This assumes `score.py` and the `my-model` directory sit side by side,
    which is how the artifact directory is laid out.

    Returns
    -------
    model:  a PipelineModel instance on which `transform` can be invoked
    """
    import os

    print('loading model')
    # realpath() makes the location independent of the current working directory.
    script_dir = os.path.dirname(os.path.realpath(__file__))
    pm2 = PipelineModel.load(os.path.join(script_dir, 'my-model'))
    print('done reading model')

    print(pm2.__class__)
    return pm2


def predict(data, model=load_model()) -> dict:
    """
    Returns prediction given the model and data to predict

    Parameters
    ----------
    data: JSON payload to score. A JSON string is used as-is, bytes are decoded
        as UTF-8, and any other object (for example an already-parsed dict) is
        serialized to a JSON string first. The payload must provide the
        'Pclass', 'Gender', 'Age' and 'Fare' fields.
    model: Model instance returned by load_model API. The default instance is
        created once, when this module is imported.

    Returns
    -------
    predictions: Output from scoring server
        Format: { 'prediction': string form of the list of predicted labels }

    """
    import json

    print('before reading data')
    # The JSON reader below needs text, so normalise the payload to a JSON string.
    if isinstance(data, (bytes, bytearray)):
        data = data.decode('utf-8')
    payload = data if isinstance(data, str) else json.dumps(data)
    tmp = sc.read.json(sc.sparkContext.parallelize([payload]))
    # show() and printSchema() print by themselves and return None, so they are
    # called directly; wrapping them in print() only added a stray "None" line.
    tmp.show()
    tmp.printSchema()
    print('after reading data')
    # Keep only the model's input columns, with the types used during training.
    tmp = tmp.select(col('Pclass').cast('float'),
                     col('Gender').cast('double'),
                     col('Age').cast('float'),
                     col('Fare').cast('float')
                     )
    print('after selecting subset of columns and typecasting')
    results = model.transform(tmp)
    print('after prediction')
    results.show()
    res = results.toPandas()['prediction']
    print("prediction = ", res)
    return { 'prediction': str(list(res)) }

Overwriting ./model-artifact/score.py


# Testing the model locally 

In [23]:
import sys 
# Put the artifact directory first on the import path so that `score` resolves
# to the score.py written into that directory.
sys.path.insert(0, artifact_dir)

# Importing score also runs its module-level code: it creates a Spark session
# and, through the default argument of predict(), calls load_model() once.
from score import load_model, predict

In [24]:
# Load the model explicitly so that it can be handed to predict() later on.
# NOTE(review): `_` is also the name IPython uses for the most recent cell
# output; a dedicated variable name would be clearer.
_ = load_model()

loading model
done reading model
<class 'pyspark.ml.pipeline.PipelineModel'>


In [13]:
# Build a sample payload: the first row of the test split, serialized as a JSON string
test = test_data.toPandas()
testjson = test.head(1).to_json(orient='records', lines=True)

In [14]:
# Display the JSON payload that was just built.
testjson

'{"Survived":0.0,"Pclass":1.0,"Age":18.0,"Fare":108.9000015259,"Gender":0.0}'

In [27]:
# Score the sample payload locally, passing the model instance held in `_`
# instead of relying on predict()'s default model.
res = predict(testjson, _)

before reading data
+---+--------------+------+------+--------+
|Age|          Fare|Gender|Pclass|Survived|
+---+--------------+------+------+--------+
|2.0|151.5500030518|   1.0|   1.0|     0.0|
+---+--------------+------+------+--------+

None
root
 |-- Age: double (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Gender: double (nullable = true)
 |-- Pclass: double (nullable = true)
 |-- Survived: double (nullable = true)

None
after reading data
after selecting subset of columns and typecasting
after prediction
+------+------+---+------+--------------------+--------------------+--------------------+----------+
|Pclass|Gender|Age|  Fare|            features|       rawPrediction|         probability|prediction|
+------+------+---+------+--------------------+--------------------+--------------------+----------+
|   1.0|   1.0|2.0|151.55|[1.0,2.0,151.5500...|[0.31171453926536...|[0.01558572696326...|       1.0|
+------+------+---+------+--------------------+-------------------

In [None]:
# Saving the artifact to the Catalog 

In [15]:
# Save the artifact under the given display name and description.
# The project and compartment OCIDs are read from environment variables, so a
# missing variable raises KeyError here.
ads_artifact.save(display_name="my-spark-model", 
                  description="pipeline object",
                  project_id=os.environ['PROJECT_OCID'],
                  compartment_id=os.environ['NB_SESSION_COMPARTMENT_OCID'])

INFO:ADS:{
  "git_branch": "None",
  "git_commit": "None",
  "repository_url": "None",
  "script_dir": "/home/datascience/sparkml/model-artifact",
  "training_script": "None"
}


HBox(children=(HTML(value='loop1'), FloatProgress(value=0.0, max=4.0), HTML(value='')))

Unnamed: 0,Unnamed: 1
id,ocid1.datasciencemodel.oc1.iad.amaaaaaanif7xwiachppmyp5sgixa56ffgtweh3kki6mt3trc2uavab5ssjq
compartment_id,ocid1.compartment.oc1..aaaaaaaalcio324mqxi6egudwmc2wzix3yclcysmmji4cggvnj4b5timvw2q
project_id,ocid1.datascienceproject.oc1.iad.amaaaaaanif7xwiaot7v42xns7rha7cfvno76gzn4n2yhknw5c4i3jo5wpfq
display_name,my-spark-model
description,pipeline object
lifecycle_state,ACTIVE
time_created,2021-09-24 04:12:51.346000+00:00
created_by,ocid1.user.oc1..aaaaaaaabfrlcbiyvjmjvgh3ns6trdyoewxytqywwta3yqmy3ah3fa3uw76q
freeform_tags,{}
defined_tags,"{'Oracle-Tags': {'CreatedBy': 'jr_local', 'CreatedOn': '2021-09-24T04:12:51.204Z'}}"


# Deploy the model through the console 

# Invoke the model 

In [16]:
import requests
import oci
from oci.signer import Signer

# User principal auth (alternative to the resource principal signer below):
#config = oci.config.from_file("~/.oci/config") # replace with the location of your oci config file
#auth = Signer(
#  tenancy=config['tenancy'],
#  user=config['user'],
#  fingerprint=config['fingerprint'],
#  private_key_file_location=config['key_file'],
#  pass_phrase=config['pass_phrase'])

# Resource principal auth:
auth = oci.auth.signers.get_resource_principals_signer()

# replace with your own endpoint:
endpoint = 'https://modeldeployment.us-ashburn-1.oci.customer-oci.com/ocid1.datasciencemodeldeployment.oc1.iad.amaaaaaanif7xwiaxyz2jly5hd6lofjpa7e7sinzs6gzcb6mt5bnb5sax43a/predict'

# Bound the wait with a timeout and fail loudly on an HTTP error status,
# instead of trying to parse an error body as if it were a prediction.
response = requests.post(endpoint, json=testjson, auth=auth, timeout=60)
response.raise_for_status()
response.json()

{'prediction': '[0.0]'}

In [17]:
%%time 
# Send the same request again so that %%time reports the wall-clock latency of one call.
requests.post(endpoint, json=testjson, auth=auth).json()

CPU times: user 81.6 ms, sys: 15.2 ms, total: 96.8 ms
Wall time: 3.29 s


{'prediction': '[0.0]'}