## Set up the dataframe.

Read the training and test CSV files into RDDs.

In [2]:
import os

import findspark

# Point findspark at the Spark installation before pyspark is imported below.
# Prefer the SPARK_HOME environment variable so the notebook also runs on machines
# where Spark lives elsewhere; fall back to the original location when it is unset
# or empty.
findspark.init(os.environ.get('SPARK_HOME') or '/usr/local/bin/spark-2.0.1')

import pyspark
from pyspark.sql import SparkSession

# Create (or reuse) a SparkSession on a local master.
# NOTE(review): the app name "Word Count" looks like a template leftover -- confirm
# before renaming, in case anything keys off it.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Word Count")
    .getOrCreate()
)

In [3]:
# Take the SparkContext from the session and load each CSV file as a cached RDD of
# raw text lines (training file first, then test file).
sc = spark.sparkContext
trainRdd, testRdd = (
    sc.textFile(csv_path).cache()
    for csv_path in ('input/tr.csv', 'input/test.csv')
)

In [4]:
def makeDF(rdd):
    """Build a cached DataFrame from an RDD of comma-separated text lines.

    The first line of ``rdd`` is used as the header and supplies the column
    names. Every line equal to that header is dropped, and each remaining line
    is split on ',' into a tuple of string fields (no quoting is handled).

    Args:
        rdd: RDD of text lines whose first element is the header line.

    Returns:
        The cached result of ``spark.createDataFrame`` over the data rows.
    """
    header = rdd.first()
    column_names = header.split(',')

    def is_data_line(line):
        return line != header

    def to_fields(line):
        return tuple(line.split(','))

    rows = rdd.filter(is_data_line).map(to_fields)
    frame = spark.createDataFrame(rows, column_names)
    return frame.cache()

In [5]:
# Convert both raw text RDDs into DataFrames (all columns are strings at this point).
train_df, test_df = makeDF(trainRdd), makeDF(testRdd)

In [6]:
# Peek at the target column; show() prints only the first 20 rows.
train_df.select('species').show()

+--------------------+
|             species|
+--------------------+
|         Acer_Opalus|
|Pterocarya_Stenop...|
|Quercus_Hartwissiana|
|     Tilia_Tomentosa|
|  Quercus_Variabilis|
|Magnolia_Salicifolia|
| Quercus_Canariensis|
|       Quercus_Rubra|
|     Quercus_Brantii|
|      Salix_Fragilis|
|     Zelkova_Serrata|
|Betula_Austrosine...|
|     Quercus_Pontica|
|      Quercus_Afares|
|   Quercus_Coccifera|
|     Fagus_Sylvatica|
|         Phildelphus|
|       Acer_Palmatum|
|   Quercus_Pubescens|
|   Populus_Adenopoda|
+--------------------+
only showing top 20 rows



In [7]:
# Number of rows in the test DataFrame (header line already removed by makeDF).
test_df.count()

594

## Prepare the data for training.

Label-encode the target column (`species`) in the training set.

In [12]:
from pyspark.ml.feature import IndexToString, StringIndexer
# Fit a StringIndexer on the training labels so each species name maps to a numeric
# index in the new 'speciesEnc' column, then apply it to the training frame.
stInd = StringIndexer(inputCol='species', outputCol='speciesEnc')
stIndModel = stInd.fit(train_df)

train_df_enc = stIndModel.transform(train_df)
train_df_enc.cache()

# Show the original label next to its encoded value.
train_df_enc[['species', 'speciesEnc']].show()

+--------------------+----------+
|             species|speciesEnc|
+--------------------+----------+
|         Acer_Opalus|      48.0|
|Pterocarya_Stenop...|      97.0|
|Quercus_Hartwissiana|      15.0|
|     Tilia_Tomentosa|      55.0|
|  Quercus_Variabilis|      18.0|
|Magnolia_Salicifolia|      38.0|
| Quercus_Canariensis|      78.0|
|       Quercus_Rubra|      98.0|
|     Quercus_Brantii|      42.0|
|      Salix_Fragilis|      88.0|
|     Zelkova_Serrata|      36.0|
|Betula_Austrosine...|       1.0|
|     Quercus_Pontica|      50.0|
|      Quercus_Afares|      25.0|
|   Quercus_Coccifera|      29.0|
|     Fagus_Sylvatica|      37.0|
|         Phildelphus|      39.0|
|       Acer_Palmatum|      77.0|
|   Quercus_Pubescens|      69.0|
|   Populus_Adenopoda|      34.0|
+--------------------+----------+
only showing top 20 rows



Create the features column that will be used to train the model.

In [13]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *
# Feature columns: everything except the raw label, the encoded label and the row id.
non_feature_cols = ('species', 'speciesEnc', 'id')
cols = [name for name in train_df_enc.columns if name not in non_feature_cols]


def _cast_columns_to_float(df, names):
    """Return ``df`` with the columns listed in ``names`` cast to FloatType.

    All casts are done in a single ``select`` so the query plan gets one
    projection instead of one per column, which is what repeated
    ``withColumn`` calls in a loop would produce. Column order and column
    names are unchanged.
    """
    to_cast = set(names)
    return df.select([
        df[c].cast(FloatType()).alias(c) if c in to_cast else df[c]
        for c in df.columns
    ])


# makeDF produced string columns; the assembler needs numeric inputs.
train_df_enc = _cast_columns_to_float(train_df_enc, cols)
test_df = _cast_columns_to_float(test_df, cols)

# Pack all feature columns into a single vector column named "features".
va = VectorAssembler(inputCols=cols, outputCol="features")
train_df_vec = va.transform(train_df_enc).cache()
test_df_vec = va.transform(test_df)
train_df_vec.select('features', 'speciesEnc').show()

