## Initializing required packages and libraries

In [None]:
from pyspark import SparkContext, SparkConf 
from pyspark.sql import SparkSession
from pyspark.sql.functions import round,mean,when, count, col
from pyspark.ml.feature import StringIndexer,VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier,RandomForestClassifier,GBTClassifier,LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import matplotlib.pyplot as plt
import numpy as np

## Step 01 -- Initializing Spark

1) `SparkContext.getOrCreate()` is used to avoid the error "Cannot run multiple SparkContexts at once": if a Spark context already exists, it is reused instead of a new one being created.

2) `local[*]`: run Spark locally with as many worker threads as there are logical cores on your machine.

3) The `appName` field is the name shown on the Spark cluster UI. 


In [None]:
# Build the configuration up front so that the master URL and application name
# are actually applied. SparkContext.getOrCreate() always returns a context
# (it creates one when none is active), so a subsequent `if sc is None` check
# can never be true and the settings placed inside it would never be used.
spark_conf = SparkConf().setMaster("local[*]").setAppName("FIT5202 Assignment 2")

# Reuse the active SparkContext if there is one; otherwise create a new
# context from `spark_conf`.
sc = SparkContext.getOrCreate(conf=spark_conf)

# Wrap the context in a SparkSession, the entry point for the DataFrame API.
spark = SparkSession(sparkContext=sc)

## Step 02 -- Load,Read and print

In [None]:
# Read the weather CSV: the first row supplies the column names and Spark
# infers each column's type from the data.
weatherdf = spark.read.csv('weatherAUS.csv', header=True, inferSchema=True)

# count() triggers a full pass over the file.
total_entries = weatherdf.count()
print("The total number of entries in the dataset is: ", total_entries)

## Step 03 - Delete unimportant columns

In [None]:
# Columns that are excluded from the rest of the analysis.
columns_to_drop = [
    'Date',
    'Location',
    'Evaporation',
    'Sunshine',
    'Cloud9am',
    'Cloud3pm',
    'Temp9am',
    'Temp3pm',
]

# Keep the remaining columns in their original order.
kept_columns = [name for name in weatherdf.columns if name not in columns_to_drop]
weatherdf = weatherdf.select(kept_columns)

print("Schema for the dataset")
print("------------------------------------------------------------------------------- ")
weatherdf.printSchema()

## Step 04 - Printing the count of null values(NA) in each column

In [None]:
# For every column, count the rows whose value is the literal string 'NA'
# and display all counts as a single row.
na_count_expressions = [
    count(when(col(name) == 'NA', name)).alias(name)
    for name in weatherdf.columns
]
weatherdf.select(na_count_expressions).show()

## Step 05 -- replace NA values in numeric column with mean

Here `first()` is used to retrieve the mean as a plain value rather than as a DataFrame. 

In [None]:
# Numeric columns whose 'NA' placeholders are replaced by the column mean.
mean_imputed_columns = [
    'MinTemp',
    'MaxTemp',
    'Rainfall',
    'WindGustSpeed',
    'WindSpeed9am',
    'WindSpeed3pm',
    'Humidity9am',
    'Humidity3pm',
    'Pressure9am',
    'Pressure3pm',
]

# Compute every mean (rounded to 2 decimals) in one aggregation instead of
# running a separate Spark job per column. `round` and `mean` are the
# pyspark.sql.functions versions imported at the top of the notebook.
# first() returns a Row, so each mean is a plain Python value.
column_means = weatherdf.select(
    [round(mean(col(name)), 2).alias(name) for name in mean_imputed_columns]
).first()

# Replace 'NA' with the column mean; every other value is kept unchanged.
for name in mean_imputed_columns:
    weatherdf = weatherdf.withColumn(
        name,
        when(col(name) == 'NA', column_means[name]).otherwise(col(name)),
    )

## Step 05 -- Replace NA values in non-numeric columns with the most frequent value

To get the value with the maximum occurrence, I first applied groupBy() and then sorted the counts in descending order. 

In [None]:
# Categorical columns whose 'NA' placeholders are replaced by the most
# frequent value of the column.
# NOTE(review): 'RainTomorrow' is later used as the label column; imputing a
# label may be undesirable - confirm this is intended.
mode_imputed_columns = [
    'WindGustDir',
    'WindDir9am',
    'WindDir3pm',
    'RainToday',
    'RainTomorrow',
]

