## Setup SparkContext and SQLContext

In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# Attach to the already-running SparkContext if there is one; otherwise start one.
sparkContext = SparkContext.getOrCreate()

# The SQL entry point; the next cell uses its `read` API to load the training CSV.
sqlContext = SQLContext(sparkContext)

# Leave the context as the last expression so the notebook displays it.
sqlContext

<pyspark.sql.context.SQLContext at 0x7f35005212e8>

## Load Training Dataset from S3 into Spark

In [None]:
# Read the (.bz2-compressed) CSV from S3: the first row supplies the column
# names and Spark infers each column's type by scanning the data.
data = (
    sqlContext.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("s3a://datapalooza/airbnb/airbnb.csv.bz2")
)

# Show the first row as a quick sanity check.
data.head()

## Use a Spark ML Pipeline to Build a Decision Tree Classifier

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import RFormula

# Stage 1: RFormula uses `price` as the label and every other column as features.
formula = RFormula(formula="price ~ .")

# Stage 2: a decision tree with default settings; no column names are passed, so it
# relies on RFormula's default output column names matching its own defaults.
# NOTE(review): `price` looks like a continuous target; a classifier treats each
# distinct value as a class. Confirm DecisionTreeRegressor was not intended.
classifier = DecisionTreeClassifier()

# Chain both stages and fit them on the training DataFrame in one pass.
pipeline = Pipeline(stages=[formula, classifier])
pipelineModel = pipeline.fit(data)

print(pipelineModel)

In [None]:
# Index 1 is the second pipeline stage, i.e. the fitted decision tree; dump its splits.
decisionTreeModel = pipelineModel.stages[1]
print(decisionTreeModel.toDebugString)

## Convert Spark ML Pipeline to PMML

In [None]:
from jpmml import toPMMLBytes

# Serialize the fitted pipeline to PMML. The training DataFrame is passed along too,
# presumably so the converter can read its schema (verify against jpmml docs).
pmmlBytes = toPMMLBytes(sparkContext, data, pipelineModel)

# Render the PMML document as text for inspection in the notebook.
str(pmmlBytes, encoding="utf-8")

## Deployment Option 1: Mutable Model Deployment

### Deploy PMML to Live Prediction Server

In [None]:
import urllib.request

# Mutable deployment: POST the exported PMML to the running prediction server's
# update-pmml endpoint for the "census" model name.
update_url = 'http://prediction/update-pmml/census'

update_headers = {'Content-type': 'application/xml'}

req = urllib.request.Request(update_url, headers=update_headers, data=pmmlBytes)

# Use the response as a context manager so the HTTP connection is released once the
# status has been read. urlopen raises HTTPError for error statuses.
with urllib.request.urlopen(req) as resp:
    print(resp.status)  # Should return Http Status 200

### Test New Model on Live Prediction Server

In [None]:
import json
import urllib.parse
import urllib.request  # imported here too so this cell does not depend on the previous one

# Score one record against the model served under the "census" name.
evaluate_url = 'http://prediction/evaluate-pmml/census'

evaluate_headers = {'Content-type': 'application/json'}

# NOTE(review): these are census-income fields, but the pipeline above was trained
# on the Airbnb dataset ("price ~ ."). Scoring is only meaningful if these keys match
# the deployed model's input columns; confirm which dataset is intended.
input_record = {
    "age": 39,
    "workclass": "State-gov",
    "education": "Bachelors",
    "education_num": 13,
    "marital_status": "Never-married",
    "occupation": "Adm-clerical",
    "relationship": "Not-in-family",
    "race": "White",
    "sex": "Male",
    "capital_gain": 2174,
    "capital_loss": 0,
    "hours_per_week": 40,
    "native_country": "United-States",
}
# Build the payload with json.dumps so it is always valid JSON. Compact separators
# keep it byte-identical to the previously hand-written string.
input_params = json.dumps(input_record, separators=(',', ':'))
encoded_input_params = input_params.encode('utf-8')

req = urllib.request.Request(evaluate_url, headers=evaluate_headers, data=encoded_input_params)

# Close the connection once the response body has been read.
with urllib.request.urlopen(req) as resp:
    print(resp.read())  # Should return valid classification with probabilities

## Deployment Option 2: Immutable Model Deployment

### Deploy New Prediction Service with New Model

In [None]:
from pathlib import Path

# Immutable deployment: write the PMML where the serving demo's census model lives.
pmml_path = Path('/root/source.ml/demos.ml/serving/data/census/census.pmml')

# Create the directory actually being written to. The previous `mkdir -p pmml`
# created an unrelated relative directory and left this parent possibly missing.
pmml_path.parent.mkdir(parents=True, exist_ok=True)

pmml_path.write_bytes(pmmlBytes)

# Echo the written file (replaces `!cat`) to confirm its contents.
print(pmml_path.read_text(encoding='utf-8'))

In [None]:
# List the cluster's pods before the new prediction service is deployed.
!kubectl get pod

In [None]:
# Run the environment's push script. Presumably it packages the census.pmml written
# above into a new prediction service; verify against /root/datasticks-push.sh.
!/root/datasticks-push.sh

In [None]:
# List pods again to observe the effect of the push.
!kubectl get pod

In [None]:
# Run the environment's scale script. Presumably it scales/rolls out the new
# prediction service; verify against /root/datasticks-scale.sh.
!/root/datasticks-scale.sh

In [None]:
# Confirm the resulting pod state.
!kubectl get pod