+--------------------+----------+
|            features|speciesEnc|
+--------------------+----------+
|[0.00781199987977...|      48.0|
|[0.00585900014266...|      97.0|
|[0.00585900014266...|      15.0|
|[0.0,0.0039059999...|      55.0|
|[0.00585900014266...|      18.0|
|[0.0703120008111,...|      38.0|
|[0.02148400060832...|      78.0|
|[0.0,0.0,0.037108...|      98.0|
|[0.00585900014266...|      42.0|
|[0.0,0.0,0.009766...|      88.0|
|[0.01953100040555...|      36.0|
|[0.00195299996994...|       1.0|
|[0.015625,0.01171...|      50.0|
|[0.01171899959444...|      25.0|
|[0.01171899959444...|      29.0|
|[0.02734399959444...|      37.0|
|[0.00976600032299...|      39.0|
|[0.0,0.0,0.001952...|      77.0|
|[0.00195299996994...|      69.0|
|[0.00585900014266...|      34.0|
+--------------------+----------+
only showing top 20 rows



In [14]:
# Hold out 20% of the labelled data. The split is random, so pin the seed to make
# the notebook reproducible from a fresh kernel.
SPLIT_SEED = 42
train_set, test_set = train_df_vec.select('features', 'speciesEnc').randomSplit([.8, .2], seed=SPLIT_SEED)

## Fit the model.

In [15]:
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# One fixed seed for both the forest and the cross-validator so that tree
# construction and fold assignment are repeatable across runs.
MODEL_SEED = 42

rf = RandomForestClassifier(labelCol="speciesEnc", predictionCol="prediction", probabilityCol="probability", rawPredictionCol="rawPrediction", featureSubsetStrategy="auto", impurity="gini", seed=MODEL_SEED)

# Search grid: 2 tree depths x 3 forest sizes = 6 candidate models.
paramGrid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [5,10]) \
    .addGrid(rf.numTrees,  [10, 25, 50]) \
    .build()

# The metric is spelled out (f1 is the evaluator's default) so it is clear what the
# cross-validation scores mean.
evaluator = MulticlassClassificationEvaluator(labelCol="speciesEnc", predictionCol="prediction", metricName="f1")

# 3-fold cross-validation over the grid.
crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3,
                          seed=MODEL_SEED)

In [16]:
# Run the cross-validated grid search on the training split.
cvModel = crossval.fit(train_set)

In [17]:
# Evaluator metric averaged across the CV folds, one value per parameter combination
# (6 values = 2 maxDepth settings x 3 numTrees settings).
# NOTE(review): presumably listed in paramGrid order -- confirm before reading off a winner.
cvModel.avgMetrics

[0.2444053501477224,
 0.5048817696418642,
 0.3650942291627472,
 0.7168536969463511,
 0.4583266302122232,
 0.769789167019974]

In [19]:
# Persist the best model found by cross-validation. Use the writer's overwrite mode so
# re-running the notebook does not fail because the output path already exists.
cvModel.bestModel.write().overwrite().save("spark_random_forest_model_2")

## Make predictions on the test set.

In [20]:
# List the distinct species names in the training data (first 20 shown).
train_df_vec.select('species').distinct().show()

+--------------------+
|             species|
+--------------------+
|      Quercus_Afares|
|   Alnus_Sieboldiana|
| Arundinaria_Simonii|
|         Acer_Pictum|
|Quercus_Semecarpi...|
|Callicarpa_Bodinieri|
|   Quercus_Alnifolia|
|   Quercus_Shumardii|
|     Acer_Circinatum|
|     Tilia_Tomentosa|
|     Fagus_Sylvatica|
|   Cotinus_Coggygria|
|     Acer_Capillipes|
|   Quercus_Coccifera|
|Viburnum_x_Rhytid...|
|     Quercus_Pontica|
|Populus_Grandiden...|
|  Cornus_Controversa|
|    Acer_Saccharinum|
|   Prunus_X_Shmittii|
+--------------------+
only showing top 20 rows



In [21]:
# Apply the best cross-validated model to the assembled test features and cache the result.
prediction_df = cvModel.bestModel.transform(test_df_vec).cache()

In [22]:
# Column names for the output frame: 'id' first, followed by the species labels in
# the order held by the fitted indexer.
species_labels = ['id'] + list(stIndModel.labels)

In [29]:
from decimal import *
def _to_output_row(row):
    """Turn an (id, probability) row into a flat list: float id, then one Decimal per probability."""
    row_id, probabilities = row[0], row[1]
    return [float(row_id)] + [Decimal(p) for p in probabilities]


pred_rdd = prediction_df.select('id', 'probability').rdd.map(_to_output_row)

In [31]:
# Schema for the output frame: a float id column followed by one decimal probability
# column per species label.
# The probability columns use DecimalType(38, 37), not (38, 38): when the scale equals
# the precision there is no digit left of the decimal point, so a probability of
# exactly 1.0 cannot be stored. Scale 37 still far exceeds the precision of the
# float probabilities being converted.
id_field = StructField(species_labels[0], FloatType(), True)
prob_fields = [StructField(label, DecimalType(38, 37), True) for label in species_labels[1:]]
sol_fields = [id_field] + prob_fields
sol_schema = StructType(sol_fields)
pred_df = spark.createDataFrame(pred_rdd, sol_schema).cache()

In [32]:
import pandas as pd
# Collect the prediction frame to the driver as a pandas DataFrame.
pd_dataframe = pred_df.toPandas()