# Assignment 2A

Name: Taaha Waseem <br>
ID: 28888286 <br>
Last Updated: 27/04/2021 <br>

## 1. Data Loading and Exploration

### 1.1 Data Loading

#### 1.1.1 Creating SparkSession

In [1]:
# Import SparkConf class into program
from pyspark import SparkConf
# Import SparkContext and SparkSession classes
from pyspark import SparkContext # Spark
from pyspark.sql import SparkSession # Spark SQL
import matplotlib.pyplot as plt
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
%matplotlib inline
import pandas as pd # Pandas

import datetime

master = "local[*]"
app_name = "Assignment 2A"
# Setup configuration parameters for Spark
spark_conf = SparkConf().setMaster(master).setAppName(app_name)

# Creating SparkSession
spark = SparkSession.builder.config(conf=spark_conf)\
                            .config('spark.sql.session.timeZone', 'Australia/Melbourne')\
                            .getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('ERROR')


#### 1.1.2 Defining Data Schema

In [2]:
from pyspark.sql.types import StructType, IntegerType, DateType, StringType, TimestampType, ArrayType, BooleanType
from pyspark.sql.types import StructField
import datetime
#Reference: https://stackoverflow.com/questions/48698062/set-schema-in-pyspark-dataframe-read-csv-with-null-elements


# Defining schema for pedestrian count
pedSchema = StructType([
    StructField("ID", IntegerType()),
    StructField("Date_Time", StringType(), True),
    StructField("Year", IntegerType()),
    StructField("Month", StringType()),
    StructField("Mdate", IntegerType()),
    StructField("Day", StringType()),
    StructField("Time", IntegerType()),
    StructField("Sensor_ID", IntegerType()),
    StructField("Sensor_Name", StringType()),
    StructField("Hourly_Counts", IntegerType())
])

#### 1.1.3 Loading the pedestrian count csv

In [3]:
from pyspark.sql import functions as F
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

# Loading the pedestrian count csv using the schema defined earlier
ped_df = spark.read.format('csv').\
    option('header', 'true').\
    schema(pedSchema).\
    load('Pedestrian_Counting_System_-_Monthly__counts_per_hour.csv')

# Creating function to transform Date_Time column to TimeStampType
func = F.udf(lambda x: datetime.datetime.strptime(x, '%m/%d/%Y %I:%M:%S %p'), TimestampType())
# The function is implemented on the Date_Time column
ped_df = ped_df.withColumn('Date_Time', func(F.col('Date_Time')))

ped_df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Date_Time: timestamp (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: string (nullable = true)
 |-- Mdate: integer (nullable = true)
 |-- Day: string (nullable = true)
 |-- Time: integer (nullable = true)
 |-- Sensor_ID: integer (nullable = true)
 |-- Sensor_Name: string (nullable = true)
 |-- Hourly_Counts: integer (nullable = true)



#### 1.1.4 Adding above_threshold column

In [4]:
# Function that returns 1 if hourly counts >= 2000, else 0
func = F.udf(lambda x: 1 if x >= 2000 else 0, IntegerType())

# Applying the function on the dataframe to add another column
ped_df = ped_df.withColumn('above_threshold', func(F.col('Hourly_Counts')))

# Displaying first 5 records
ped_df.show(5, truncate=False)

+-------+-------------------+----+--------+-----+------+----+---------+----------------------------+-------------+---------------+
|ID     |Date_Time          |Year|Month   |Mdate|Day   |Time|Sensor_ID|Sensor_Name                 |Hourly_Counts|above_threshold|
+-------+-------------------+----+--------+-----+------+----+---------+----------------------------+-------------+---------------+
|2887628|2019-11-01 17:00:00|2019|November|1    |Friday|17  |34       |Flinders St-Spark La        |300          |0              |
|2887629|2019-11-01 17:00:00|2019|November|1    |Friday|17  |39       |Alfred Place                |604          |0              |
|2887630|2019-11-01 17:00:00|2019|November|1    |Friday|17  |37       |Lygon St (East)             |216          |0              |
|2887631|2019-11-01 17:00:00|2019|November|1    |Friday|17  |40       |Lonsdale St-Spring St (West)|627          |0              |
|2887632|2019-11-01 17:00:00|2019|November|1    |Friday|17  |36       |Queen St (We

### 1.2 Exploring the Data

#### 1.2.1 Basic Statistics

In [5]:
# Displaying basic summary and statistics
ped_df.select('ID', 'Year', 'Mdate', 'Time', 'Sensor_ID', 'Hourly_Counts').summary().show()

+-------+-----------------+------------------+------------------+------------------+------------------+-----------------+
|summary|               ID|              Year|             Mdate|              Time|         Sensor_ID|    Hourly_Counts|
+-------+-----------------+------------------+------------------+------------------+------------------+-----------------+
|  count|          3435106|           3435106|           3435106|           3435106|           3435106|          3435106|
|   mean|        1717553.5|2016.0032330880038|15.751918863639142|11.459955238644746|22.978422791028866|560.7805942524044|
| stddev|991629.8312350252|3.1237869143646275|  8.79918757461428| 6.943473866829414|16.229792156265397|809.9942576353371|
|    min|                1|              2009|                 1|                 0|                 1|                0|
|    25%|           858830|              2014|                 8|                 5|                 9|               50|
|    50%|          17176

#### 1.2.2 Count of above-threshold and below-threshold

In [6]:
# Counting rows above threshold
aboveCount = ped_df.filter(F.col('above_threshold') == 1).count()
# Counting rows below threshold
belowCount = ped_df.filter(F.col('above_threshold') == 0).count()

# Printing count of rows above threshold
print("Total number of above threshold records: " + str(aboveCount))
# Printing count of rows below threshold
print("Total number of below threshold records: " + str(belowCount))

Total number of above threshold records: 250942
Total number of below threshold records: 3184164


There is a class imbalance: records with Hourly_Counts below the threshold (3,184,164) outnumber records above the threshold (250,942) by roughly 12.7 to 1, so only about 7.3% of all records belong to the above-threshold class. This can bias a classification model towards the majority class, so the above-threshold class may have low precision and recall even when overall accuracy is high.
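One common way to soften this imbalance is to weight each record inversely to its class frequency. The sketch below is not part of the original pipeline; it only derives such weights from the two counts printed above. The helper name `balanced_class_weights` is hypothetical, and the resulting values would still need to be attached to the data as a weight column (for example through the `weightCol` parameter of `LogisticRegression`) before they had any effect.

```python
def balanced_class_weights(class_counts):
    """Return inverse-frequency weights so every class carries equal total weight."""
    total = sum(class_counts.values())
    n_classes = len(class_counts)
    return {label: total / (n_classes * count) for label, count in class_counts.items()}


# Counts taken from the output of section 1.2.2
counts = {1: 250942, 0: 3184164}
weights = balanced_class_weights(counts)

for label, weight in sorted(weights.items()):
    print(f"class {label}: weight {weight:.4f}")
```

With these weights, each class contributes the same total weight (half of the 3,435,106 records), so the minority class is no longer dominated during training.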

## 2. Feature Extraction and ML training

### 2.1 Preparing Spark ML Transformers/Estimators for features, labels and models

#### 2.1.1 Prepare feature columns

In [7]:
# Month in Integer
func = F.udf(lambda x: x.month, IntegerType())
ped_df = ped_df.withColumn('Month (in integer)', func(F.col('Date_Time')))

# Day of Week
func = F.udf(lambda x: x.isocalendar()[2], IntegerType())
ped_df = ped_df.withColumn('day_of_week', func(F.col('Date_Time')))

# Week of Year
func = F.udf(lambda x: x.isocalendar()[1], IntegerType())
ped_df = ped_df.withColumn('week_of_year', func(F.col('Date_Time')))

#Filter 2014 - 2019
ped_df = ped_df.filter((F.col('Year') >= 2014) & (F.col('Year') <= 2019))

#Filter 9pm to 11pm
ped_df = ped_df.filter((F.col('Time') >= 21) & (F.col('Time') <= 23))
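
The three date features above come straight from Python's `datetime` API, so they can be checked on a single value without Spark. The sketch below assumes the raw `Date_Time` string looks like `11/01/2019 05:00:00 PM`, which is inferred from the format string used in section 1.1.3 rather than shown in the data; the variable names are illustrative.

```python
import datetime

# Assumed raw value, matching the '%m/%d/%Y %I:%M:%S %p' format used when loading the csv
raw = '11/01/2019 05:00:00 PM'
ts = datetime.datetime.strptime(raw, '%m/%d/%Y %I:%M:%S %p')

# isocalendar() returns (ISO year, ISO week number, ISO weekday), with Monday = 1 and Sunday = 7
iso_year, week_of_year, day_of_week = ts.isocalendar()
month = ts.month

print(ts, month, week_of_year, day_of_week)
```

Because ISO weeks are used, `week_of_year` ranges from 1 to 53, and the first days of January can belong to the last week of the previous ISO year.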

#### 2.1.2 Creating Transformers/Estimators

In [8]:
# Defining input and output columns
inputCols = ['Month (in integer)', 'Mdate', 'week_of_year', 'day_of_week', 'Time', 'Sensor_ID']
outputCol = 'above_threshold'

In [9]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder

# Defining outputCols for One Hot Encoder
outputCols_OHE = [f'{x}_vec' for x in inputCols]

# Declaring OneHotEncoder
encoder = OneHotEncoder(inputCols=inputCols,
                       outputCols=outputCols_OHE).setHandleInvalid('keep')

# Declaring Vector Assembler
assembler = VectorAssembler(inputCols=outputCols_OHE,
                           outputCol='features')

In [10]:
from pyspark.ml.classification import LogisticRegression

# Declaring Logistic Regression
lr = LogisticRegression(featuresCol='features',labelCol='above_threshold')

In [11]:
from pyspark.ml.classification import DecisionTreeClassifier

# Declaring Decision Tree Classifier
dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'above_threshold', maxDepth = 3)

In [12]:
from pyspark.ml.classification import RandomForestClassifier

# Declaring Random Forest Classifier
rf = RandomForestClassifier(labelCol='above_threshold', featuresCol='features', numTrees=10)

#### 2.1.3 Creating Pipeline

In [13]:
from pyspark.ml import Pipeline

# Creating logistic regression pipeline
pipelineLR = Pipeline(stages=[encoder, assembler, lr])

# Creating decision tree pipeline
pipelineDT = Pipeline(stages=[encoder, assembler, dt])

# Creating random forest pipeline
pipelineRF = Pipeline(stages=[encoder, assembler, rf])

### 2.2 Preparing the training and testing data

In [14]:
# Creating training data (from year 2014 till 2018)
train = ped_df.filter((F.col('Year') >= 2014) & (F.col('Year') <= 2018))

# Creating testing data (year 2019)
test = ped_df.filter(F.col('Year') == 2019)

### 2.3 Training and evaluating models

#### 2.3.1 Training and Predicting using Pipelines

In [15]:
# Training logistic regression
modelLR = pipelineLR.fit(train)

# Making predictions on test data
predictionsLR = modelLR.transform(test)

In [16]:
# Training decision tree
modelDT = pipelineDT.fit(train)

# Making predictions on test data
predictionsDT = modelDT.transform(test)

In [17]:
# Training random forest
modelRF = pipelineRF.fit(train)

# Making predictions on test data
predictionsRF = modelRF.transform(test)

#### 2.3.2 Measuring classification performance

In [18]:
# Defining function to return metrics (Precision, Accuracy and Recall)
# Also returns output dataframe/confusion matrix
def print_metrics(predictions):
    output = predictions.groupBy("above_threshold", "prediction").count()
    TN = 0 if output.filter('prediction = 0 AND above_threshold = 0').count() == 0\
        else output.filter('prediction = 0 AND above_threshold=0').first()['count']
    TP = 0 if output.filter('prediction = 1 AND above_threshold = 1').count() == 0\
        else output.filter('prediction = 1 AND above_threshold=1').first()['count']
    FN = 0 if output.filter('prediction = 0 AND above_threshold = 1').count() == 0\
        else output.filter('prediction = 0 AND above_threshold=1').first()['count']
    FP = 0 if output.filter('prediction = 1 AND above_threshold = 0').count() == 0\
        else output.filter('prediction = 1 AND above_threshold=0').first()['count']
    
    # If TN, TP, FN or FP is equal to 0, add a zero-count row so that all four cells appear in the output
    # Each row is (above_threshold, prediction, count)
    columns = ['above_threshold', 'prediction', 'count']
    if TN == 0:
        newRow = spark.createDataFrame([[0, 0, 0]], columns)
        output = output.union(newRow)
    if TP == 0:
        newRow = spark.createDataFrame([[1, 1, 0]], columns)
        output = output.union(newRow)
    if FN == 0:
        # False negative: actual 1, predicted 0
        newRow = spark.createDataFrame([[1, 0, 0]], columns)
        output = output.union(newRow)
    if FP == 0:
        # False positive: actual 0, predicted 1
        newRow = spark.createDataFrame([[0, 1, 0]], columns)
        output = output.union(newRow)
    
    # Creating accuracy evaluator
    eval_accuracy = MulticlassClassificationEvaluator(labelCol="above_threshold",\
                                                      predictionCol="prediction", metricName="accuracy",\
                                                      metricLabel=1)
    
    # Creating precision evaluator
    eval_precision = MulticlassClassificationEvaluator(labelCol="above_threshold",\
                                                      predictionCol="prediction", metricName="precisionByLabel",\
                                                      metricLabel=1)
    
    # Creating recall evaluator
    eval_recall = MulticlassClassificationEvaluator(labelCol="above_threshold",\
                                                      predictionCol="prediction", metricName="recallByLabel",\
                                                       metricLabel=1)
    
    # Getting accuracy, precision and recall using the evaluators
    accuracy = eval_accuracy.evaluate(predictions)
    precision = eval_precision.evaluate(predictions)
    recall = eval_recall.evaluate(predictions)
    
    # Returning the results
    return output, accuracy, precision, recall

In [19]:
# Printing metrics for logistic regression
output, accuracy, precision, recall = print_metrics(predictionsLR)

print("Logistic Regression:")
output.show()
print("Accuracy: " + str(accuracy))
print("Precision: " + str(precision))
print("Recall: " + str(recall))

Logistic Regression:
+---------------+----------+-----+
|above_threshold|prediction|count|
+---------------+----------+-----+
|              1|       0.0|  696|
|              0|       0.0|55910|
|              1|       1.0|  240|
|              0|       1.0|  258|
+---------------+----------+-----+

Accuracy: 0.983293639674979
Precision: 0.4819277108433735
Recall: 0.2564102564102564


In [20]:
# Printing metrics for decision tree
output, accuracy, precision, recall = print_metrics(predictionsDT)

print("Decision Tree:")
output.show()
print("Accuracy: " + str(accuracy))
print("Precision: " + str(precision))
print("Recall: " + str(recall))

Decision Tree:
+---------------+----------+-----+
|above_threshold|prediction|count|
+---------------+----------+-----+
|              1|       0.0|  890|
|              0|       0.0|56058|
|              1|       1.0|   46|
|              0|       1.0|  110|
+---------------+----------+-----+

Accuracy: 0.9824880919024936
Precision: 0.2948717948717949
Recall: 0.049145299145299144


In [21]:
# Printing metrics for random forest
output, accuracy, precision, recall = print_metrics(predictionsRF)

print("Random Forest:")
output.show()
print("Accuracy: " + str(accuracy))
print("Precision: " + str(precision))
print("Recall: " + str(recall))

Random Forest:
+---------------+----------+-----+
|above_threshold|prediction|count|
+---------------+----------+-----+
|              1|       0.0|  936|
|              0|       0.0|56168|
|              1|       1.0|    0|
|              0|       1.0|    0|
+---------------+----------+-----+

Accuracy: 0.9836088540207341
Precision: 0.0
Recall: 0.0


On accuracy alone, <b>Random Forest</b> scored highest (0.9836), followed by <b>Logistic Regression</b> (0.9833) and <b>Decision Tree</b> (0.9825). These differences are negligible, and accuracy is misleading here because of the class imbalance: Random Forest reaches its score by predicting the below-threshold class for every record, which leaves it with a precision and recall of 0. We therefore compare the models on precision and recall, where Logistic Regression is the best model (0.48 and 0.26), followed by Decision Tree (0.29 and 0.05) and Random Forest (0.0 and 0.0). Hence, the ranking is as follows:
    
1. Logistic Regression
2. Decision Tree
3. Random Forest

Hence, Logistic Regression will be persisted.
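
The reported metrics can be reproduced directly from the confusion-matrix counts, which also makes it easy to add an F1 score as a single number balancing precision and recall. This is a plain-Python sketch and not part of the original notebook: `metrics_from_counts` is a hypothetical helper, F1 was not reported above, and the counts are copied from the three output tables.

```python
def metrics_from_counts(tp, fp, fn, tn):
    """Compute accuracy, precision, recall and F1 for the positive class."""
    total = tp + fp + fn + tn
    accuracy = (tp + tn) / total
    # Guard against division by zero when a model never predicts the positive class
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
    return {'accuracy': accuracy, 'precision': precision, 'recall': recall, 'f1': f1}


# Counts copied from the confusion matrices above
results = {
    'Logistic Regression': metrics_from_counts(tp=240, fp=258, fn=696, tn=55910),
    'Decision Tree': metrics_from_counts(tp=46, fp=110, fn=890, tn=56058),
    'Random Forest': metrics_from_counts(tp=0, fp=0, fn=936, tn=56168),
}

# Rank the models by F1, best first
ranking = sorted(results, key=lambda name: results[name]['f1'], reverse=True)
for name in ranking:
    print(name, results[name])
```

Ranking by F1 gives the same order as the ranking above.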

In [23]:
# Saving Logistic Regression pipeline
modelLR.save('best_above_threshold_pedestrian_count_model')

#### 2.3.3 Visualising Decision Tree

In [24]:
# Visualising the decision tree decision making at the node level
model_dt = modelDT.stages[-1]
print(model_dt.toDebugString)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_fad81dbdaccd, depth=3, numNodes=13, numClasses=2, numFeatures=190
  If (feature 169 in {1.0})
   If (feature 104 in {1.0})
    If (feature 98 in {1.0})
     Predict: 0.0
    Else (feature 98 not in {1.0})
     Predict: 1.0
   Else (feature 104 not in {1.0})
    If (feature 105 in {1.0})
     Predict: 1.0
    Else (feature 105 not in {1.0})
     Predict: 0.0
  Else (feature 169 not in {1.0})
   If (feature 166 in {1.0})
    If (feature 105 in {1.0})
     Predict: 1.0
    Else (feature 105 not in {1.0})
     Predict: 0.0
   Else (feature 166 not in {1.0})
    Predict: 0.0



In [25]:
#Reference: https://www.timlrx.com/blog/feature-selection-using-feature-importance-score-creating-a-pyspark-estimator

# Function to extract features, correspond them to the column and sorting them according to score
def extract_features(featureImportance, df, featuresCol):
    extract = []
    for i in df.schema[featuresCol].metadata["ml_attr"]["attrs"]:
        extract = extract + df.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
    variableList = pd.DataFrame(extract)
    variableList['score'] = variableList['idx'].apply(lambda x: featureImportance[x])
    return(variableList.sort_values('score', ascending = False))


In [26]:
# Printing top 3 features according to the score
extract_features(model_dt.featureImportances, predictionsDT, "features").head(3)

     idx               name     score
169  169   Sensor_ID_vec_38   0.49963
105  105  day_of_week_vec_6   0.23191
166  166   Sensor_ID_vec_35  0.152003
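
The feature indices in the tree printout are easier to read once they are replaced with the one-hot column names. The sketch below does this with a regular expression over a few lines of the debug string; it is illustrative only, `name_features` is a hypothetical helper, and only the three index-to-name pairs from the top 3 features listed above are known, so other indices (such as 104 and 98) are left untouched.

```python
import re

# Index-to-name pairs taken from the top 3 features above
feature_names = {169: 'Sensor_ID_vec_38', 105: 'day_of_week_vec_6', 166: 'Sensor_ID_vec_35'}


def name_features(debug_string, names):
    """Replace 'feature <idx>' with the column name when the index is known."""
    def substitute(match):
        idx = int(match.group(1))
        # Unknown indices keep their original 'feature <idx>' text
        return names.get(idx, match.group(0))
    return re.sub(r'feature (\d+)', substitute, debug_string)


# A few lines copied from the toDebugString output above
sample = """If (feature 169 in {1.0})
 If (feature 104 in {1.0})
Else (feature 169 not in {1.0})
 If (feature 166 in {1.0})
  If (feature 105 in {1.0})"""

readable = name_features(sample, feature_names)
print(readable)
```

In the notebook itself, the same helper could be applied to `model_dt.toDebugString`.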
