In [1]:
# Import required modules
import pandas as pd
from pyspark import SparkConf
from pyspark.sql import SparkSession

In [2]:
%%markdown
# Predict weather using pyspark

# Predict weather using pyspark


In [3]:
%%markdown
# Spark Context

# Spark Context


In [4]:
master = "local[*]"

app_name = "Predict Weather Using Spark"
spark_conf = SparkConf().setMaster(master).setAppName(app_name)

spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('ERROR')

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/15 21:46:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Data Prep

In [5]:
df_train = spark.read.format('csv')\
                    .option('header', True).option('escape', '"').option('inferSchema', True)\
                    .load('data/weather-training-data.csv')

df_test = spark.read.format('csv')\
                    .option('header', True).option('escape', '"').option('inferSchema', True)\
                    .load('data/weather-test-data.csv')

                                                                                

In [6]:
print(f"Training data {df_train.count()}, Test data {df_test.count()}")

Training data 99516, Test data 42677


## Check Data

In [7]:
print(f"Training data {df_train.describe().toPandas().head()}")

                                                                                

Training data   summary    row ID  Location             MinTemp             MaxTemp  \
0   count     99516     99516               99073               99286   
1    mean      None      None  12.176265985687314  23.218513184134757   
2  stddev      None      None   6.390882290565232   7.115072398372783   
3     min      Row0  Adelaide                -8.5                -4.1   
4     max  Row99999   Woomera                33.9                48.1   

             Rainfall        Evaporation           Sunshine WindGustDir  \
0               98537              56985              52199       99516   
1  2.3530237372762692   5.46131964552076  7.615090327400976        None   
2   8.487865726637184  4.162490444622897  3.783007646224733        None   
3                 0.0                0.0                0.0           E   
4               371.0               86.2               14.5         WSW   

       WindGustSpeed  ...         Humidity9am        Humidity3pm  \
0              93036  ...   

In [8]:
print(f"Test data {df_test.describe().toPandas().head()}")



Test data   summary   row ID  Location             MinTemp             MaxTemp  \
0   count    42677     42677               42483               42585   
1    mean     None      None  12.210032248193444  23.246067864271506   
2  stddev     None      None  6.4321216400080825   7.123596236045857   
3     min     Row0  Adelaide                -8.2                -4.8   
4     max  Row9999   Woomera                31.8                47.0   

             Rainfall         Evaporation            Sunshine WindGustDir  \
0               42250               24365               22178       42677   
1  2.3428615384614444  5.4897147547711915  7.6478311840563205        None   
2   8.412105837942626   4.248849509250146  3.7780187468829842        None   
3                 0.0                 0.0                 0.0           E   
4               278.4               145.0                14.3         WSW   

        WindGustSpeed  ...        WindSpeed3pm         Humidity9am  \
0               39887  .

                                                                                

## Check Missing Data / Null Value
Check Missing Value and Drop Null/NaN values

In [9]:
from pyspark.sql.functions import isnan, when, count, col
df_train.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_train.columns]).show()



+------+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+------------+
|row ID|Location|MinTemp|MaxTemp|Rainfall|Evaporation|Sunshine|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|Cloud9am|Cloud3pm|Temp9am|Temp3pm|RainToday|RainTomorrow|
+------+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+------------+
|     0|       0|    443|    230|     979|      42531|   47317|          0|         6480|         0|         0|         935|        1835|       1233|       2506|       9748|       9736|   37572|   40002|    614|   1904|        0|           0|
+------+--------+-------+---

                                                                                

Identify list of columns

In [10]:
column_df_train = df_train.columns
print(column_df_train)

['row ID', 'Location', 'MinTemp', 'MaxTemp', 'Rainfall', 'Evaporation', 'Sunshine', 'WindGustDir', 'WindGustSpeed', 'WindDir9am', 'WindDir3pm', 'WindSpeed9am', 'WindSpeed3pm', 'Humidity9am', 'Humidity3pm', 'Pressure9am', 'Pressure3pm', 'Cloud9am', 'Cloud3pm', 'Temp9am', 'Temp3pm', 'RainToday', 'RainTomorrow']


# Data Cleaning

# Feature Engineering
Select a list of features to put into ML Model. Use K Means Clustering. <br>
1. implement StringIndexer to index columns "Location", "WindGustDir", "WindDir*" and "RainToday"
2. Implement OHE
3. Implement VA


## Data Prep in Feature Engineering

In [12]:
from pyspark.ml.classification import GBTClassifier, LogisticRegression
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler

"""Pass all features to put into the ML Model for now"""

df_train = df_train.drop("row ID").drop("prediction")
input_cols = [col for col in df_train.columns if col not in ["RainTomorrow"]]
output_cols = [f"{col}_Index" for col in input_cols]

