# A. Creating Spark Session and Loading the Data


# Step 01: Import Spark Session and initialize Spark

In [1]:
# Create the entry points to Spark.
from pyspark import SparkConf, SparkContext  # Spark core
from pyspark.sql import SparkSession  # Spark SQL

# `master`: "local[k]" runs Spark locally with k worker threads ("local[*]" would
# use as many worker threads as there are logical cores on the machine).
# `appName`: the name shown for this application in the Spark UI.
conf = SparkConf().setMaster("local[4]").setAppName("assignment2")

# getOrCreate() reuses an existing SparkContext, which avoids the error
# "Cannot run multiple SparkContexts at once". `conf` is only applied when a
# new context has to be created. getOrCreate() never returns None, so no
# separate "create a context" fallback branch is needed.
sc = SparkContext.getOrCreate(conf)

# Wrap the context in a SparkSession, which is used below to read the data.
spark = SparkSession(sparkContext=sc)

# Step 02: Load the dataset and print the schema and total number of entries


In [2]:
# Read the CSV file into a DataFrame through the Spark session: the first row
# provides the column names and the column types are inferred from the data.
csv_reader = spark.read.option("header", True).option("inferSchema", True)
df = csv_reader.csv('weatherAUS.csv')
# Report how many rows were loaded.
entry_count = df.count()
print("There are ", entry_count, " entries")


There are  142193  entries


# B. Data Cleaning and Processing

# Step 03: Delete columns from the dataset

In [3]:
# Columns that are not used for the rain prediction.
drop_list = ['Date', "Location", 'Evaporation', 'Sunshine',
             "Cloud9am", "Cloud3pm", "Temp9am", "Temp3pm"]
# Keep every column that is not in drop_list, in its original order.
kept_columns = [name for name in df.columns if name not in drop_list]
df = df.select(kept_columns)
# Show the header only (zero rows) to confirm which columns remain.
df.show(0, truncate=False)


+-------+-------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+---------+------------+
|MinTemp|MaxTemp|Rainfall|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|RainToday|RainTomorrow|
+-------+-------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+---------+------------+
+-------+-------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+---------+------------+
only showing top 0 rows



# Step 04: Print the number of missing data in each column.

In [4]:
import pyspark.sql.functions as func
# For each column, count the rows whose value is the string "NA".
# when() without otherwise() yields null for non-matching rows, and count()
# only counts non-null values, so only the "NA" rows are counted.
na_counters = []
for name in df.columns:
    na_marker = func.when(func.col(name) == "NA", name)
    na_counters.append(func.count(na_marker).alias(name))
count_Null = df.select(na_counters)
count_Null.show()

+-------+-------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+---------+------------+
|MinTemp|MaxTemp|Rainfall|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|RainToday|RainTomorrow|
+-------+-------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+---------+------------+
|    637|    322|    1406|       9330|         9270|     10013|      3778|        1348|        2630|       1774|       3610|      14014|      13981|     1406|           0|
+-------+-------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+---------+------------+



# Step 05: Fill the missing data with average value and maximum occurrence value

In [5]:
# Categorical (string) columns; every other column is treated as numeric.
Non_numeric_col = ["WindGustDir","WindDir9am","WindDir3pm","RainToday","RainTomorrow"]
numeric_cols = [c for c in df.columns if c not in Non_numeric_col]

# Mean of every numeric column, computed in a single select.
# NOTE(review): the numeric columns still contain the string "NA" here; this
# relies on mean() ignoring values that do not parse as numbers - confirm for
# the Spark version in use.
mean_value_df = df.select([func.mean(func.col(c)).alias(c) for c in numeric_cols])
meanv = mean_value_df.collect()
mean_row = meanv[0]

# Most frequent non-"NA" value of every categorical column. first() fetches
# only the top row instead of collecting the whole grouped result.
maxi = [df.filter(df[c] != "NA").groupBy(c).count()
          .sort("count", ascending=False).first()[0] for c in Non_numeric_col]
mode_by_col = dict(zip(Non_numeric_col, maxi))

# Replace "NA" with the column mean (numeric columns) or with the most
# frequent value (categorical columns). Values are looked up by column name,
# so the result does not depend on the order of df.columns.
for c in df.columns:
    if c in mode_by_col:
        df = df.replace("NA", mode_by_col[c], c)
    else:
        df = df.withColumn(c, func.when(func.col(c) == "NA", mean_row[c]).otherwise(func.col(c)))

