The SparkPipelineModel class in ADS is designed to allow you to rapidly get a PySpark model into production. The .prepare() method creates the model artifacts that are needed to deploy a functioning model without you having to configure it or write code. However, you can customize the required score.py file.

The following steps take your trained PySpark model and deploy it into production with a few lines of code.
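For reference, the generated score.py exposes a load_model() function and a predict() function that the deployment invokes. The sketch below is illustrative only; the exact template that .prepare() writes into the artifact directory may differ by ADS version, and the input handling inside predict() is an assumption.

# Illustrative sketch of the two hooks in score.py; not the exact ADS template.
import os

from pyspark.ml import PipelineModel
from pyspark.sql import Row, SparkSession


def load_model():
    # Load the serialized Spark pipeline saved next to this file under "model".
    model_dir = os.path.dirname(os.path.realpath(__file__))
    return PipelineModel.load(os.path.join(model_dir, "model"))


def predict(data, model=load_model()):
    # Assumes "data" arrives as a list of JSON records; convert it to a Spark
    # DataFrame, score it, and return a JSON-serializable result.
    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([Row(**record) for record in data])
    scored = model.transform(df)
    return {"prediction": [row.prediction for row in scored.select("prediction").collect()]}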
Create a Spark Pipeline Model
Generate a synthetic dataset:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

training = spark.createDataFrame(
    [
        (0, "a b c d e spark", 1.0),
        (1, "b d", 0.0),
        (2, "spark f g h", 1.0),
        (3, "hadoop mapreduce", 0.0),
    ],
    ["id", "text", "label"],
)

test = spark.createDataFrame(
    [
        (4, "spark i j k"),
        (5, "l m n"),
        (6, "spark hadoop spark"),
        (7, "apache hadoop"),
    ],
    ["id", "text"],
)
Create a Spark Pipeline. (Note that a Spark Pipeline can be made with just one stage.)
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(training)
import tempfile
from ads.model.framework.spark_model import SparkPipelineModel
from ads.common.model_metadata import UseCaseType
artifact_dir = tempfile.mkdtemp()

spark_model = SparkPipelineModel(estimator=model, artifact_dir=artifact_dir)
spark_model.prepare(
    inference_conda_env="pyspark32_p38_cpu_v2",
    X_sample=training,
    force_overwrite=True,
    use_case_type=UseCaseType.BINARY_CLASSIFICATION,
)
Instantiate a SparkPipelineModel() object with a PySpark model. Each instance accepts the following parameters:

artifact_dir: str. Artifact directory to store the files needed for deployment.

auth: (Dict, optional). Defaults to None. The default authentication is set using the ads.set_auth API. To override the default, use ads.common.auth.api_keys() or ads.common.auth.resource_principal() and create the appropriate authentication signer and the **kwargs required to instantiate the IdentityClient object.

estimator: Callable. Any model object generated by the PySpark framework.

properties: (ModelProperties, optional). Defaults to None. The ModelProperties object required to save and deploy the model.
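For example, to override the default authentication for a single instance, build a signer with one of the helpers above and pass it through the auth parameter. This is a minimal sketch; it assumes a resource principal is available in your environment (for example, a notebook session or job):

from ads.common.auth import resource_principal
from ads.model.framework.spark_model import SparkPipelineModel

# Override the default signer for this model only (assumes a resource
# principal is configured for this environment).
auth = resource_principal()
spark_model = SparkPipelineModel(estimator=model, artifact_dir=artifact_dir, auth=auth)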
model_id = spark_model.save()
Start loading model.joblib from model directory /tmp/tmphdo8dfn3 ...
Model is successfully loaded.
['input_schema.json', 'runtime.yaml', 'model_input_data_schema.json', 'model', 'score.py']
'ocid1.datasciencemodel.oc1.xxx.xxxxx'
spark_model.deploy(
    display_name="Spark Pipeline Model For Classification",
    deployment_log_group_id="ocid1.loggroup.oc1.xxx.xxxxx",
    deployment_access_log_id="ocid1.log.oc1.xxx.xxxxx",
    deployment_predict_log_id="ocid1.log.oc1.xxx.xxxxx",
)
print(f"Endpoint: {spark_model.model_deployment.url}")
# https://modeldeployment.{region}.oci.customer-oci.com/ocid1.datasciencemodeldeployment.oc1.xxx.xxxxx
spark_model.predict(test)['prediction']
# [0.0, 0.0, 1.0, 0.0]
Model deployment endpoints can be invoked with the OCI CLI. This example invokes the model deployment endpoint with a JSON payload:
>>> # Prepare the data sample for the prediction payload
>>> import json
>>> print(json.dumps(test.toJSON().collect()))
["{\"id\":4,\"text\":\"spark i j k\"}", "{\"id\":5,\"text\":\"l m n\"}",
"{\"id\":6,\"text\":\"spark hadoop spark\"}", "{\"id\":7,\"text\":\"apache hadoop\"}"]
Use the printed data and the deployment endpoint to invoke the prediction with the raw-request command in a terminal:
export uri=https://modeldeployment.{region}.oci.customer-oci.com/ocid1.datasciencemodeldeployment.oc1.xxx.xxxxx/predict
export data='{"data": ["{\"id\":4,\"text\":\"spark i j k\"}", ... "{\"id\":7,\"text\":\"apache hadoop\"}"]}'
oci raw-request \
--http-method POST \
--target-uri $uri \
--request-body "$data"
{
  "data": {
    "prediction": [
      0.0,
      0.0,
      1.0,
      0.0
    ]
  },
  "headers": {
    "Connection": "keep-alive",
    "Content-Length": "32",
    "Content-Type": "application/json",
    "Date": "Thu, 08 Dec 2022 18:45:12 GMT",
    "X-Content-Type-Options": "nosniff",
    "opc-request-id": "C2E73B1679B34BAD8358B49D20619055/0EE2E5F93F48142725525D7A5BA7F5FB/049A66AA38AA0163DBBC70F225285851",
    "server": "uvicorn"
  },
  "status": "200 OK"
}
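The endpoint can also be invoked from Python. The following is a minimal sketch that signs the request with the OCI SDK; the config-file signer, the endpoint URL, and the response shape are assumptions to adapt to your own setup:

import oci
import requests

# Build a request signer from the default OCI config file (~/.oci/config);
# swap in a resource principal signer when running inside OCI.
config = oci.config.from_file()
signer = oci.signer.Signer(
    tenancy=config["tenancy"],
    user=config["user"],
    fingerprint=config["fingerprint"],
    private_key_file_location=config["key_file"],
)

endpoint = "https://modeldeployment.{region}.oci.customer-oci.com/ocid1.datasciencemodeldeployment.oc1.xxx.xxxxx/predict"
payload = {"data": test.toJSON().collect()}

response = requests.post(endpoint, json=payload, auth=signer)
print(response.json())  # e.g. {"prediction": [0.0, 0.0, 1.0, 0.0]}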
Adapted from an example provided by Apache in the PySpark API Reference Documentation.
import tempfile
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import SparkSession
from ads.model.framework.spark_model import SparkPipelineModel
from ads.common.model_metadata import UseCaseType
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

artifact_dir = tempfile.mkdtemp()
training = spark.createDataFrame(
    [
        (0, "a b c d e spark", 1.0),
        (1, "b d", 0.0),
        (2, "spark f g h", 1.0),
        (3, "hadoop mapreduce", 0.0),
    ],
    ["id", "text", "label"],
)

test = spark.createDataFrame(
    [
        (4, "spark i j k"),
        (5, "l m n"),
        (6, "spark hadoop spark"),
        (7, "apache hadoop"),
    ],
    ["id", "text"],
)
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(training)
spark_model = SparkPipelineModel(estimator=model, artifact_dir=artifact_dir)
spark_model.prepare(
    inference_conda_env="pyspark32_p38_cpu_v2",
    X_sample=training,
    force_overwrite=True,
    use_case_type=UseCaseType.BINARY_CLASSIFICATION,
)
# Check that the artifacts were generated correctly.
# The verify method invokes the predict function defined inside score.py in the artifact_dir.
prediction = spark_model.verify(test)
# Register the model
spark_model.save(display_name="Spark Pipeline Model")
# Deploy and create an endpoint for the Spark model
spark_model.deploy(
    display_name="Spark Pipeline Model For Classification",
    deployment_log_group_id="ocid1.loggroup.oc1.xxx.xxxxx",
    deployment_access_log_id="ocid1.log.oc1.xxx.xxxxx",
    deployment_predict_log_id="ocid1.log.oc1.xxx.xxxxx",
)
# Generate prediction by invoking the deployed endpoint
spark_model.predict(test)["prediction"]
# To delete the deployed endpoint, uncomment the line below.
# spark_model.delete_deployment(wait_for_completion=True)