for name in mode_imputed_columns:
    # Find the most frequent real value. The 'NA' placeholder is excluded
    # first: otherwise, whenever 'NA' is the most common entry, it would be
    # "replaced" by itself and the column would stay unimputed.
    most_frequent = (
        weatherdf.filter(col(name) != 'NA')
        .groupBy(name)
        .count()
        .sort('count', ascending=False)
        .first()
    )

    # first() returns None when the column has no value other than 'NA';
    # there is nothing to impute with, so the column is left as it is.
    if most_frequent is None:
        continue

    # most_frequent is a (value, count) Row; index 0 is the value itself.
    weatherdf = weatherdf.withColumn(
        name,
        when(col(name) == 'NA', most_frequent[0]).otherwise(col(name)),
    )

## Step 06 -- Data Transformation 

Splitting numeric and Non-numeric columns from the dataframe

In [None]:
# Columns that hold numeric measurements.
numericColumns = [
    'MinTemp', 'MaxTemp', 'Rainfall', 'WindGustSpeed',
    'WindSpeed9am', 'WindSpeed3pm',
    'Humidity9am', 'Humidity3pm',
    'Pressure9am', 'Pressure3pm',
]

# Columns that hold categorical (string) values.
nonNumericColumns = [
    'WindGustDir', 'WindDir9am', 'WindDir3pm',
    'RainToday', 'RainTomorrow',
]

# Cast every numeric column to double in a single projection; all other
# columns pass through untouched and the column order is preserved.
weatherdf = weatherdf.select([
    col(name).cast('double').alias(name) if name in numericColumns else col(name)
    for name in weatherdf.columns
])

## Step 06 -- Data Transformation 

Converting non-numeric columns into numeric using StringIndexer

In [None]:
# One StringIndexer per categorical column; each writes its numeric codes to
# a new column named "<column>_index".
indexers = [
    StringIndexer(inputCol=name, outputCol=name + "_index")
    for name in nonNumericColumns
]

# Run all indexers as stages of one pipeline: fit on the data, then append
# the index columns to the DataFrame.
pipeline = Pipeline(stages=indexers)
indexer_model = pipeline.fit(weatherdf)
weatherdf = indexer_model.transform(weatherdf)

## Step 06 -- Data Transformation

I'm using VectorAssembler for data transformation

In [None]:
# Columns combined into the single "features" vector used by the classifiers.
# NOTE(review): the "*_index" columns created by the StringIndexer step are
# not part of this list, so the models are trained on the numeric
# measurements only - confirm this is intended.
feature_columns = [
    "MinTemp", "MaxTemp", "Rainfall", "WindGustSpeed",
    "WindSpeed9am", "WindSpeed3pm",
    "Humidity9am", "Humidity3pm",
    "Pressure9am", "Pressure3pm",
]

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
weatherdf = assembler.transform(weatherdf)

### Since non numeric columns are subsequently converted to numeric using StringIndexer, I'm removing the original non-numeric columns from the dataframe

In [None]:
# Keep every column except the original string-valued categorical ones;
# their "_index" counterparts stay in the DataFrame.
retained_columns = [name for name in weatherdf.columns if name not in nonNumericColumns]
weatherdf = weatherdf.select(retained_columns)

## Step 07 - Splitting the data randomly between 70% and 30%

In [None]:
# 70% of the rows go to training and 30% to testing; the seed is fixed so the
# random split can be reproduced.
split_weights = [0.7, 0.3]
trainingData, testData = weatherdf.randomSplit(split_weights, seed=100)

## Step 08 - Computing accuracy using machine learning algorithms
### 8.1 Decision Tree Classifier

In [None]:
# Train a decision tree on the training split, using the assembled feature
# vector as input and the indexed RainTomorrow column as label.
dt = DecisionTreeClassifier(featuresCol="features", labelCol="RainTomorrow_index")
dtModel = dt.fit(trainingData)

# Score the held-out test split.
dtPredictions = dtModel.transform(testData)

# Preview five predictions, sorted by the probability column in descending order.
dt_preview_columns = [
    "MinTemp", "MaxTemp", "Rainfall", "WindGustSpeed", "WindSpeed9am", "WindSpeed3pm",
    "Humidity9am", "Humidity3pm", "Pressure9am", "Pressure3pm", "WindGustDir_index",
    "WindDir9am_index", "WindDir3pm_index", "RainToday_index", "probability", "prediction",
]
dtPredictions.select(*dt_preview_columns).orderBy("probability", ascending=False).show(5)