# Show the means and most frequent values that were used for the fill.
mean_value_df.show()
print(maxi)

+------------------+----------------+------------------+-----------------+---------------+-----------------+----------------+------------------+------------------+------------------+
|           MinTemp|         MaxTemp|          Rainfall|    WindGustSpeed|   WindSpeed9am|     WindSpeed3pm|     Humidity9am|       Humidity3pm|       Pressure9am|       Pressure3pm|
+------------------+----------------+------------------+-----------------+---------------+-----------------+----------------+------------------+------------------+------------------+
|12.186399728729311|23.2267841912725|2.3499740743107442|39.98429165757619|14.001988000994|18.63757586179718|68.8438103105705|51.482606091656265|1017.6537584159615|1015.2582035378894|
+------------------+----------------+------------------+-----------------+---------------+-----------------+----------------+------------------+------------------+------------------+

['W', 'N', 'SE', 'No', 'No']


# Step 06: Data transformation

In [6]:
from pyspark.sql.types import DoubleType
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# Cast every numeric column to double. withColumn() with an existing name
# replaces that column, so the column order is unchanged.
for name in [col for col in df.columns if col not in Non_numeric_col]:
    df = df.withColumn(name, df[name].cast(DoubleType()))

# One StringIndexer per categorical column, writing the numeric index to
# "<column>_label".
alist = [StringIndexer(inputCol=name, outputCol=name + "_label")
         for name in df.columns if name in Non_numeric_col]

# Fit all indexers in one pipeline and apply them to the data.
pipeline = Pipeline(stages=alist)
pipelineFit = pipeline.fit(df)
dataset = pipelineFit.transform(df)

# Drop the original string columns now that their indexed versions exist.
remaining_cols = [name for name in dataset.columns if name not in Non_numeric_col]
dataset = dataset.select(remaining_cols)
dataset.printSchema()

root
 |-- MinTemp: double (nullable = true)
 |-- MaxTemp: double (nullable = true)
 |-- Rainfall: double (nullable = true)
 |-- WindGustSpeed: double (nullable = true)
 |-- WindSpeed9am: double (nullable = true)
 |-- WindSpeed3pm: double (nullable = true)
 |-- Humidity9am: double (nullable = true)
 |-- Humidity3pm: double (nullable = true)
 |-- Pressure9am: double (nullable = true)
 |-- Pressure3pm: double (nullable = true)
 |-- WindGustDir_label: double (nullable = false)
 |-- WindDir9am_label: double (nullable = false)
 |-- WindDir3pm_label: double (nullable = false)
 |-- RainToday_label: double (nullable = false)
 |-- RainTomorrow_label: double (nullable = false)



# Step 07: Create the feature vector and divide the dataset

In [7]:
# Columns combined into the single "features" vector
# (everything except the target RainTomorrow_label).
feature_columns = [
    "MinTemp", "MaxTemp", "Rainfall", "WindGustDir_label", "WindGustSpeed",
    "WindDir9am_label", "WindDir3pm_label", "WindSpeed9am", "WindSpeed3pm",
    "Humidity9am", "Humidity3pm", "Pressure9am", "Pressure3pm",
    "RainToday_label",
]
vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Run the assembler through a one-stage pipeline to add the "features" column.
pipeline1 = Pipeline(stages=[vector_assembler])
pipelineFit1 = pipeline1.fit(dataset)
dataset_new = pipelineFit1.transform(dataset)

# Split the data into training (70%) and testing (30%) with a fixed seed.
trainingData, testData = dataset_new.randomSplit([0.7, 0.3], seed=100)

# C. Apply Machine Learning Algorithms

# Step 08: Apply machine learning classification algorithms 

# DecisionTreeClassifier()

In [8]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Train a decision tree on the training split.
dt = DecisionTreeClassifier(labelCol="RainTomorrow_label", featuresCol="features")
model1 = dt.fit(trainingData)
# Predict on the held-out test split.
predictions1 = model1.transform(testData)
# Accuracy: the fraction of test rows whose prediction equals the label.
evaluator = MulticlassClassificationEvaluator(
    labelCol="RainTomorrow_label", predictionCol="prediction", metricName="accuracy")

accuracy1 = evaluator.evaluate(predictions1)
# The value is the model's accuracy, not a probability of rain, so label it as such.
print("The accuracy of the DecisionTreeClassifier on the test set is", '%.2f%%' % (100 * accuracy1))

The probability of the rain fall tomorrow is 83.34%


# RandomForestClassifier()

In [9]:
from pyspark.ml.classification import RandomForestClassifier

