Thanks to [@dkaberna](https://github.com/dkaberna) for uploading the solution; I couldn't get past the error complaining about not using BigData Services.

# Cover Type Prediction using ensembles

## Dataset Description
The dataset describes trees planted in the US and contains information about 500000 trees. Your aim is to build a Random Forest ensemble to predict the cover type of the trees. To complete this assignment, follow these steps:
* Load the training data
* Transform categorical features into vector representations
* Split the dataset into training and validation parts
* Fit the Random Forest ensemble to the training set
* Compare the accuracy of the fitted model with that of the Logistic Regression model, which is about 0.67 on this dataset


If you have enough time, the following steps are worth exploring further:
* Determine which features are valuable for your model (calculate the feature importances of your model).
* Try reducing the number of trees and compare the results.
* Understand why linear models perform poorly on this dataset.


## Loading data

Initialize the PySpark session.

In [1]:
from __future__ import division, print_function, unicode_literals # For the compatibility with Python 2

In [2]:
from pyspark.sql import SparkSession
spark_session = SparkSession.builder\
                            .enableHiveSupport()\
                            .appName("spark sql")\
                            .master("local[4]")\
                            .getOrCreate()

Load the training dataset located at /data/covertype2 with at least 60 partitions (use the `repartition` function for this). Use the `inferSchema` option so that numerical features keep their numeric types.

In [3]:
df = spark_session.read.format("com.databricks.spark.csv")\
        .option("header", "true")\
        .option("inferschema", "true")\
        .option("mode", "DROPMALFORMED")\
        .load("/data/covertype2/train.csv")\
        .repartition(60)

## Transforming data

As you can see, there are two categorical features in the dataset: 'Soil_Type' and 'Wild_Type'. You have to transform them into vector representations.

First, use StringIndexer to map each category value to a numeric index.
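To make the indexing step concrete, here is a minimal pure-Python sketch of the kind of mapping StringIndexer learns with its default ordering: labels are sorted by descending frequency, so the most frequent label gets index 0. The function name `fit_string_index`, the sample labels, and the alphabetical tie-breaking are assumptions made for illustration; they are not part of the assignment or of Spark's API.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct label to an index, most frequent label first.

    Ties are broken alphabetically here; that choice is an assumption of this sketch.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Hypothetical sample values, not taken from the real dataset
sample = ["soil_a", "soil_b", "soil_a", "soil_c", "soil_a", "soil_b"]
mapping = fit_string_index(sample)
indexed = [mapping[v] for v in sample]
print(mapping)
print(indexed)
```

The important property is that the mapping is learned from the data it is fitted on, so a different dataset can yield a different mapping.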

In [4]:
from pyspark.ml.feature import StringIndexer

In [5]:
cat_cols=['Soil_Type','Wild_Type']
cat_cols_index={'Soil_Type':'Soil_Index','Wild_Type':'Wild_Index'}
cat_cols_encoder={'Soil_Index':'SoilEncoder','Wild_Index':'WildEncoder'}

In [6]:
stringIndexer = StringIndexer(inputCol = "Soil_Type", outputCol = "Soil_Index")
model1 = stringIndexer.fit(df)
indexedDF = model1.transform(df)

stringIndexer2 = StringIndexer(inputCol = "Wild_Type", outputCol = "Wild_Index")
model2 = stringIndexer2.fit(indexedDF)
indexedDF2 = model2.transform(indexedDF)

Apply OneHotEncoder to the dataset to obtain vectors for the Random Forest classifier.
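As a rough illustration of what one-hot encoding produces, the sketch below turns a category index into a vector with a single 1.0. The helper `one_hot` is hypothetical and returns dense Python lists, whereas Spark emits sparse vectors. The `drop_last` flag mirrors the idea behind `setDropLast`: when it is enabled, the last category is encoded as an all-zeros vector.

```python
def one_hot(index, num_categories, drop_last=False):
    """Return a dense one-hot list for a category index.

    With drop_last=True the last category is encoded as an all-zeros vector.
    """
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[index] = 1.0
    return vector

print(one_hot(2, 3))                  # every category keeps its own slot
print(one_hot(2, 3, drop_last=True))  # last category becomes all zeros
```

Keeping every slot, as the notebook does with `setDropLast(False)`, makes each category correspond to exactly one position in the vector, which is convenient when reading feature importances later.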

In [7]:
from pyspark.ml.feature import OneHotEncoder

In [8]:
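# OneHotEncoder is used as a plain transformer here (Spark 2.x API);
# in Spark 3.x it is an estimator and must be fitted before calling transform
encoder = OneHotEncoder(inputCol = "Soil_Index", outputCol = "SoilEncoder")
encoder.setDropLast(False)  # keep a slot for every category
encodedDF = encoder.transform(indexedDF2)

encoder2 = OneHotEncoder(inputCol = "Wild_Index", outputCol = "WildEncoder")
encoder2.setDropLast(False)  # keep a slot for every category
encodedDF2 = encoder2.transform(encodedDF)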

Use VectorAssembler to combine all features into a single vector. Don't forget to include the features you have generated.

In [9]:
from pyspark.ml.feature import VectorAssembler

In [10]:
vector_assembler = VectorAssembler(inputCols=['SoilEncoder', # feature name of Soil type encoded
                                              'WildEncoder', # feature name of Wild type encoded
                                              'Elevation',
                                              'Aspect',
                                              'Slope',
                                              'Horizontal_Distance_To_Hydrology',
                                              'Vertical_Distance_To_Hydrology',
                                              'Horizontal_Distance_To_Roadways',
                                              'Hillshade_9am',
                                              'Hillshade_Noon',
                                              'Hillshade_3pm',
                                              'Horizontal_Distance_To_Fire_Points'
                                              ], outputCol='features')
finalDF = vector_assembler.transform(encodedDF2)

In [11]:
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier

### Cross-Validation

from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

decisionTree = DecisionTreeClassifier(labelCol = "Target")

pipeline = Pipeline(stages = [decisionTree])

# Search over tree depths 1..9
paramGrid = ParamGridBuilder()\
    .addGrid(decisionTree.maxDepth, [1, 2, 3, 4, 5, 6, 7, 8, 9])\
    .build()

evaluator = MulticlassClassificationEvaluator(labelCol = "Target", predictionCol = "prediction", metricName = "accuracy")

crossval = CrossValidator(estimator = pipeline,
                          estimatorParamMaps = paramGrid,
                          evaluator = evaluator,
                          numFolds = 10)

# Split before fitting so the held-out part is not seen during cross-validation
(trainingData, testData) = finalDF.randomSplit([0.8, 0.2], seed = 123)

cvModel = crossval.fit(trainingData)

# Evaluate the best model found by cross-validation on the held-out part
predictions = cvModel.transform(testData)
accuracy = evaluator.evaluate(predictions)

## Training

Fit the Random Forest model to the training dataset. Don't forget to split the dataset into two parts so that you can check your trained models. Using about 100 trees with a depth of about 7 keeps the fitting time reasonable. Try adjusting the options 'subsamplingRate' and 'featureSubsetStrategy' to get better results.

<b>Extra task.</b> Use cross-validation to check your model.
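One way to organize the experiments with 'subsamplingRate' and 'featureSubsetStrategy' is to enumerate a small grid of settings and fit one forest per setting. The sketch below only builds the grid in plain Python. The candidate values are illustrative assumptions rather than tuned recommendations, and the commented-out constructor call shows how each setting might be passed on in the notebook.

```python
from itertools import product

# Candidate values are illustrative assumptions, not tuned recommendations
subsampling_rates = [0.5, 0.8, 1.0]  # fraction of rows sampled per tree, in (0, 1]
feature_subset_strategies = ["auto", "sqrt", "log2", "onethird"]

candidates = [
    {"subsamplingRate": rate, "featureSubsetStrategy": strategy}
    for rate, strategy in product(subsampling_rates, feature_subset_strategies)
]

for params in candidates:
    # In the notebook, each dict could be passed as keyword arguments, e.g.
    # RandomForestClassifier(labelCol='Target', featuresCol='features',
    #                        numTrees=100, maxDepth=7, **params)
    print(params)
```

Each candidate should be scored on the validation part only, so that the test data stays untouched until the final submission.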

In [50]:
from pyspark.ml import Pipeline
#Pipeline for static model
rf = RandomForestClassifier(labelCol='Target',featuresCol='features', numTrees=100,maxDepth=9)

In [51]:
#Training the model
(trainingData, testData) = finalDF.randomSplit([0.8, 0.2], seed = 123)
model = rf.fit(trainingData)

Get the feature importances of the trained model. Which 5 features are the most important in the dataset?

Apply the model to the validation part of your dataset and compute the accuracy score. Use the MulticlassClassificationEvaluator class from the ml.evaluation module.
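A minimal sketch of how the ranking could be done once the importances are available as a plain array (in the notebook, for example via `model.featureImportances.toArray()`). Note that the assembled vector has one slot per one-hot category plus one per numeric column, in the order given to VectorAssembler, so each slot needs its own name. The helper `top_k_features`, the slot names, and the scores below are hypothetical.

```python
def top_k_features(slot_names, importances, k=5):
    """Return the k (name, importance) pairs with the largest importance."""
    if len(slot_names) != len(importances):
        raise ValueError("each vector slot needs exactly one name")
    ranked = sorted(zip(slot_names, importances), key=lambda pair: pair[1], reverse=True)
    return ranked[:k]

# Hypothetical names and scores for a tiny 6-slot feature vector
names = ["Soil_0", "Soil_1", "Wild_0", "Elevation", "Aspect", "Slope"]
scores = [0.10, 0.05, 0.20, 0.40, 0.05, 0.20]
for name, score in top_k_features(names, scores, k=3):
    print(name, score)
```

To rank the original categorical columns rather than individual categories, the importances of all slots belonging to one column can be summed first.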
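For reference, the accuracy metric is simply the fraction of rows whose predicted class equals the true class. The pure-Python sketch below, with a hypothetical helper `accuracy_score` and made-up labels, shows the calculation on a toy example.

```python
def accuracy_score(labels, predictions):
    """Fraction of positions where the prediction equals the label."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    if not labels:
        raise ValueError("at least one example is required")
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

# Made-up cover type labels and predictions
toy_labels = [1, 2, 2, 3]
toy_predictions = [1, 2, 3, 3]
toy_accuracy = accuracy_score(toy_labels, toy_predictions)
print(toy_accuracy)
```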

In [52]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [53]:
predictions = model.transform(testData)
evaluator = MulticlassClassificationEvaluator(labelCol = "Target", predictionCol = "prediction", metricName = "accuracy")
accuracy = evaluator.evaluate(predictions)

### Adding CV

Are your results better than the results from the Logistic Regression model?

# Performing test submission

Apply the models to the test dataset.
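One thing to watch at this step is that index mappings learned from data depend on that data. The pure-Python sketch below uses a hypothetical frequency-based indexer (`fit_index`, with alphabetical tie-breaking as an assumption) to show that fitting on a test set with a different label distribution can assign different indices to the same labels, which would silently scramble the features a trained model expects.

```python
from collections import Counter

def fit_index(values):
    """Index labels by descending frequency; ties broken alphabetically (an assumption)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {v: i for i, v in enumerate(ordered)}

# Made-up labels: the test set has a different label distribution
train = ["a", "a", "a", "b", "b", "c"]
test = ["c", "c", "c", "b", "a"]

train_mapping = fit_index(train)  # mapping the model was trained with
refit_mapping = fit_index(test)   # mapping obtained by refitting on test data

print(train_mapping)
print(refit_mapping)
```

Applying the transformers that were fitted on the training data avoids this mismatch.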

<b>Note!</b> Dataset will be changed during the test phase. Your last cell output must be the accuracy score.

In [54]:
# Load dataset, transform dataset
dfTest = spark_session.read.format("com.databricks.spark.csv")\
        .option("header", "true")\
        .option("inferschema", "true")\
        .option("mode", "DROPMALFORMED")\
        .load("/data/covertype2")\
        .repartition(60)

In [55]:
# Reuse the indexers fitted on the training data so category indices stay consistent
indexedDFTest = model1.transform(dfTest)

indexedDF2Test = model2.transform(indexedDFTest)

encodedDFTest = encoder.transform(indexedDF2Test)

encodedDF2Test = encoder2.transform(encodedDFTest)
finalDFTest = vector_assembler.transform(encodedDF2Test)

In [56]:
# Calculate accuracy for static model
predictions = model.transform(finalDFTest)
evaluator = MulticlassClassificationEvaluator(labelCol = "Target", predictionCol = "prediction", metricName = "accuracy")
accuracy = evaluator.evaluate(predictions)

In [57]:
print(accuracy)

0.7171700726138098
