In [121]:
# Author: Sumantra Patnaik

# Multi-Class classification using Apache Spark MLLib

# Dataset : Crowdsourced data from OpenStreetMap - used to automate the classification of satellite images 
# into different land cover classes (impervious, farm, forest, grass, orchard, water)

# Algorithms Used: Multinomial Logistic Regression, Decision Trees, Random Forest

import warnings
warnings.filterwarnings('ignore')
%matplotlib inline
import findspark
findspark.init()

In [122]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('OpenStreetMap_Classification').getOrCreate()

In [123]:
train_df = spark.read.csv("OpenStreetMap_Data/training.csv", header=True, inferSchema=True)
test_df = spark.read.csv("OpenStreetMap_Data/testing.csv", header=True, inferSchema=True)
comb_df = train_df.union(test_df)

In [124]:
train_df.printSchema()

root
 |-- class: string (nullable = true)
 |-- max_ndvi: double (nullable = true)
 |-- 20150720_N: double (nullable = true)
 |-- 20150602_N: double (nullable = true)
 |-- 20150517_N: double (nullable = true)
 |-- 20150501_N: double (nullable = true)
 |-- 20150415_N: double (nullable = true)
 |-- 20150330_N: double (nullable = true)
 |-- 20150314_N: double (nullable = true)
 |-- 20150226_N: double (nullable = true)
 |-- 20150210_N: double (nullable = true)
 |-- 20150125_N: double (nullable = true)
 |-- 20150109_N: double (nullable = true)
 |-- 20141117_N: double (nullable = true)
 |-- 20141101_N: double (nullable = true)
 |-- 20141016_N: double (nullable = true)
 |-- 20140930_N: double (nullable = true)
 |-- 20140813_N: double (nullable = true)
 |-- 20140626_N: double (nullable = true)
 |-- 20140610_N: double (nullable = true)
 |-- 20140525_N: double (nullable = true)
 |-- 20140509_N: double (nullable = true)
 |-- 20140423_N: double (nullable = true)
 |-- 20140407_N: double (nullable = 

In [78]:
comb_df.columns

['class',
 'max_ndvi',
 '20150720_N',
 '20150602_N',
 '20150517_N',
 '20150501_N',
 '20150415_N',
 '20150330_N',
 '20150314_N',
 '20150226_N',
 '20150210_N',
 '20150125_N',
 '20150109_N',
 '20141117_N',
 '20141101_N',
 '20141016_N',
 '20140930_N',
 '20140813_N',
 '20140626_N',
 '20140610_N',
 '20140525_N',
 '20140509_N',
 '20140423_N',
 '20140407_N',
 '20140322_N',
 '20140218_N',
 '20140202_N',
 '20140117_N',
 '20140101_N']

In [79]:
#Target classes to classify
comb_df.select('class').distinct().show()

+----------+
|     class|
+----------+
|      farm|
|     grass|
|     water|
|impervious|
|    forest|
|   orchard|
+----------+



In [86]:
my_cols = comb_df.select(['max_ndvi','20150720_N','20150602_N','20150517_N','20150501_N','20150415_N','20150330_N','20150314_N',
                          '20150226_N','20150210_N','20150125_N','20150109_N','20141117_N','20141101_N','20141016_N',
                           '20140930_N','20140813_N','20140626_N','20140610_N','20140525_N','20140509_N','20140423_N',
                           '20140407_N','20140322_N','20140218_N','20140202_N','20140117_N','20140101_N'])
feature_cols = ['max_ndvi','20150720_N','20150602_N','20150517_N','20150501_N','20150415_N','20150330_N','20150314_N',
                          '20150226_N','20150210_N','20150125_N','20150109_N','20141117_N','20141101_N','20141016_N',
                           '20140930_N','20140813_N','20140626_N','20140610_N','20140525_N','20140509_N','20140423_N',
                           '20140407_N','20140322_N','20140218_N','20140202_N','20140117_N','20140101_N']

In [81]:
#Find columns havin missing values in them
for column in comb_df.columns:
    print (column,comb_df.filter(comb_df[column].isNull()).count())

class 0
max_ndvi 0
20150720_N 0
20150602_N 0
20150517_N 0
20150501_N 0
20150415_N 0
20150330_N 0
20150314_N 0
20150226_N 0
20150210_N 0
20150125_N 0
20150109_N 0
20141117_N 0
20141101_N 0
20141016_N 0
20140930_N 0
20140813_N 0
20140626_N 0
20140610_N 0
20140525_N 0
20140509_N 0
20140423_N 0
20140407_N 0
20140322_N 0
20140218_N 0
20140202_N 0
20140117_N 0
20140101_N 0


In [82]:
from pyspark.ml.feature import (VectorAssembler,VectorIndexer,StringIndexer,OneHotEncoder)

In [83]:
class_indexer = StringIndexer(inputCol = 'class',outputCol = 'TargetClass')
model = class_indexer.fit(comb_df)
comb_df_1 = model.transform(comb_df)

In [84]:
comb_df_1.printSchema()

root
 |-- class: string (nullable = true)
 |-- max_ndvi: double (nullable = true)
 |-- 20150720_N: double (nullable = true)
 |-- 20150602_N: double (nullable = true)
 |-- 20150517_N: double (nullable = true)
 |-- 20150501_N: double (nullable = true)
 |-- 20150415_N: double (nullable = true)
 |-- 20150330_N: double (nullable = true)
 |-- 20150314_N: double (nullable = true)
 |-- 20150226_N: double (nullable = true)
 |-- 20150210_N: double (nullable = true)
 |-- 20150125_N: double (nullable = true)
 |-- 20150109_N: double (nullable = true)
 |-- 20141117_N: double (nullable = true)
 |-- 20141101_N: double (nullable = true)
 |-- 20141016_N: double (nullable = true)
 |-- 20140930_N: double (nullable = true)
 |-- 20140813_N: double (nullable = true)
 |-- 20140626_N: double (nullable = true)
 |-- 20140610_N: double (nullable = true)
 |-- 20140525_N: double (nullable = true)
 |-- 20140509_N: double (nullable = true)
 |-- 20140423_N: double (nullable = true)
 |-- 20140407_N: double (nullable = 

In [87]:
assembler = VectorAssembler(inputCols = feature_cols,outputCol = 'features')

In [88]:
output = assembler.transform(comb_df_1)

In [89]:
output.head(1)

[Row(class='water', max_ndvi=997.904, 20150720_N=637.595, 20150602_N=658.668, 20150517_N=-1882.03, 20150501_N=-1924.36, 20150415_N=997.904, 20150330_N=-1739.99, 20150314_N=630.087, 20150226_N=-1628.24, 20150210_N=-1325.64, 20150125_N=-944.084, 20150109_N=277.107, 20141117_N=-206.799, 20141101_N=536.441, 20141016_N=749.348, 20140930_N=-482.993, 20140813_N=492.001, 20140626_N=655.77, 20140610_N=-921.193, 20140525_N=-1043.16, 20140509_N=-1942.49, 20140423_N=267.138, 20140407_N=366.608, 20140322_N=452.238, 20140218_N=211.328, 20140202_N=-2203.02, 20140117_N=-1180.19, 20140101_N=433.906, TargetClass=4.0, features=DenseVector([997.904, 637.595, 658.668, -1882.03, -1924.36, 997.904, -1739.99, 630.087, -1628.24, -1325.64, -944.084, 277.107, -206.799, 536.441, 749.348, -482.993, 492.001, 655.77, -921.193, -1043.16, -1942.49, 267.138, 366.608, 452.238, 211.328, -2203.02, -1180.19, 433.906]))]

In [90]:
final_data = output.select(['features','TargetClass'])

In [91]:
final_data.filter(final_data['TargetClass'] == 2.0).show()

+--------------------+-----------+
|            features|TargetClass|
+--------------------+-----------+
|[1522.31,856.499,...|        2.0|
|[4288.19,2173.48,...|        2.0|
|[955.877,292.614,...|        2.0|
|[1165.82,867.414,...|        2.0|
|[872.409,174.889,...|        2.0|
|[3385.67,3058.11,...|        2.0|
|[4202.04,3979.8,2...|        2.0|
|[4809.47,4572.18,...|        2.0|
|[5686.7,4559.86,5...|        2.0|
|[4186.02,3117.1,3...|        2.0|
|[1141.28,897.368,...|        2.0|
|[1556.6,1253.12,1...|        2.0|
|[1664.56,1559.09,...|        2.0|
|[6522.16,4189.42,...|        2.0|
|[2763.32,2232.54,...|        2.0|
|[1156.2,468.962,4...|        2.0|
|[1414.92,507.251,...|        2.0|
|[1416.28,802.106,...|        2.0|
|[4778.76,4778.76,...|        2.0|
|[5316.17,4698.36,...|        2.0|
+--------------------+-----------+
only showing top 20 rows



In [92]:
train_data, test_data = final_data.randomSplit([0.7,0.3])

In [93]:
from pyspark.ml.classification import LogisticRegression
log_reg_OpenStreetMap = LogisticRegression(featuresCol='features', labelCol='TargetClass', 
                                           maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model_OpenStreetMap = log_reg_OpenStreetMap.fit(train_data)

In [94]:
results = lr_model_OpenStreetMap.transform(test_data)

In [95]:
results.select(['prediction','TargetClass']).show()

+----------+-----------+
|prediction|TargetClass|
+----------+-----------+
|       0.0|        2.0|
|       0.0|        2.0|
|       0.0|        2.0|
|       0.0|        2.0|
|       0.0|        2.0|
|       0.0|        4.0|
|       0.0|        2.0|
|       0.0|        2.0|
|       0.0|        2.0|
|       0.0|        2.0|
|       0.0|        2.0|
|       0.0|        2.0|
|       0.0|        2.0|
|       0.0|        2.0|
|       0.0|        2.0|
|       0.0|        2.0|
|       0.0|        2.0|
|       0.0|        2.0|
|       0.0|        2.0|
|       0.0|        2.0|
+----------+-----------+
only showing top 20 rows



In [97]:
#Evaluate the Logistic Regression classification model
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator_accuracy = MulticlassClassificationEvaluator(
    labelCol="TargetClass", predictionCol="prediction", metricName="accuracy")
lr_accuracy = evaluator.evaluate(results)
print("Test Error is : %g " % (1.0 - lr_accuracy))

Test Error is : 0.307313 


In [98]:
#F1 Score
evaluator_F1= MulticlassClassificationEvaluator(
    labelCol="TargetClass", predictionCol="prediction", metricName="f1")
lr_f1_score = dt_evaluator_F1.evaluate(results)
print("Test F1 for Logistic Regression Classifier is : %g " % lr_f1_score)

Test F1 for Logistic Regression Classifier is : 0.566928 


In [99]:
#weightedPrecision
evaluator_Precision = MulticlassClassificationEvaluator(
    labelCol="TargetClass", predictionCol="prediction", metricName="weightedPrecision")
lr_precision_score = evaluator_Precision.evaluate(results)
print("Test Precison for Logistic Regression Classifier is : %g " % lr_precision_score)

Test Precison for Logistic Regression Classifier is : 0.479816 


In [100]:
#weightedRecall
evaluator_Recall = MulticlassClassificationEvaluator(
    labelCol="TargetClass", predictionCol="prediction", metricName="weightedRecall")
lr_recall_score = dt_evaluator_Recall.evaluate(results)
print("Test Recall for Logistic Regression Classifier is : %g " % lr_recall_score)

Test Recall for Logistic Regression Classifier is : 0.692687 


In [101]:
#Decision-Tree classifier
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="TargetClass", featuresCol="features")
dt_model = dt.fit(train_data)

In [102]:
dt_predictions = dt_model.transform(test_data)

In [103]:
dt_predictions.select("prediction", "TargetClass", "features").show(5)

+----------+-----------+--------------------+
|prediction|TargetClass|            features|
+----------+-----------+--------------------+
|       2.0|        2.0|[563.444,286.846,...|
|       2.0|        2.0|[913.563,776.078,...|
|       2.0|        2.0|[959.739,553.443,...|
|       2.0|        2.0|[974.948,726.428,...|
|       2.0|        2.0|[992.794,481.385,...|
+----------+-----------+--------------------+
only showing top 5 rows



In [104]:
#Evauate the Decision Tree classifier model
#Accuracy
dt_accuracy = evaluator_accuracy.evaluate(dt_predictions)
print("Test Accuracy for Decision Tree Classifier is : %g " % dt_accuracy)
print("Test Error for Decision Tree Classifier is : %g " % (1.0 - dt_accuracy))

Test Accuracy for Decision Tree Classifier is : 0.845727 
Test Error for Decision Tree Classifier is : 0.154273 


In [105]:
#F1 Score
dt_f1_score = evaluator_F1.evaluate(dt_predictions)
print("Test F1 for Decision Tree Classifier is : %g " % dt_f1_score)

Test F1 for Decision Tree Classifier is : 0.836315 


In [107]:
#Weighted Precision
dt_precision_score = evaluator_Precision.evaluate(dt_predictions)
print("Test Precision for Decision Tree Classifier is : %g " % dt_precision_score)

Test Precision for Decision Tree Classifier is : 0.836233 


In [109]:
#weighted Recall
dt_recall_score = evaluator_Recall.evaluate(dt_predictions)
print("Test Recall for Decision Tree Classifier is : %g " % dt_recall_score)

Test Recall for Decision Tree Classifier is : 0.845727 


In [110]:
#Random_Forest Classifier
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="TargetClass", featuresCol="features", numTrees=10)
rf_model = rf.fit(train_data)

In [111]:
#Predict based on the test data
rf_predictions = rf_model.transform(test_data)

In [112]:
#Evauate the Random Forest classifier model
#Accuracy

rf_accuracy = evaluator_accuracy.evaluate(rf_predictions)
print("Test Accuracy for Random Forest Classifier is : %g " % rf_accuracy)
print("Test Error for Random Forest Classifier is : %g " % (1.0 - rf_accuracy))

Test Accuracy for Random Forest Classifier is : 0.851898 
Test Error for Random Forest Classifier is : 0.148102 


In [113]:
rf_f1_score = evaluator_F1.evaluate(rf_predictions)
print("Test F1 for Random Forest Classifier is : %g " % rf_f1_score)

Test F1 for Random Forest Classifier is : 0.821483 


In [114]:
rf_precision_score = evaluator_Precision.evaluate(rf_predictions)
print("Test Precison for Random Forest Classifier is : %g " % rf_precision_score)

Test Precison for Random Forest Classifier is : 0.84535 


In [115]:
rf_recall_score = evaluator_Recall.evaluate(rf_predictions)
print("Test Recall  for Random Forest Classifier is : %g " % rf_recall_score)

Test Recall  for Random Forest Classifier is : 0.851898 
