# Assignment 2

### Python version: 3.6
* conda create --name py3.6 python=3.6
* source activate py3.6
* python -V

### Java version:  8
* brew cask install adoptopenjdk/openjdk/adoptopenjdk8
* open ~/.bash_profile
* export JAVA_HOME=$(/usr/libexec/java_home -v 1.8)
* java -version

## Step 01: Import Spark Session and initialize Spark

In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
...     .master("local") \
...     .appName("Weather Forecast") \
...     .config("spark.some.config.option", "some-value") \
...     .getOrCreate()

## Step 02: Load the dataset and print the schema and total number of entries

In [210]:
df = spark.read.csv('weatherAUS.csv', header=True)

In [211]:
df.describe().collect()

[Row(summary='count', Date='142193', Location='142193', MinTemp='142193', MaxTemp='142193', Rainfall='142193', Evaporation='142193', Sunshine='142193', WindGustDir='142193', WindGustSpeed='142193', WindDir9am='142193', WindDir3pm='142193', WindSpeed9am='142193', WindSpeed3pm='142193', Humidity9am='142193', Humidity3pm='142193', Pressure9am='142193', Pressure3pm='142193', Cloud9am='142193', Cloud3pm='142193', Temp9am='142193', Temp3pm='142193', RainToday='142193', RainTomorrow='142193'),
 Row(summary='mean', Date=None, Location=None, MinTemp='12.186399728729098', MaxTemp='23.226784191272444', Rainfall='2.3499740743111954', Evaporation='5.469824216349123', Sunshine='7.624853113193571', WindGustDir=None, WindGustSpeed='39.98429165757619', WindDir9am=None, WindDir3pm=None, WindSpeed9am='14.001988000994', WindSpeed3pm='18.63757586179718', Humidity9am='68.8438103105705', Humidity3pm='51.482606091656265', Pressure9am='1017.6537584159781', Pressure3pm='1015.258203537907', Cloud9am='4.437189391

In [217]:
df = spark.read.csv('weatherAUS.csv', header=True)

In [218]:
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- MinTemp: string (nullable = true)
 |-- MaxTemp: string (nullable = true)
 |-- Rainfall: string (nullable = true)
 |-- Evaporation: string (nullable = true)
 |-- Sunshine: string (nullable = true)
 |-- WindGustDir: string (nullable = true)
 |-- WindGustSpeed: string (nullable = true)
 |-- WindDir9am: string (nullable = true)
 |-- WindDir3pm: string (nullable = true)
 |-- WindSpeed9am: string (nullable = true)
 |-- WindSpeed3pm: string (nullable = true)
 |-- Humidity9am: string (nullable = true)
 |-- Humidity3pm: string (nullable = true)
 |-- Pressure9am: string (nullable = true)
 |-- Pressure3pm: string (nullable = true)
 |-- Cloud9am: string (nullable = true)
 |-- Cloud3pm: string (nullable = true)
 |-- Temp9am: string (nullable = true)
 |-- Temp3pm: string (nullable = true)
 |-- RainToday: string (nullable = true)
 |-- RainTomorrow: string (nullable = true)



In [203]:
df.count()

142193

## Step 03: Delete columns from the dataset

In [230]:
df = df.drop('Date', 'Location', 'Evaporation', 'Sunshine', 'Cloud9am', 'Cloud3pm', 'Temp9am', 'Temp3pm')
df.show(3)

+-------+-------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+---------+------------+
|MinTemp|MaxTemp|Rainfall|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|RainToday|RainTomorrow|
+-------+-------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+---------+------------+
|   13.4|   22.9|     0.6|          W|           44|         W|       WNW|          20|          24|         71|         22|     1007.7|     1007.1|       No|          No|
|    7.4|   25.1|       0|        WNW|           44|       NNW|       WSW|           4|          22|         44|         25|     1010.6|     1007.8|       No|          No|
|   12.9|   25.7|       0|        WSW|           46|         W|       WSW|          19|          26|         38|         30|     1007.6|    

## Step 04: Print the number of missing data in each column.

In [232]:
from pyspark.sql.functions import col,sum
df.select(*(sum(col(c).isin('NA').cast("int")).alias(c) for c in df.columns)).show()

+-------+-------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+---------+------------+
|MinTemp|MaxTemp|Rainfall|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|RainToday|RainTomorrow|
+-------+-------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+---------+------------+
|    637|    322|    1406|       9330|         9270|     10013|      3778|        1348|        2630|       1774|       3610|      14014|      13981|     1406|           0|
+-------+-------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+---------+------------+



## Step 05: Fill the missing data with average value and maximum occurrence value.

In [237]:
from pyspark.sql.functions import countDistinct, avg, stddev, min, max, sum, count 
df.select(avg('MinTemp'),
          avg('MaxTemp'),
          avg('Rainfall'),
          avg('WindGustSpeed'),
          avg('WindSpeed9am'),
          avg('WindSpeed3pm'),
          avg('Humidity9am'),
          avg('Humidity3pm'),
          avg('Pressure9am'),
          avg('Pressure3pm')).show()

+------------------+------------------+------------------+------------------+-----------------+-----------------+----------------+------------------+------------------+-----------------+
|      avg(MinTemp)|      avg(MaxTemp)|     avg(Rainfall)|avg(WindGustSpeed)|avg(WindSpeed9am)|avg(WindSpeed3pm)|avg(Humidity9am)|  avg(Humidity3pm)|  avg(Pressure9am)| avg(Pressure3pm)|
+------------------+------------------+------------------+------------------+-----------------+-----------------+----------------+------------------+------------------+-----------------+
|12.186399728729098|23.226784191272444|2.3499740743111954| 39.98429165757619|  14.001988000994|18.63757586179718|68.8438103105705|51.482606091656265|1017.6537584159781|1015.258203537907|
+------------------+------------------+------------------+------------------+-----------------+-----------------+----------------+------------------+------------------+-----------------+



In [262]:
avg_list = df.select(avg('MinTemp'), 
                     avg('MaxTemp'), 
                     avg('Rainfall'), 
                     avg('WindGustSpeed'), 
                     avg('WindSpeed9am'), 
                     avg('WindSpeed3pm'), 
                     avg('Humidity9am'), 
                     avg('Humidity3pm'), 
                     avg('Pressure9am'), 
                     avg('Pressure3pm')).collect()

In [268]:
df.select(max('WindGustDir'),
          max('WindDir9am'),
          max('WindDir3pm'),
          max('RainToday'),
          max('RainTomorrow'),
         ).show()

+----------------+---------------+---------------+--------------+-----------------+
|max(WindGustDir)|max(WindDir9am)|max(WindDir3pm)|max(RainToday)|max(RainTomorrow)|
+----------------+---------------+---------------+--------------+-----------------+
|             WSW|            WSW|            WSW|           Yes|              Yes|
+----------------+---------------+---------------+--------------+-----------------+



In [270]:
max_list = df.select(max('WindGustDir'),
                     max('WindDir9am'),
                     max('WindDir3pm'),
                     max('RainToday'),
                     max('RainTomorrow'),
                    ).collect()

In [278]:
df2 = df\
.withColumn('MinTemp', when(df.MinTemp=='NA', avg_list[0][0]).otherwise(df.MinTemp))\
.withColumn('MaxTemp', when(df.MaxTemp=='NA', avg_list[0][1]).otherwise(df.MaxTemp))\
.withColumn('Rainfall', when(df.Rainfall=='NA', avg_list[0][2]).otherwise(df.Rainfall))\
.withColumn('WindGustSpeed', when(df.WindGustSpeed=='NA', avg_list[0][3]).otherwise(df.WindGustSpeed))\
.withColumn('WindSpeed9am', when(df.WindSpeed9am=='NA', avg_list[0][4]).otherwise(df.WindSpeed9am))\
.withColumn('WindSpeed3pm', when(df.WindSpeed3pm=='NA', avg_list[0][5]).otherwise(df.WindSpeed3pm))\
.withColumn('Humidity9am', when(df.Humidity9am=='NA', avg_list[0][6]).otherwise(df.Humidity9am))\
.withColumn('Humidity3pm', when(df.Humidity3pm=='NA', avg_list[0][7]).otherwise(df.Humidity3pm))\
.withColumn('Pressure9am', when(df.Pressure9am=='NA', avg_list[0][8]).otherwise(df.Pressure9am))\
.withColumn('Pressure3pm', when(df.Pressure3pm=='NA', avg_list[0][9]).otherwise(df.Pressure3pm))\
.withColumn('WindGustDir', when(df.WindGustDir=='NA', max_list[0][0]).otherwise(df.WindGustDir))\
.withColumn('WindDir9am', when(df.WindDir9am=='NA', max_list[0][1]).otherwise(df.WindDir9am))\
.withColumn('WindDir3pm', when(df.WindDir3pm=='NA', max_list[0][2]).otherwise(df.WindDir3pm))\
.withColumn('RainToday', when(df.RainToday=='NA', max_list[0][3]).otherwise(df.RainToday))\
.withColumn('RainTomorrow', when(df.RainTomorrow=='NA', max_list[0][4]).otherwise(df.RainTomorrow))\

df2.show(20)

+-------+-------+------------------+-----------+-------------+----------+----------+---------------+------------+-----------+-----------+-----------+-----------+---------+------------+
|MinTemp|MaxTemp|          Rainfall|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|   WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|RainToday|RainTomorrow|
+-------+-------+------------------+-----------+-------------+----------+----------+---------------+------------+-----------+-----------+-----------+-----------+---------+------------+
|   13.4|   22.9|               0.6|          W|           44|         W|       WNW|             20|          24|         71|         22|     1007.7|     1007.1|       No|          No|
|    7.4|   25.1|                 0|        WNW|           44|       NNW|       WSW|              4|          22|         44|         25|     1010.6|     1007.8|       No|          No|
|   12.9|   25.7|                 0|        WSW|           46|         W|  

## Step 06: Data transformation

## Step 07: Create the feature vector and divide the dataset

## Step 08: Apply machine learning classification algorithms on the dataset and compare their accuracy. Plot the accuracy as bar graph.

## Step 09: Calculate the confusion matrix and find the precision, recall, and F1 score of each classification algorithm. Explain how the accuracy of the predication can be improved?