In [1]:
import pandas as pd
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [2]:
spark = SparkSession.builder.appName("Exemple").config("spark.some.config.option", "some-value").getOrCreate()

In [3]:
df = spark.read.csv("iris.csv", header=True, sep=",", inferSchema=True).toDF("sep_len", "sep_wid", "pet_len", "pet_wid", "variety")
df_temp = VectorAssembler( inputCols=["sep_len", "sep_wid", "pet_len", "pet_wid"], outputCol="features"  ).transform(df)
df = df_temp.drop("sep_len", "sep_wid", "pet_len", "pet_wid")
df = StringIndexer( inputCol="variety", outputCol="label" ).fit(df).transform(df)

In [4]:
print df.printSchema() # print datatype of each column under tree representation
print df.describe().toPandas().transpose()
print df.show(5)

root
 |-- variety: string (nullable = true)
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)

None
             0     1                   2       3          4
summary  count  mean              stddev     min        max
variety    150  None                None  Setosa  Virginica
label      150   1.0  0.8192319205190407     0.0        2.0
+-------+-----------------+-----+
|variety|         features|label|
+-------+-----------------+-----+
| Setosa|[5.1,3.5,1.4,0.2]|  2.0|
| Setosa|[4.9,3.0,1.4,0.2]|  2.0|
| Setosa|[4.7,3.2,1.3,0.2]|  2.0|
| Setosa|[4.6,3.1,1.5,0.2]|  2.0|
| Setosa|[5.0,3.6,1.4,0.2]|  2.0|
+-------+-----------------+-----+
only showing top 5 rows

None


In [5]:
train, test = df.randomSplit( [0.7,0.3], seed = 11 )

In [7]:
scaler = StandardScaler( inputCol="features", outputCol="scaledFeatures" )
rf = RandomForestClassifier( labelCol="label", featuresCol="scaledFeatures", numTrees=250, seed=42 )

pipeline = Pipeline( stages=[scaler, rf] )

paramGrid = ParamGridBuilder()\
  .addGrid(rf.maxDepth, [2, 3, 4])\
  .addGrid(rf.numTrees, [25, 50, 75])\
  .build()

evaluator = MulticlassClassificationEvaluator(metricName="f1")
cv = CrossValidator(estimator=pipeline, evaluator=evaluator, estimatorParamMaps=paramGrid, numFolds=5)
cv = cv.fit(train)

In [9]:
params = [{p.name: v for p, v in m.items()} for m in cv.getEstimatorParamMaps()]
for ps, metric in zip(params, cv.avgMetrics):
    print ps, metric

{'numTrees': 25, 'maxDepth': 2} 0.927686480186
{'numTrees': 50, 'maxDepth': 2} 0.93559793148
{'numTrees': 75, 'maxDepth': 2} 0.93559793148
{'numTrees': 25, 'maxDepth': 3} 0.93559793148
{'numTrees': 50, 'maxDepth': 3} 0.943888529771
{'numTrees': 75, 'maxDepth': 3} 0.953011336788
{'numTrees': 25, 'maxDepth': 4} 0.953011336788
{'numTrees': 50, 'maxDepth': 4} 0.934690668808
{'numTrees': 75, 'maxDepth': 4} 0.953011336788


In [10]:
java_model = cv.bestModel.stages[-1]._java_obj
print "Best Model :" 
print {param.name: java_model.getOrDefault(java_model.getParam(param.name)) for param in paramGrid[0]}

Best Model :
{'numTrees': 75, 'maxDepth': 3}


In [11]:
predictions = cv.bestModel.transform(test).select("prediction", "label")

In [12]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

Test Error = 0.0243902


In [13]:
metrics = MulticlassMetrics(predictions.rdd)
print("Precision = %s" % metrics.precision())
print("Recall = %s" % metrics.recall())
print("F1 Score = %s" % metrics.fMeasure())

Precision = 0.975609756098
Recall = 0.975609756098
F1 Score = 0.975609756098


In [14]:
labels = [(i.asDict().values())[0] for i in df.select("label").distinct().collect()]

In [15]:
for label in sorted(labels):
    print"Class %s" % (label),  "P  = %s" % (metrics.precision(label))
    print"         "          , "R  = %s" % (metrics.recall(label))
    print"         "          , "F1 = %s" % (metrics.fMeasure(label, beta=1.0))

Class 0.0 P  = 1.0
          R  = 0.928571428571
          F1 = 0.962962962963
Class 1.0 P  = 0.923076923077
          R  = 1.0
          F1 = 0.96
Class 2.0 P  = 1.0
          R  = 1.0
          F1 = 1.0
