In [None]:
from pyspark.sql.functions import col
from pyspark.sql.functions import explode
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import IndexToString
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


In [None]:
# Load the events recorded for the app into a DataFrame.
# NOTE(review): `p_event_store` is not defined in any visible cell -- presumably
# it is injected by the environment that launches this notebook; confirm.
APP_NAME = 'IrisApp'
event_df = p_event_store.find(APP_NAME)

In [None]:
def get_field_type(name):
    """Return the type name an event field should be cast to.

    Fields whose name begins with 'attr' map to 'double'; every other
    field name maps to 'string'.
    """
    return 'double' if name.startswith('attr') else 'string'

# Discover every distinct key that appears in the "fields" map column.
key_rows = (event_df
            .select(explode("fields"))
            .select("key")
            .distinct()
            .collect())
# Sorted so the generated column order does not depend on collection order.
field_names = sorted(row["key"] for row in key_rows)

# Turn each map entry into its own typed column, named after its key.
fields_col = col("fields")
exprs = [
    fields_col.getItem(name).cast(get_field_type(name)).alias(name)
    for name in field_names
]
data_df = event_df.select(*exprs)

In [None]:
# 90% / 10% train/test split. The seed is fixed so that re-running the notebook
# on the same input yields the same partition; without it the reported test
# error changes from run to run.
# NOTE(review): this assumes data_df itself is produced deterministically by the
# upstream source -- confirm if exact reproducibility matters.
(train_df, test_df) = data_df.randomSplit([0.9, 0.1], seed=42)


In [None]:
# Map the string class in "target" to a numeric "label" index. The indexer is
# fitted here, before the pipeline is built, because its learned labels are
# needed below to construct the reverse (index -> string) mapping.
# NOTE(review): the indexer is fitted on train_df only; a class value that
# occurs solely in test_df would be unknown to it at transform time -- confirm
# that this cannot happen for the data in use.
labelIndexer = StringIndexer(inputCol="target", outputCol="label").fit(train_df)

# Pack every 'attr*' column into the single vector column the classifier reads.
featureAssembler = VectorAssembler(inputCols=[x for x in field_names if x.startswith('attr')],
                                   outputCol="features")
# Fixed seed so that repeated runs on the same input train the same forest.
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=10,
                            seed=42)
# Translate the numeric prediction back into the original class string.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)
pipeline = Pipeline(stages=[featureAssembler, labelIndexer, rf, labelConverter])


In [None]:
# Fit the pipeline stages on the training split.
model = pipeline.fit(dataset=train_df)


In [None]:
# Run the fitted pipeline over the held-out split to obtain predictions.
predict_df = model.transform(dataset=test_df)


In [None]:
# Show a few rows: predicted class next to the true class and the feature vector.
preview_cols = ["predictedLabel", "target", "features"]
predict_df.select(*preview_cols).show(5)


In [None]:
# Compare the indexed true label with the numeric prediction and report the
# error rate (1 - accuracy) on the held-out split.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)
accuracy = evaluator.evaluate(predict_df)
test_error = 1.0 - accuracy
print("Test Error = %g" % test_error)


In [None]:
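# Optional: uncomment to persist the fitted pipeline model to disk.
# Left disabled so that running the whole notebook does not write to /tmp.
#model.save('/tmp/iris-model')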