# Accuracy of the predictions against the label column.
dtEvaluator = MulticlassClassificationEvaluator(
    labelCol="RainTomorrow_index",
    predictionCol="prediction",
    metricName="accuracy",
)

# Scale the evaluator's result to a percentage.
decisionTreeAccuracy = dtEvaluator.evaluate(dtPredictions) * 100

dt_accuracy_text = "%.2f" % decisionTreeAccuracy
print("Decision Tree Accuracy is: " + " " + dt_accuracy_text)

## Step 08
### 8.2 Random Forest

In [None]:
# Train a random forest of 10 trees on the training split.
rf = RandomForestClassifier(
    numTrees=10,
    featuresCol="features",
    labelCol="RainTomorrow_index",
)
rfModel = rf.fit(trainingData)

# Score the held-out test split.
rfPredictions = rfModel.transform(testData)

# Preview five predictions, sorted by the probability column in descending order.
rf_preview = rfPredictions.select(
    "MinTemp", "MaxTemp", "Rainfall", "WindGustSpeed", "WindSpeed9am", "WindSpeed3pm",
    "Humidity9am", "Humidity3pm", "Pressure9am", "Pressure3pm", "WindGustDir_index",
    "WindDir9am_index", "WindDir3pm_index", "RainToday_index", "probability", "prediction",
)
rf_preview.orderBy("probability", ascending=False).show(5)

# Accuracy of the predictions against the label column.
rfEvaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="RainTomorrow_index",
)

# Scale the evaluator's result to a percentage.
rf_accuracy_fraction = rfEvaluator.evaluate(rfPredictions)
randomForestAccuracy = rf_accuracy_fraction * 100

print("Random Forest Accuracy is: " + " " + "%.2f" % randomForestAccuracy)

## Step 08

### 8.3 Logistic Regression

In [None]:
# Train a logistic regression model (at most 10 iterations) on the training split.
lr = LogisticRegression(
    maxIter=10,
    featuresCol="features",
    labelCol="RainTomorrow_index",
)
lrModel = lr.fit(trainingData)

# Score the held-out test split.
lrPredictions = lrModel.transform(testData)

# Preview five predictions, sorted by the probability column in descending order.
lr_preview_columns = (
    "MinTemp", "MaxTemp", "Rainfall", "WindGustSpeed", "WindSpeed9am", "WindSpeed3pm",
    "Humidity9am", "Humidity3pm", "Pressure9am", "Pressure3pm", "WindGustDir_index",
    "WindDir9am_index", "WindDir3pm_index", "RainToday_index", "probability", "prediction",
)
lr_preview = lrPredictions.select(*lr_preview_columns)
lr_preview.orderBy("probability", ascending=False).show(5)

# Accuracy of the predictions against the label column.
lrEvaluator = MulticlassClassificationEvaluator(
    labelCol="RainTomorrow_index",
    predictionCol="prediction",
    metricName="accuracy",
)

# Scale the evaluator's result to a percentage.
logisticRegressionAccuracy = lrEvaluator.evaluate(lrPredictions) * 100

lr_accuracy_text = "%.2f" % logisticRegressionAccuracy
print("Logistic Regression Accuracy is: " + " " + lr_accuracy_text)

## Step 08

### 8.4 GBT

In [None]:
# Train gradient-boosted trees (at most 10 iterations) on the training split.
gbt = GBTClassifier(
    maxIter=10,
    featuresCol="features",
    labelCol="RainTomorrow_index",
)
gbtModel = gbt.fit(trainingData)

# Score the held-out test split.
gbtPredictions = gbtModel.transform(testData)

# Preview five predictions, sorted by the probability column in descending order.
gbt_preview = gbtPredictions.select(
    "MinTemp", "MaxTemp", "Rainfall", "WindGustSpeed", "WindSpeed9am", "WindSpeed3pm",
    "Humidity9am", "Humidity3pm", "Pressure9am", "Pressure3pm", "WindGustDir_index",
    "WindDir9am_index", "WindDir3pm_index", "RainToday_index", "probability", "prediction",
)
gbt_preview.orderBy("probability", ascending=False).show(5)

# Accuracy of the predictions against the label column.
gbtEvaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="RainTomorrow_index",
    predictionCol="prediction",
)

# Scale the evaluator's result to a percentage.
gbt_accuracy_fraction = gbtEvaluator.evaluate(gbtPredictions)
gbtAccuracy = gbt_accuracy_fraction * 100