# String Indexer
string_indexer = StringIndexer(inputCols=input_cols, outputCols=output_cols).setHandleInvalid("keep")
indexed = string_indexer.fit(df_train).transform(df_train)

# One Hot Encoder
output_cols_OHE = [f"{col}_OHE" for col in output_cols if col not in ["RainTomorrow"]]
encoder = OneHotEncoder(inputCols=output_cols, outputCols=output_cols_OHE)
encoded = encoder.fit(indexed).transform(indexed)

# VectorAssembler
input_cols_VA = [col for col in output_cols_OHE if col != 'label']
assembler = VectorAssembler(inputCols=input_cols_VA, outputCol='rawFeatures')
df_output = assembler.transform(df_train)

                                                                                

IllegalArgumentException: Location_Index_OHE does not exist. Available: Location, MinTemp, MaxTemp, Rainfall, Evaporation, Sunshine, WindGustDir, WindGustSpeed, WindDir9am, WindDir3pm, WindSpeed9am, WindSpeed3pm, Humidity9am, Humidity3pm, Pressure9am, Pressure3pm, Cloud9am, Cloud3pm, Temp9am, Temp3pm, RainToday, RainTomorrow

# Make Predictions
Use the above result to make predictions

In [13]:
predictions = pipelineModel.transform(df_train).withColumnRenamed('scaledFeatures', 'features')
predictions.groupBy('label','location', 'prediction').count().orderBy('location', 'prediction').show(5)

NameError: name 'pipelineModel' is not defined

# Evaluating Model
Evaluate the model by calculating the **Silhouette Score**

In [None]:
from pyspark.ml.evaluation import ClusteringEvaluator

evaluator = ClusteringEvaluator()
silhouette_score = evaluator.evaluate(predictions)
print("The Silhouette score with squared euclidean distance of GBT Classification Clustering is : " + str(silhouette_score))

**Training Summary:** AUC, accuracy, recall and precision

In [None]:
predictions.printSchema()

In [None]:
gbt_prediction_and_labels = predictions.select('label', 'prediction').rdd

In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics

gbt_evaluator = MulticlassMetrics(gbt_prediction_and_labels)
print("Training summary regarding the Gradient Boosted Tree Classifier")
print(f"Accuracy: {gbt_evaluator.accuracy}")

gbt_labels = (predictions.select("label").distinct().collect()[1])

for i in sorted(gbt_labels):
    print("Label %s precision = %s" % (i, gbt_evaluator.precision(i)))
    print("Label %s recall = %s" % (i, gbt_evaluator.recall(i)))
    print("Label %s F1 Measure = %s" % (i, gbt_evaluator.fMeasure(i, beta=1.0)))


**Binary Classification Evaluator**

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

gbt_auc_evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
gbt_auc = gbt_auc_evaluator.evaluate(predictions)
print(f"AUC: {gbt_auc}")

# Feature Analysis


Get the top 5 most import features

In [None]:
predictions.select("RainTomorrow", "features", "prediction")\
    .groupBy("RainTomorrow", "features", "prediction").count().orderBy("count", ascending=False).show(5)


## ROC Curve

In [None]:
import pyspark.sql.types as T
import pyspark.sql.functions as F

# Use this UDF to cast probability to array
cast_to_array = F.udf(lambda i: i.toArray().tolist(), T.ArrayType(T.FloatType()))

gbt_roc_df = predictions.withColumn("probability", cast_to_array("probability"))
gbt_roc_prob_df = gbt_roc_df.select(gbt_roc_df.probability[0].alias("negativeProb"), gbt_roc_df.probability[1].alias("positiveProb"), "label")

In [None]:
# Method to build Confusion matrix
def build_confusion_matrix(prediction):
    # Calculate the elements of the confusion matrix
    TN = prediction.filter("prediction = 0 AND label = 0").count()
    TP = prediction.filter("prediction = 1 AND label = 1").count()
    FN = prediction.filter("prediction = 0 AND label = 1").count()
    FP = prediction.filter("prediction = 1 AND label = 0").count()
    return TP,TN,FP,FN

Compute **confusion matrix**, set TP threshold at 0.8

In [None]:
threshold = 0.8
gbt_roc_prob_df.withColumn("prediction",F.when(gbt_roc_prob_df.positiveProb > threshold,1).otherwise(0))
gbt_tp,gbt_tn,gbt_fp,gbt_fn = build_confusion_matrix(gbt_roc_prob_df)
gbt_tpr = gbt_tp/(gbt_tp + gbt_fn)
gbt_fpr = gbt_fp/(gbt_fp + gbt_tn)
print("TPR:",gbt_tpr,"FPR: ",gbt_fpr)

## Plot ROC

In [None]:
import matplotlib.pyplot as plt

plt.plot(gbt_fpr,gbt_tpr)
plt.ylabel("TPR")
plt.xlabel("FPR")
plt.title("ROC Curve")
plt.show()