# Predicting Forect Cover Type

In this notebook, we are trying to predict the forest cover type of a land using features such a elevation, soil type, slope, etc. This is a supervised learning problem and we have a dataset readily available with training examples and the true target value. Furthermore, this is a classification type problem as for each example we are trying to classify it into one of the 7 pre-defined forest cover type.

## Requirements

This tutorial uses pyspark 3.0 API for distributed machine learning on big data.

In [1]:
# Importing necessary libraries
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import pyspark.sql.types as t
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, VectorIndexer
from pyspark.ml.classification import RandomForestClassifier, DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
import pandas as pd
from pprint import PrettyPrinter
import os
import gc

## Spark Configuration

I am using spark in the standalone mode with single node cluster. For other configurations, see spark's documentation [here](https://spark.apache.org/docs/latest/).

In [2]:
# Creating SparkSession object
spark = SparkSession.builder.appName('ForestCover').master("spark://DESKTOP-H1P4CCM.localdomain:7077").getOrCreate()

In [3]:
# Global Variables
base = os.getcwd()
filepath = "datasets/covtype/covtype.data"
colNames = ["Elevation", "Aspect", "Slope", "Horizontal_Distance_To_Hydrology", "Vertical_Distance_To_Hydrology",\
            "Horizontal_Distance_To_Roadways", "Hillshade_9am", "Hillshade_Noon", "Hillshade_3pm",\
            "Horizontal_Distance_To_Fire_Points"] + ["Wilderness_Area_"+str(i) for i in range(4)]\
+ ["Soil_Type_"+str(i) for i in range(40)] + ['Cover_Type']
labelCol = "Cover_Type"
pp = PrettyPrinter()

## Dataset