print("Gradient Boosting Accuracy is: " + " " + "%.2f" % gbtAccuracy)

## Step 08 - Comparing accuracy using bar graphs

In [None]:
%matplotlib inline
bar_width = 0.5

classificationList=('Decision Tree','Random Forest','Logistic Regression','GBT')
y_pos = np.arange(len(classificationList))
accuracyValues = [decisionTreeAccuracy,randomForestAccuracy,logisticRegressionAccuracy,gbtAccuracy]

plt.subplots(figsize=(10,5))
plt.style.use('ggplot')
plt.bar(y_pos,accuracyValues,bar_width, align='center', color='C0')
plt.xticks(y_pos,classificationList,fontsize=14)
plt.xlabel('Classfication Techniques',fontsize=14)
plt.ylabel('Accuracy(%)',fontsize=14)
plt.title('COMPARISON OF MACHINE LEARNING ALGORITHMS ACCURACY')

## STEP 08 - ANALYSIS

The accuracy graph for the machine learning classification algorithms is illustrated above. It is clear that GBT achieved the highest accuracy, followed by Random Forest and Decision Tree with marginal differences. Finally, Logistic Regression appears to be less accurate than the other three algorithms. 

## Step 9 - Calculate confusion matrix

- **True Positive (TP)** : Observation is positive, and is predicted to be positive.
- **False Negative (FN)** : Observation is positive, but is predicted negative.
- **True Negative (TN)** : Observation is negative, and is predicted to be negative.
- **False Positive (FP)** : Observation is negative, but is predicted positive.

In this case, positive is 1.0 and negative is 0.0

In [None]:
def confusionMatrix(predictions):
    """Return precision, recall and F1 score of binary predictions as text.

    Despite its name, this function does not return the confusion matrix
    itself; it counts the matrix cells it needs and derives the three
    metrics from them. The positive class is 1.0 and the negative class 0.0.

    Args:
        predictions: Object with `prediction` and `RainTomorrow_index`
            columns that supports `filter(<SQL condition>).count()`, such
            as the DataFrames returned by the models' `transform()` calls.

    Returns:
        A string of the form
        'precision = P, recall = R, f1Score = F', each value formatted with
        two decimals. A metric whose denominator is zero (for example, the
        precision when nothing was predicted positive) is reported as 0.00
        instead of raising ZeroDivisionError.
    """
    # True negatives are not counted: none of the three metrics uses them.
    FN = predictions.filter('prediction = 0.0 AND RainTomorrow_index = 1.0').count()
    TP = predictions.filter('prediction = 1.0 AND RainTomorrow_index = 1.0').count()
    FP = predictions.filter('prediction = 1.0 AND RainTomorrow_index = 0.0').count()

    predicted_positive = TP + FP
    actual_positive = TP + FN

    # Guard every division: a model that never predicts the positive class,
    # or a test set without positive rows, would otherwise divide by zero.
    precision = TP / predicted_positive if predicted_positive else 0.0
    recall = TP / actual_positive if actual_positive else 0.0

    metric_sum = precision + recall
    F1 = 2 * ((precision * recall) / metric_sum) if metric_sum else 0.0

    return 'precision = {:.2f}, recall = {:.2f}, f1Score = {:.2f}'.format(precision, recall, F1)

## Confusion Matrix for machine learning classification algorithms

In [None]:
# Print the summary returned by confusionMatrix() for each model's test-set
# predictions, one model at a time.
for label, predictions in (
    ("Confusion Matrix for Decision Tree are : ", dtPredictions),
    ("Confusion Matrix for Random Forest are : ", rfPredictions),
    ("Confusion Matrix for Logistic Regression are : ", lrPredictions),
    ("Confusion Matrix for Gradient Boosting are : ", gbtPredictions),
):
    print(label, confusionMatrix(predictions))

### How can accuracy be improved? 

**NOTE: The below mentioned points are based on my understanding.** 

1. If you use wrong or irrelevant parameters, you will hardly get good accuracy. Hence, selecting the right parameters related to the target variable plays a pivotal role in determining accuracy. 
2. Cross-validation could help in this regard. It holds out a sample on which the model is not trained, and then tests the model on that held-out sample before the model is finalized. 
3. Although there are 140k records in this dataset, predicting the weather is a huge task. Additional data could help to boost the accuracy. 
4. Randomly splitting the data into training and test sets may affect accuracy, so adequate attention has to be given to how the split is made. 