###### Initialize Spark Session 

In [1]:
import os

# Set before the Spark session is created further down: the --packages option
# names the MLeap Spark package (Scala 2.11 build, version 0.16.0) to load.
os.environ.update({
    'PYSPARK_SUBMIT_ARGS': '--packages ml.combust.mleap:mleap-spark_2.11:0.16.0 pyspark-shell',
})

In [2]:
import findspark

# Keep this call ahead of the pyspark import below; the original cell relies on
# findspark.init() having run first.
findspark.init()

from pyspark.sql.session import SparkSession

# Local session named 'Hello_APP2' using all available cores ('local[*]').
# getOrCreate() reuses an already-running session if there is one.
spark = (
    SparkSession.builder
    .appName('Hello_APP2')
    .master('local[*]')
    .getOrCreate()
)

###### Initialize MLeap environment

In [3]:
import sys

# Directory (relative to the notebook's working directory) that is added to the
# module search path so that `mleap.pyspark` can be imported from it.
MLEAP_PYTHON_DIR = 'mleap/python'

# Guard the append so that re-running this cell does not keep adding duplicate
# entries to sys.path.
if MLEAP_PYTHON_DIR not in sys.path:
    sys.path.append(MLEAP_PYTHON_DIR)

import mleap.pyspark

In [4]:
from pyspark.ml.feature import VectorAssembler, StandardScaler, OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline, PipelineModel
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

In [5]:
# Build the explicit schema for the iris CSV: four nullable double-valued
# measurement columns followed by a nullable string column holding the species.
measurement_columns = ["SepalLength", "SepalWidth", "PetalLength", "PetalWidth"]
schema = StructType(
    [StructField(column, DoubleType(), True) for column in measurement_columns]
    + [StructField("Species", StringType(), True)]
)

# header=True: the first line of iris.csv is treated as column names, not data.
irisdf = spark.read.csv('iris.csv', schema=schema, header=True)
irisdf.printSchema()

root
 |-- SepalLength: double (nullable = true)
 |-- SepalWidth: double (nullable = true)
 |-- PetalLength: double (nullable = true)
 |-- PetalWidth: double (nullable = true)
 |-- Species: string (nullable = true)



In [6]:
# 70/30 random split; the fixed seed keeps the split reproducible between runs.
trainingData, testData = irisdf.randomSplit([0.7, 0.3], seed=100)

# Both splits are reused by later cells, so mark each of them for caching.
for split_df in (trainingData, testData):
    split_df.cache()

# Row counts of the training and test splits, one per line.
print(trainingData.count())
print(testData.count())

103
47


In [7]:
from pyspark.ml.feature import VectorAssembler

# Combine the four measurement columns into one vector column called "features".
vecAssembler = VectorAssembler(
    outputCol="features",
    inputCols=["SepalLength", "SepalWidth", "PetalLength", "PetalWidth"],
)

In [8]:
from pyspark.ml.feature import StringIndexer

# Map the string-valued "Species" column to a numeric "label" column.
labelIndexer = StringIndexer(
    outputCol="label",
    inputCol="Species",
)

In [9]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml import Pipeline

# Multinomial Naive Bayes classifier with the smoothing parameter set to 1.0
nb = NaiveBayes(modelType="multinomial", smoothing=1.0)

# Chain labelIndexer, vecAssembler and the NaiveBayes estimator in a Pipeline;
# the stages run in exactly this order.
stages = [labelIndexer, vecAssembler, nb]
pipeline = Pipeline(stages=stages)

# Fit all pipeline stages on the training split to obtain the trained model
model = pipeline.fit(trainingData)

In [10]:
# Run the fitted pipeline on the held-out split and display the indexed label
# next to the predicted class and the class probabilities.
predictions = model.transform(testData)
result_columns = ["label", "prediction", "probability"]
predictions.select(*result_columns).show()

+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  2.0|       2.0|[0.29698237383357...|
|  2.0|       2.0|[0.21798511589122...|
|  2.0|       2.0|[0.15141773405828...|
|  2.0|       2.0|[0.20436085847774...|
|  2.0|       2.0|[0.24788699074567...|
|  2.0|       2.0|[0.22533897917645...|
|  2.0|       2.0|[0.21072257132948...|
|  2.0|       2.0|[0.16530848479213...|
|  2.0|       2.0|[0.18841991732741...|
|  2.0|       2.0|[0.20023264276145...|
|  2.0|       2.0|[0.28149718404605...|
|  2.0|       2.0|[0.20037174080684...|
|  2.0|       2.0|[0.18351954523753...|
|  2.0|       2.0|[0.24433103432406...|
|  2.0|       2.0|[0.18864884421606...|
|  2.0|       2.0|[0.17451529522505...|
|  2.0|       2.0|[0.17250580082906...|
|  0.0|       0.0|[0.49917720827077...|
|  2.0|       2.0|[0.16640704731458...|
|  0.0|       0.0|[0.50561466273043...|
+-----+----------+--------------------+
only showing top 20 rows



In [11]:
# Check that the model also works on an arbitrary, hand-written data point

# One row in the same column order as `schema`:
# SepalLength, SepalWidth, PetalLength, PetalWidth, Species
data_point = [[400.0, 300.0, 100.0, 200.0, 'Setosa']]
df = spark.createDataFrame(data=data_point, schema=schema)

# NOTE: this rebinds `predictions`, replacing the test-set predictions
# computed in the previous cell.
predictions = model.transform(df)
(
    predictions
    .select("label", "prediction", "probability")
    .show()
)

+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  2.0|       0.0|[0.99820596719649...|
+-----+----------+--------------------+



In [19]:
# Serialize the fitted pipeline model to an MLeap bundle for online prediction

import mleap.pyspark
# SimpleSparkSerializer is not referenced by name below; presumably this import
# is needed for its side effect of making serializeToBundle available on the
# model -- TODO confirm.
from mleap.pyspark.spark_support import SimpleSparkSerializer

# MLeap's pyspark_model.serializeToBundle(output_path) requires output_path to
# be an absolute path; here it is a zip archive addressed via a jar:file: URI.
bundle_uri = "jar:file:/__Code/mdl_lfy/model/iris_spark_model.zip"

# The model is serialized together with a DataFrame it has already transformed.
transformed_training = model.transform(trainingData)
model.serializeToBundle(bundle_uri, transformed_training)