# Train a random forest with 10 trees on the training split.
rf = RandomForestClassifier(labelCol="RainTomorrow_label", featuresCol="features", numTrees=10)
model = rf.fit(trainingData)
# Predict on the held-out test split.
predictions2 = model.transform(testData)

# Accuracy: the fraction of test rows whose prediction equals the label.
evaluator = MulticlassClassificationEvaluator(
    labelCol="RainTomorrow_label", predictionCol="prediction", metricName="accuracy")

accuracy2 = evaluator.evaluate(predictions2)
# The value is the model's accuracy, not a probability of rain, so label it as such.
print("The accuracy of the RandomForestClassifier on the test set is", '%.2f%%' % (100 * accuracy2))

The probability of the rain fall tomorrow is 83.30%


# LogisticRegression()

In [10]:
from pyspark.ml.classification import LogisticRegression
# Train a logistic regression (at most 10 iterations) on the training split.
lr = LogisticRegression(featuresCol='features', labelCol='RainTomorrow_label', maxIter=10)
lrModel = lr.fit(trainingData)

# Predict on the held-out test split; transform() only reads the 'features' column.
predictions3 = lrModel.transform(testData)

# Accuracy: the fraction of test rows whose prediction equals the label.
evaluator = MulticlassClassificationEvaluator(
    labelCol="RainTomorrow_label", predictionCol="prediction", metricName="accuracy")

accuracy3 = evaluator.evaluate(predictions3)
# The value is the model's accuracy, not a probability of rain, so label it as such.
print("The accuracy of the LogisticRegression on the test set is", '%.2f%%' % (100 * accuracy3))

# Notes:
# - Evaluators configured with different metricName values report different
#   numbers. MulticlassClassificationEvaluator does not compute accuracy by
#   default (its default metric is F1), so metricName="accuracy" is passed
#   explicitly above.
# - labelCol defaults to "label"; it is set explicitly here because the label
#   column is named "RainTomorrow_label".

The probability of the rain fall tomorrow is 81.92%


# GBTClassifier()

In [11]:
from pyspark.ml.classification import GBTClassifier
# Train a gradient-boosted tree classifier (10 iterations) on the training split.
gbt = GBTClassifier(featuresCol='features', labelCol='RainTomorrow_label', maxIter=10)
gbtModel = gbt.fit(trainingData)
# Predict on the held-out test split.
predictions4 = gbtModel.transform(testData)

# Accuracy: the fraction of test rows whose prediction equals the label.
evaluator = MulticlassClassificationEvaluator(
    labelCol="RainTomorrow_label", predictionCol="prediction", metricName="accuracy")

accuracy4 = evaluator.evaluate(predictions4)
# The value is the model's accuracy, not a probability of rain, so label it as such.
print("The accuracy of the GBTClassifier on the test set is", '%.2f%%' % (100 * accuracy4))

The probability of the rain fall tomorrow is 84.04%


In [12]:
from matplotlib import pyplot as plt
import numpy as np

# Bar chart comparing the test-set accuracy of the four models.
objects = ["DecisionTreeClassifier","RandomForestClassifier","LogisticRegression", "GBTClassifier"]
y = [accuracy1, accuracy2, accuracy3, accuracy4]
y_pos = np.arange(len(objects))

fig, ax = plt.subplots()
ax.bar(y_pos, y)
# One tick per model, labelled with the model name and rotated for readability.
ax.set_xticks(y_pos)
ax.set_xticklabels(objects, rotation=315)
# The y-axis is restricted to 0.80-0.86 so the small differences are visible.
ax.set_ylim(.8, .86)
ax.set_title('Accuracy of different models')
ax.set_xlabel('Name of models')
ax.set_ylabel('Accuracy')
fig.tight_layout()
plt.show()


<Figure size 640x480 with 1 Axes>

# Step 09: Calculate the confusion matrix and find the precision, recall, and F1

In [15]:
from pyspark.mllib.evaluation import MulticlassMetrics
# Test-set predictions of the four models and their display names, in matching order.
prediction = [predictions1,predictions2,predictions3,predictions4]
ml = ["DecisionTreeClassifier", "RandomForestClassifier","LogisticRegression", "GBTClassifier"]


def _safe_ratio(numerator, denominator):
    """Return numerator / denominator, or 0.0 when the denominator is zero."""
    return numerator / denominator if denominator else 0.0


