In [3]:
import os

# Point findspark at a local Spark build. An already-exported SPARK_HOME takes
# precedence, so the notebook also runs on machines without this checkout.
# expanduser("~") avoids a KeyError when the HOME variable is not set.
os.environ.setdefault(
    "SPARK_HOME",
    os.path.join(os.path.expanduser("~"), "Codecookies", "sparkworkspace", "spark-open-src", "dist"),
)
import findspark
# Makes the Spark installation at SPARK_HOME importable as `pyspark`.
findspark.init()

In [10]:
from sklearn.datasets import load_iris
import pandas as pd
import numpy as np
from pyspark.ml.classification import *
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.feature import *
from pyspark.sql import SparkSession

In [6]:
# Local Spark session for this demo. getOrCreate() hands back the existing
# session when the cell is re-run instead of starting a second one.
spark = (
    SparkSession.builder
    .master("local")
    .appName("ppmml_test")
    .getOrCreate()
)

# Train a model on the Iris data and export the Spark model and schema

In [15]:
# Load Iris as plain arrays: X holds the four numeric measurements, y the class ids.
# return_X_y must be passed by keyword: current scikit-learn declares it
# keyword-only, so the positional call load_iris(True) raises TypeError.
iris_data = load_iris(return_X_y=True)
(X, y) = iris_data
features = ['x1', 'x2', 'x3', 'x4']
label = 'label'
# Name the feature columns at construction time rather than patching .columns afterwards.
pandas_df = pd.DataFrame(X, columns=features)
pandas_df[label] = y

In [16]:
# Hand the pandas frame over to Spark and print a 5-row preview.
df = spark.createDataFrame(pandas_df)
df.show(n=5)

+---+---+---+---+-----+
| x1| x2| x3| x4|label|
+---+---+---+---+-----+
|5.1|3.5|1.4|0.2|    0|
|4.9|3.0|1.4|0.2|    0|
|4.7|3.2|1.3|0.2|    0|
|4.6|3.1|1.5|0.2|    0|
|5.0|3.6|1.4|0.2|    0|
+---+---+---+---+-----+
only showing top 5 rows



In [17]:
# Two-stage pipeline: VectorAssembler packs the raw feature columns into one
# "features" vector column, then LogisticRegression is fit on that column
# against "label".
vector_assembler = VectorAssembler(outputCol="features", inputCols=features)
lr = LogisticRegression(
    featuresCol="features",  # must match the assembler's outputCol above
    labelCol="label",
    tol=1e-4,
)
stages = [vector_assembler, lr]
pipeline = Pipeline(stages=stages)
model = pipeline.fit(df)

In [22]:
# Every artifact of this walkthrough (Spark model, schema, PMML, CSVs) lives here.
base_path = "/tmp/pmml-models/spark/"
model_output = os.path.join(base_path, "logistic_regression_model")
# overwrite() keeps the cell re-runnable when the model directory already exists.
model.write().overwrite().save(model_output)

In [23]:
# Persist the training DataFrame's schema as JSON; ppmml.to_pmml consumes it
# below through its schema_input argument.
schema_output = os.path.join(base_path, "logistic_regression.json")
# Create the directory explicitly rather than relying on the Spark model save
# in the previous cell having created it.
os.makedirs(base_path, exist_ok=True)
# Write-only text mode is enough; 'w+' (read/write) was never needed.
with open(schema_output, 'w', encoding='utf-8') as f:
    f.write(df.schema.json())

# Export the PMML file with ppmml

In [24]:
import ppmml

In [25]:
# Convert the saved Spark pipeline plus its schema JSON into a PMML document.
pmml_output = os.path.join(base_path, "logistic_regression.pmml")
ppmml.to_pmml(
    model_input=model_output,
    schema_input=schema_output,
    pmml_output=pmml_output,
    model_type="spark",
)

INFO: 17-12-28 17:34:14: base_converter.py:89 * 140735235661824 Starting to convert model file /tmp/pmml-models/spark/logistic_regression_model to pmml file
INFO: 17-12-28 17:34:26: base_converter.py:96 * 140735235661824 Successfully generate pmml file: /tmp/pmml-models/spark/logistic_regression.pmml


# Predict with the PMML file

In [27]:
# Prepare test data: reuse the full training frame (features plus the true
# `label` column) as the scoring input, written without the pandas index.
# NOTE(review): the prediction output later contains both `label` and `label.1`;
# presumably the second is the model's predicted target colliding with this
# input column. Confirm before relying on either name.
test_data_input = os.path.join(base_path, "test.csv")
pandas_df.to_csv(test_data_input, index=False, header=True)

In [28]:
# Score the CSV with the exported PMML model; the input is comma-separated,
# matching how the previous cell wrote it.
predict_output = os.path.join(base_path, "predict.csv")
ppmml.predict(
    pmml_input=pmml_output,
    data_input=test_data_input,
    data_output=predict_output,
    options={"separator": ","},
)

INFO: 17-12-28 17:36:35: evaluator.py:62 * 140735235661824 Starting to make predictions of pmml file: /tmp/pmml-models/spark/logistic_regression.pmml, data_input: /tmp/pmml-models/spark/test.csv, data_output: /tmp/pmml-models/spark/predict.csv
INFO: 17-12-28 17:36:37: evaluator.py:80 * 140735235661824 Successfully generate predictions to path: /tmp/pmml-models/spark/predict.csv


In [29]:
# The first column of the prediction file has an empty header and holds
# 0, 1, 2, ... so it appears to be a row index written with the predictions.
# Read it as the index instead of displaying a spurious "Unnamed: 0" column.
# NOTE(review): `label` appears twice in the file, so pandas renames the second
# copy to `label.1` (presumably the predicted target); confirm before using it.
pd.read_csv(predict_output, index_col=0).head(5)

Unnamed: 0,x1,x2,x3,x4,label,label.1,pmml(prediction),prediction,probability(0),probability(1),probability(2)
0,5.1,3.5,1.4,0.2,0,0,0,0.0,1.0,2.68887e-31,4.154027e-58
1,4.9,3.0,1.4,0.2,0,0,0,0.0,1.0,1.40848e-20,1.005972e-45
2,4.7,3.2,1.3,0.2,0,0,0,0.0,1.0,2.164983e-27,2.591802e-53
3,4.6,3.1,1.5,0.2,0,0,0,0.0,1.0,1.551617e-24,3.055816e-49
4,5.0,3.6,1.4,0.2,0,0,0,0.0,1.0,2.022037e-34,2.049216e-61
