In [None]:
from __future__ import division, print_function, unicode_literals

In [None]:
from pyspark.sql import SparkSession
# Create (or reuse) the notebook-wide SparkSession: Hive support enabled,
# application name "sparksql", master set to local mode ("local[*]").
spark = (
    SparkSession.builder
    .enableHiveSupport()
    .appName("sparksql")
    .master("local[*]")
    .getOrCreate()
)

In [None]:
# Read the CSV (first row is the header, column types are inferred),
# redistribute it over 100 partitions and mark it for caching because
# every later cell builds on this frame.
inputDF = (
    spark.read
    .csv("dataset.csv", header=True, inferSchema=True)
    .repartition(100)
    .cache()
)

In [None]:
from pyspark.ml.feature import StringIndexer

In [None]:
# Index the categorical "Soil_Type" column into a numeric "Soil_I" column.
# NOTE(review): inputDF is rebound here, so re-running this cell feeds the
# already-indexed frame back in — restart the kernel before re-running.
stringIndexer = StringIndexer(
    inputCol="Soil_Type",
    outputCol="Soil_I",
)
si_model = stringIndexer.fit(dataset=inputDF)      # learn the label -> index mapping
inputDF = si_model.transform(dataset=inputDF)      # append the "Soil_I" column

In [None]:
# Index the categorical "Wild_Type" column into a numeric "Wild_I" column.
# si_model is deliberately rebound: after this cell it holds the Wild_Type
# model, not the Soil_Type one fitted earlier.
stringIndexer2 = StringIndexer(
    inputCol="Wild_Type",
    outputCol="Wild_I",
)
si_model = stringIndexer2.fit(dataset=inputDF)     # learn the label -> index mapping
inputDF = si_model.transform(dataset=inputDF)      # append the "Wild_I" column

In [None]:
from pyspark.ml.feature import OneHotEncoder

In [None]:
# One-hot encode the indexed categorical columns, appending "WildEncoder"
# and then "SoilEncoder" to inputDF (same order as before).
#
# Version compatibility: in Spark 2.x OneHotEncoder is a Transformer and is
# applied directly with transform(). From Spark 3.0 it is an Estimator that
# must be fit() first; calling transform() on the unfitted encoder raises
# AttributeError. Fitting only when the object exposes fit() keeps the cell
# working on both lines of releases.
#
# After the loop `encoder` refers to the Soil_I encoder (the fitted model on
# Spark >= 3.0), mirroring the original cell's final binding.
for _in_col, _out_col in (("Wild_I", "WildEncoder"), ("Soil_I", "SoilEncoder")):
    encoder = OneHotEncoder(inputCol=_in_col, outputCol=_out_col)
    if hasattr(encoder, "fit"):
        # Estimator API (Spark >= 3.0): obtain the model that carries transform().
        encoder = encoder.fit(inputDF)
    inputDF = encoder.transform(inputDF)

In [None]:
from pyspark.ml.feature import VectorAssembler

In [None]:
# Assemble the two one-hot encoded categoricals and the ten numeric columns
# into a single vector column named "features". The order of inputCols
# determines the layout of the assembled vector, so it is kept as-is.
vector_assembler = VectorAssembler(
    inputCols=[
        # encoded categorical features
        "SoilEncoder",  # Soil type, one-hot encoded
        "WildEncoder",  # Wild type, one-hot encoded
        # numeric features
        "Elevation",
        "Aspect",
        "Slope",
        "Horizontal_Distance_To_Hydrology",
        "Vertical_Distance_To_Hydrology",
        "Horizontal_Distance_To_Roadways",
        "Hillshade_9am",
        "Hillshade_Noon",
        "Hillshade_3pm",
        "Horizontal_Distance_To_Fire_Points",
    ],
    outputCol="features",
)



In [None]:
# Append the assembled "features" vector column to the working frame.
inputDF = vector_assembler.transform(dataset=inputDF)

In [None]:
from pyspark.ml.classification import RandomForestClassifier, RandomForestClassificationModel

In [None]:
# Hold out 30% of the rows for testing; the split is seeded so it is
# repeatable for a given inputDF.
(trainingData, testData) = inputDF.randomSplit([0.7, 0.3], seed = 23)

# Random forest predicting the 'Target' column from the default features
# column. The seed is set explicitly (matching the split and the
# cross-validator) so that training is reproducible across runs; the
# original left the forest unseeded while seeding everything around it.
rfClassifer = RandomForestClassifier(labelCol='Target', seed = 23)

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

# Single-stage pipeline around the random forest; this is the estimator
# that the cross-validator tunes.
pipeline = Pipeline(stages=[rfClassifer])

# Hyper-parameter grid: 3 tree depths x 3 forest sizes = 9 candidates.
paramGrid = (
    ParamGridBuilder()
    .addGrid(rfClassifer.maxDepth, [6, 7, 8])
    .addGrid(rfClassifer.numTrees, [95, 100, 105])
    .build()
)

# Accuracy of the "prediction" column against the "Target" label; used both
# for model selection inside the cross-validator and for the final score.
evaluator = MulticlassClassificationEvaluator(
    labelCol="Target",
    predictionCol="prediction",
    metricName="accuracy",
)

# 3-fold cross-validation over the grid, seeded for repeatable fold assignment.
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
    seed=23,
)

# Fit on the training split only, then score the held-out test split.
cvModel = crossval.fit(trainingData)
predictions = cvModel.transform(testData)

accuracy = evaluator.evaluate(predictions)
print(accuracy)