def _report_label(title, tp, fp, fn, tn):
    """Print the confusion matrix, precision, recall and F1 for one positive class.

    tp/fp/fn/tn are the true-positive, false-positive, false-negative and
    true-negative row counts with respect to the label named in `title`.
    """
    p = _safe_ratio(tp, tp + fp)
    r = _safe_ratio(tp, tp + fn)
    f1 = _safe_ratio(2 * p * r, p + r)
    # Rows are the true class, columns the predicted class.
    data = [("Positive",float(tp),float(fn)),("Negative",float(fp),float(tn))]
    newdf = spark.createDataFrame(data, ["True value","Positive(predicted)","Negative(predicted)"])
    print(title)
    print(" ")
    newdf.show()
    print("Precision = %.5f" % p)
    print("Recall = %.5f" % r)
    print("F1 Score = %.5f" % f1)
    print(" ")


def step09():
    """Print per-label and overall precision, recall and F1 for every model.

    For each entry of `prediction` (named by the matching entry of `ml`) the
    confusion matrix and metrics are printed twice - once with label 0 as the
    positive class and once with label 1 - followed by the overall statistics
    from MulticlassMetrics.
    """
    for name, predicted in zip(ml, prediction):
        print("This is for ",name)
        print(" ")
        # MulticlassMetrics is built from (prediction, label) pairs.
        results = predicted.select(['prediction', "RainTomorrow_label"])
        metrics = MulticlassMetrics(results.rdd)

        # Count every (label, prediction) combination in a single aggregation.
        counts = {
            (row["RainTomorrow_label"], row["prediction"]): row["count"]
            for row in results.groupBy("RainTomorrow_label", "prediction").count().collect()
        }
        label0_pred0 = counts.get((0.0, 0.0), 0)  # label 0, predicted 0
        label0_pred1 = counts.get((0.0, 1.0), 0)  # label 0, predicted 1
        label1_pred0 = counts.get((1.0, 0.0), 0)  # label 1, predicted 0
        label1_pred1 = counts.get((1.0, 1.0), 0)  # label 1, predicted 1

        # Label 0 as the positive class: a false positive is a label-1 row
        # predicted as 0, a false negative is a label-0 row predicted as 1.
        _report_label("Label 0, Not raining tomorrow",
                      tp=label0_pred0, fp=label1_pred0, fn=label0_pred1, tn=label1_pred1)

        # Label 1 as the positive class: a false positive is a label-0 row
        # predicted as 1, a false negative is a label-1 row predicted as 0.
        _report_label("Label 1, raining tomorrow",
                      tp=label1_pred1, fp=label0_pred1, fn=label1_pred0, tn=label0_pred0)

        # Overall statistics. The no-argument precision()/recall()/fMeasure()
        # calls are deprecated in favour of `accuracy` and all return that same
        # overall value, so it is read once and printed under each heading.
        overall = metrics.accuracy
        print("Summary Stats for overall")
        print("Precision = %.5f" % overall)
        print("Recall = %.5f" % overall)
        print("F1 Score = %.5f" % overall)
        print(" ")
        print(" ")
        print(" ")



In [16]:
# Run step09 to print the confusion matrix, precision, recall and F1 score of every model
step09()

This is for  DecisionTreeClassifier
 
Label 0, Not raining tomorrow
 
+----------+-------------------+-------------------+
|True value|Positive(predicted)|Negative(predicted)|
+----------+-------------------+-------------------+
|  Positive|            31410.0|             1567.0|
|  Negative|             5507.0|             3969.0|
+----------+-------------------+-------------------+

Precision = 0.85083
Recall = 0.95248
F1 Score = 0.89879
 
Label 1, raining tomorrow
 
+----------+-------------------+-------------------+
|True value|Positive(predicted)|Negative(predicted)|
+----------+-------------------+-------------------+
|  Positive|             3969.0|             5507.0|
|  Negative|             1567.0|            31410.0|
+----------+-------------------+-------------------+

Precision = 0.71694
Recall = 0.41885
F1 Score = 0.52878
 
Summary Stats for overall
Precision = 0.83337
Recall = 0.83337
F1 Score = 0.83337
 
 
 
This is for  RandomForestClassifier
 
Label 0, Not raining t

# Improvement of accuracy


1. Add more data: more data generally results in better and more accurate models.

2. Feature processing: choose a better subset of attributes and use better normalization, which will better explain the relationship between the independent variables and the target variable.

3. Algorithm tuning: try different values for the algorithm's parameters. For instance, a random forest has various parameters such as max_features, number_trees, random_state, oob_score and others.