For this excercise, I am using Covtype dataset avaliable [online](https://archive.ics.uci.edu/ml/machine-learning-databases/covtype/) as compressed zip file and the associted info file. The dataset contains records of land patches in Colarado, USA and their associated forest cover type. Each record contains 54 properties that collectively describe a land patch. Of the 54 properties, 10 are numerical and 2 categorical which are one-hot encoded and spread out into 44 propeties (4 wilderness and 40 soil type).

In [4]:
# Read csv file
covtype = spark.read.option('header', False).option('inferSchema', True).csv(os.path.join(base, filepath))
covtype.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: integer (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: integer (nullable = true)
 |-- _c4: integer (nullable = true)
 |-- _c5: integer (nullable = true)
 |-- _c6: integer (nullable = true)
 |-- _c7: integer (nullable = true)
 |-- _c8: integer (nullable = true)
 |-- _c9: integer (nullable = true)
 |-- _c10: integer (nullable = true)
 |-- _c11: integer (nullable = true)
 |-- _c12: integer (nullable = true)
 |-- _c13: integer (nullable = true)
 |-- _c14: integer (nullable = true)
 |-- _c15: integer (nullable = true)
 |-- _c16: integer (nullable = true)
 |-- _c17: integer (nullable = true)
 |-- _c18: integer (nullable = true)
 |-- _c19: integer (nullable = true)
 |-- _c20: integer (nullable = true)
 |-- _c21: integer (nullable = true)
 |-- _c22: integer (nullable = true)
 |-- _c23: integer (nullable = true)
 |-- _c24: integer (nullable = true)
 |-- _c25: integer (nullable = true)
 |-- _c26: integer (nullable = true)
 |-- _

In [5]:
# Renaming the columns
data = covtype.toDF(*colNames).withColumn(labelCol, f.col(labelCol).cast("double"))
data.printSchema()

root
 |-- Elevation: integer (nullable = true)
 |-- Aspect: integer (nullable = true)
 |-- Slope: integer (nullable = true)
 |-- Horizontal_Distance_To_Hydrology: integer (nullable = true)
 |-- Vertical_Distance_To_Hydrology: integer (nullable = true)
 |-- Horizontal_Distance_To_Roadways: integer (nullable = true)
 |-- Hillshade_9am: integer (nullable = true)
 |-- Hillshade_Noon: integer (nullable = true)
 |-- Hillshade_3pm: integer (nullable = true)
 |-- Horizontal_Distance_To_Fire_Points: integer (nullable = true)
 |-- Wilderness_Area_0: integer (nullable = true)
 |-- Wilderness_Area_1: integer (nullable = true)
 |-- Wilderness_Area_2: integer (nullable = true)
 |-- Wilderness_Area_3: integer (nullable = true)
 |-- Soil_Type_0: integer (nullable = true)
 |-- Soil_Type_1: integer (nullable = true)
 |-- Soil_Type_2: integer (nullable = true)
 |-- Soil_Type_3: integer (nullable = true)
 |-- Soil_Type_4: integer (nullable = true)
 |-- Soil_Type_5: integer (nullable = true)
 |-- Soil_Type

In [6]:
# Data
data.show(1, truncate=False, vertical=True)

-RECORD 0----------------------------------
 Elevation                          | 2596 
 Aspect                             | 51   
 Slope                              | 3    
 Horizontal_Distance_To_Hydrology   | 258  
 Vertical_Distance_To_Hydrology     | 0    
 Horizontal_Distance_To_Roadways    | 510  
 Hillshade_9am                      | 221  
 Hillshade_Noon                     | 232  
 Hillshade_3pm                      | 148  
 Horizontal_Distance_To_Fire_Points | 6279 
 Wilderness_Area_0                  | 1    
 Wilderness_Area_1                  | 0    
 Wilderness_Area_2                  | 0    
 Wilderness_Area_3                  | 0    
 Soil_Type_0                        | 0    
 Soil_Type_1                        | 0    
 Soil_Type_2                        | 0    
 Soil_Type_3                        | 0    
 Soil_Type_4                        | 0    
 Soil_Type_5                        | 0    
 Soil_Type_6                        | 0    
 Soil_Type_7                    

## Target

The target variable for this problem is a categorical variable with 7 categories each of which represents a unique forest coverage type. As we can see from the record counts for each of the category of target variable, the dataset is highly imabalanced.

In [7]:
# Target
data.groupBy(labelCol).count().orderBy(labelCol).show()

+----------+------+
|Cover_Type| count|
+----------+------+
|       1.0|211840|
|       2.0|283301|
|       3.0| 35754|
|       4.0|  2747|
|       5.0|  9493|
|       6.0| 17367|
|       7.0| 20510|
+----------+------+



## Training

I am using Spark's MLlib library to perform multi-class classification on this dataset. The data is first split into train and test sets. I will then calculate the baseline accuracy using random guessing based on proportions of records of each class in training and test datasets. After that, I will train a simple DecisionTree learner on the training set without any parameter tuning and compare its performance with the baseline model. I will then proceed to to use a grid search model, from spark's MLlib, to tune the decision tree, using k-fold cross-validation, and improve its performance. I will also undo the one-hot encoding of some categorical variables to create a smaller, efficient dataset. In the end, I will use a more powerful RandomForest model to see if we can get better results.

In [8]:
# Splitting dataset into train and test sets
train, test = data.randomSplit([0.9, 0.1])

### Random Baseline

In [9]:
def calculateProbabilities(data, label):
    """
    The function takes a spark DataFrame as input and returns the probability of selecting different values in label column.
    
    Parameters
    -----------
    data : pyspark.sql.DataFrame
        PySpark DataFrame containing the dataset
    label: str
        Name of the target variable in dataset
    
    Returns
    -------
    list
        A list of tuples where the first element is a unique category in target variable and the second element specifies the
        proportion of records in the dataset that belong to that category.
    """
    total = data.count()
    return data.select(label).groupBy(label).count().orderBy(label).rdd.map(lambda x: (x[0], x[1]/total)).collect()

In [10]:
# Proportion of each category in training data
trainProbabilities = calculateProbabilities(train, labelCol)
testProbabilities = calculateProbabilities(test, labelCol)
baseline_accuracy = sum([x[1]*y[1] for x, y in zip(trainProbabilities, testProbabilities)])
print("The baseline accuracy is:", round(baseline_accuracy*100, 3))

The baseline accuracy is: 37.704


In [11]:
# Garbage Collection
del trainProbabilities, testProbabilities
gc.collect()

354

### Feature Transformation

In this example there's no need to define transformations since the dataset is already clean and in a suitable format. Since all the classes in MLlib module require that the feature be a single vector, I will just use the VectorAssembler class to create feature vector for each record by packing all the properties in a single vector. Any other required features should be defined in this step.

In [12]:
# Using VectorAssembler class to generate feature vector
feature_cols = [col for col in data.columns if col!=labelCol]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="featureVector")
trainData = assembler.transform(train)
trainData.show(1, truncate=False, vertical=True)

-RECORD 0---------------------------------------------------------------------------------------------------------------------------------
 Elevation                          | 1863                                                                                                
 Aspect                             | 37                                                                                                  
 Slope                              | 17                                                                                                  
 Horizontal_Distance_To_Hydrology   | 120                                                                                                 
 Vertical_Distance_To_Hydrology     | 18                                                                                                  
 Horizontal_Distance_To_Roadways    | 90                                                                                                  
 Hillshade_9am             

### Decision Tree Model

A decision tree is a simple model that uses human-like thought process in making decisions. It creates a tree like structure in which the internal node specifies the condition that determines the path and the leave nodes determine the label that a record is assigned, if it satisfies all the conditions on the path to that leaf node. The decision tree is constructed in such a way that after each split from an internal node, the training data gets "purer", that is, more homogeneous (majority of the records belong to same class). We use entropy or gini-index measures that help us determine "node purity" to identify the perfect split at each step.

**Advantages:**
* Easy to build
* Easy to interpret
* Robust to outliers

**Disadvantages:**:
* Weak learner
* Prone to overfit

In [13]:
# DecisionTree Classification model
classifier = DecisionTreeClassifier(featuresCol='featureVector', labelCol=labelCol)
model = classifier.fit(trainData)
pp.pprint(model.toDebugString)

('DecisionTreeClassificationModel: uid=DecisionTreeClassifier_9a8cbdd3864a, '
 'depth=5, numNodes=49, numClasses=8, numFeatures=54\n'
 '  If (feature 0 <= 3051.5)\n'
 '   If (feature 0 <= 2572.5)\n'
 '    If (feature 10 <= 0.5)\n'
 '     If (feature 0 <= 2471.5)\n'
 '      If (feature 3 <= 15.0)\n'
 '       Predict: 4.0\n'
 '      Else (feature 3 > 15.0)\n'
 '       Predict: 3.0\n'
 '     Else (feature 0 > 2471.5)\n'
 '      If (feature 17 <= 0.5)\n'
 '       Predict: 2.0\n'
 '      Else (feature 17 > 0.5)\n'
 '       Predict: 3.0\n'
 '    Else (feature 10 > 0.5)\n'
 '     If (feature 9 <= 5080.0)\n'
 '      Predict: 2.0\n'
 '     Else (feature 9 > 5080.0)\n'
 '      If (feature 31 <= 0.5)\n'
 '       Predict: 2.0\n'
 '      Else (feature 31 > 0.5)\n'
 '       Predict: 5.0\n'
 '   Else (feature 0 > 2572.5)\n'
 '    If (feature 0 <= 2961.5)\n'
 '     If (feature 15 <= 0.5)\n'
 '      Predict: 2.0\n'
 '     Else (feature 15 > 0.5)\n'
 '      Predict: 3.0\n'
 '    Else (feature 0 > 2961.5

In [14]:
# Feature Importance
pp.pprint(sorted(list(zip(model.featureImportances.toArray(), feature_cols)), key = lambda x: x[0], reverse = True))

[(0.8130894670615963, 'Elevation'),
 (0.038054619229894907, 'Horizontal_Distance_To_Hydrology'),
 (0.036896773869442706, 'Wilderness_Area_0'),
 (0.02660125506316331, 'Soil_Type_31'),
 (0.026048506327293056, 'Hillshade_Noon'),
 (0.02225856699970976, 'Soil_Type_1'),
 (0.01095773511562616, 'Wilderness_Area_2'),
 (0.010646403214294222, 'Soil_Type_3'),
 (0.006123390468124926, 'Soil_Type_22'),
 (0.004076612931206928, 'Horizontal_Distance_To_Fire_Points'),
 (0.003342119314331825, 'Vertical_Distance_To_Hydrology'),
 (0.001479541173056448, 'Horizontal_Distance_To_Roadways'),
 (0.000425009232259348, 'Soil_Type_17'),
 (0.0, 'Aspect'),
 (0.0, 'Slope'),
 (0.0, 'Hillshade_9am'),
 (0.0, 'Hillshade_3pm'),
 (0.0, 'Wilderness_Area_1'),
 (0.0, 'Wilderness_Area_3'),
 (0.0, 'Soil_Type_0'),
 (0.0, 'Soil_Type_2'),
 (0.0, 'Soil_Type_4'),
 (0.0, 'Soil_Type_5'),
 (0.0, 'Soil_Type_6'),
 (0.0, 'Soil_Type_7'),
 (0.0, 'Soil_Type_8'),
 (0.0, 'Soil_Type_9'),
 (0.0, 'Soil_Type_10'),
 (0.0, 'Soil_Type_11'),
 (0.0, 'Soi

In [15]:
# Prediction on training data
predictions = model.transform(trainData)
predictions.select(labelCol, "rawPrediction", "probability", "prediction").show(1, truncate=False, vertical=True)

-RECORD 0---------------------------------------------------------------------------------------------------------
 Cover_Type    | 6.0                                                                                              
 rawPrediction | [0.0,0.0,1338.0,19433.0,1488.0,0.0,9008.0,0.0]                                                   
 probability   | [0.0,0.0,0.04279272075990661,0.6215178942655195,0.047590110979627086,0.0,0.2880992739949467,0.0] 
 prediction    | 3.0                                                                                              
only showing top 1 row



In [16]:
# Evaluation
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol=labelCol)
# Accuracy
print("Accuracy:", evaluator.setMetricName('accuracy').evaluate(predictions))
# F-1 score
print("f1 score:", evaluator.setMetricName('f1').evaluate(predictions))

Accuracy: 0.7052293365801259
f1 score: 0.687221249734038


### Confusion Matrix

The confusion matrix is a NxN matrix, where N is the number of distinct categories of target variable, and used to test the accuracy of a classifier. The (i,j)th element specifies the number of records that actually belong to class i that were determined to belong to class j by the classifier. The sum of elements on the main diagonal is the number of records correctly classified by the classifier.

In [17]:
# # Confusion Matrix using old MLlib API
# predictionsRDD = predictions.select("prediction", "Cover_Type").rdd
# multiClassMetrics = MulticlassMetrics(predictionsRDD)
# confusionMatrix = pd.DataFrame(multiClassMetrics.confusionMatrix().toArray())
# confusionMatrix.index = range(1,8)
# confusionMatrix.columns = range(1,8)
# confusionMatrix

In [18]:
# Confusion Matrix using pivot
confusionMatrix = predictions.groupBy(labelCol).pivot("prediction", range(1, 8)).count().na.fill(0.0).orderBy(labelCol)
confusionMatrix.show()

+----------+------+------+-----+----+---+---+----+
|Cover_Type|     1|     2|    3|   4|  5|  6|   7|
+----------+------+------+-----+----+---+---+----+
|       1.0|130587| 56517|    0|   2|  0|  0|3471|
|       2.0| 48306|204389| 1886| 131|107|  0| 400|
|       3.0|     0|  7837|23667| 674|  0|  0|   0|
|       4.0|     0|    14| 1488|1011|  0|  0|   0|
|       5.0|     0|  8338|  112|   0|119|  0|   0|
|       6.0|     0|  5552| 9462| 647|  0|  0|   0|
|       7.0|  9200|    75|    0|   0|  0|  0|9191|
+----------+------+------+-----+----+---+---+----+



In [19]:
# Garbage Collection
del assembler, trainData, classifier, model, predictions, evaluator, confusionMatrix
gc.collect()

532

### Pipelining and Grid Search

Here, I am going to define a pipeline that combines the transformation and modelling steps to make the process more robust, reproducible and easier to manage. I will also use a grid search method, spark's MLlib implementation, for optimizing the hyperparameters of the earlier model. The grid search will use cross-validation to determine the best model with optimal hyperparameters.

In [None]:
# VectorAssembler to create feaure vector
feature_cols = [col for col in data.columns if col!=labelCol]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="featureVector")
# RandomForest classifier
classifier = RandomForestClassifier(featuresCol='featureVector', labelCol=labelCol)

In [None]:
# Pipeline
pipeline = Pipeline(stages=[assembler, classifier])

In [None]:
# Hyperparameter optimization
paramGridSearch = ParamGridBuilder().\
                    addGrid(classifier.impurity, ['gini', 'entropy']).\
                    addGrid(classifier.maxDepth, [1, 20]).\
                    addGrid(classifier.maxBins, [40, 300]).\
                    addGrid(classifier.minInfoGain, [0, 0.05]).build()

In [None]:
# Evaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol=labelCol, metricName="accuracy")

In [None]:
# Cross-validation for grid search
crossValidation = TrainValidationSplit(estimator=pipeline, estimatorParamMaps=paramGridSearch, evaluator=evaluator, trainRatio=0.9)

In [None]:
# # Best model from cross-validation
# model = crossValidation.fit(train)

In [None]:
# Garbage Collection
del train, test, assembler, classifier, pipeline, paramGridSearch, evaluator, crossValidation, model
gc.collect()

### Removing One-Hot Encoding

As discussed earlier, two properties, namely, wilderness area and soil type, are one-hot encoded in the original data. While one-hot encoding is a popular choice to represent categorical variables to transform the data suitable for machine learning algorithms, it does take a lot of space since the same data is spread out in multiple columns. In addition to that, there are many machine learning algorithms that can handle categorical features, like Decision Tree, and hence I am going to remove the one-hot encoding from the dataset to decrease its size. For this purpose, I am going to use VectorAssemebler to first collect relevant columns into single vector and then pandas UDF to define a function that removes the one-hot encoding. After the one-hot encoding is removed, we obtain columns that numeric values to indicate categories. It is necessary to add metadata information about these columns to indicate that they are actually categorical. For this purpose an instance of VectorIndexer will be used.

In [20]:
@f.pandas_udf(t.IntegerType())
def removeOHE(se):
    """
    Takes a pandas series such that each element in the series is an iterable and returns the index of largest element in iterable.
    
    Parameter
    ---------
    se: pandas.Series
        Series containing iterables
    
    Returns
    -------
    pandas.Series
        Series containing integers
    """
    return se.map(np.argmax)

In [None]:
# Grouping together columns that are one-hot encoding of the same feature
wildernessCols = ["Wilderness_Area_"+str(i) for i in range(4)]
soilCols = ["Soil_Type_"+str(i) for i in range(40)]
wildernessAssembler = VectorAssembler(inputCols=wildernessCols, outputCol="Wilderness_Area")
soilAssembler = VectorAssembler(inputCols=soilCols, outputCol="Soil_Type")
pipeRemoveOHE = Pipeline(stages=[wildernessAssembler, soilAssembler])
pipeRemoveOHEModel = pipeRemoveOHE.fit(train)

In [23]:
#############################################################################################################
# Note: Create a custom transformer that applies the function on a column
##############################################################################################################

# Transforming the dataset to remove one-hot encoding
data = pipeRemoveOHEModel.transform(data).drop(*(wildernessCols+soilCols)).withColumn("Wilderness_Area", removeOHE("Wilderness_Area")).withColumn("Soil_Type", removeOHE("Soil_Type"))
data.printSchema()

root
 |-- Elevation: integer (nullable = true)
 |-- Aspect: integer (nullable = true)
 |-- Slope: integer (nullable = true)
 |-- Horizontal_Distance_To_Hydrology: integer (nullable = true)
 |-- Vertical_Distance_To_Hydrology: integer (nullable = true)
 |-- Horizontal_Distance_To_Roadways: integer (nullable = true)
 |-- Hillshade_9am: integer (nullable = true)
 |-- Hillshade_Noon: integer (nullable = true)
 |-- Hillshade_3pm: integer (nullable = true)
 |-- Horizontal_Distance_To_Fire_Points: integer (nullable = true)
 |-- Cover_Type: double (nullable = true)
 |-- Wilderness_Area: integer (nullable = true)
 |-- Soil_Type: integer (nullable = true)



In [None]:
# Splitting the Dataset into training and test sets
train, test = data.randomSplit([0.9, 0.1])

In [None]:
# VectorAssembler to create feature vector for each record
feature_cols = [col for col in train.columns if col!=labelCol]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="featureVector")

In [None]:
# VectorIndexer to mark categorical variables
indexer = new VectorIndexer(maxCategories=40, inputCol="featureVector", outputCol="modifiedFeatureVector")

### RandomForest

Random Decision Forest or Random Forest for short are powerful ensemble learning methods for regression, classification and other tasks. Random Forests are built by taking an ensemble of multiple Decision Trees built independently using only a random subset of the dataset, called bagging, with each record in this subset having a random subset of all the features. The randomness and independence in construction of each Decision Tree ensures that the resulting ensemble regresses towards the correct answer, hence more powerful than any of the Decision Trees individually. Bagging also helps reduce overfitting since each tree only sees a portion of the dataset.

In [None]:
# RandomForest Classifier
classifier = RandomForestClassifier(featuresCol='modifiedFeatureVector', labelCol='Cover_Type')

In [None]:
# Cleanup
spark.stop()
del spark