In [2]:
# Downloading and extracting Spark 3.5.1

!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz
!pip install -q findspark

In [3]:
# Setting up Spark environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"


In [4]:
# Mounting to Google Drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
import os
import pandas as pd
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.types import *

# creating a spark session
spark = SparkSession.builder.appName("weatherAUS").getOrCreate()



In [6]:
#Libraries

import os
import sys
import tempfile
import shutil
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.mllib.stat import Statistics
from pyspark.mllib.util import MLUtils
from pyspark.sql.types import *
from pyspark.sql.functions import col, expr,isnan,when,count,year, month, dayofmonth
from pyspark.sql.types import StringType, FloatType, IntegerType, DoubleType
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler,Imputer, StandardScaler, StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import lit

In [7]:
#Reading the dataset
data = spark.read.csv('/content/drive/MyDrive/weatherAUS.csv',inferSchema=True,header=True)
data.show(5)


+----------+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+-------+------------+
|      Date|Location|MinTemp|MaxTemp|Rainfall|Evaporation|Sunshine|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|Cloud9am|Cloud3pm|Temp9am|Temp3pm|RainToday|RISK_MM|RainTomorrow|
+----------+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+-------+------------+
|2008-12-01|  Albury|   13.4|   22.9|     0.6|         NA|      NA|          W|           44|         W|       WNW|          20|          24|         71|         22|     1007.7|     1007.1|       8|      NA|   16.9|   21.8|       No|  

In [8]:
#Printing the data schema
data.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Location: string (nullable = true)
 |-- MinTemp: string (nullable = true)
 |-- MaxTemp: string (nullable = true)
 |-- Rainfall: string (nullable = true)
 |-- Evaporation: string (nullable = true)
 |-- Sunshine: string (nullable = true)
 |-- WindGustDir: string (nullable = true)
 |-- WindGustSpeed: string (nullable = true)
 |-- WindDir9am: string (nullable = true)
 |-- WindDir3pm: string (nullable = true)
 |-- WindSpeed9am: string (nullable = true)
 |-- WindSpeed3pm: string (nullable = true)
 |-- Humidity9am: string (nullable = true)
 |-- Humidity3pm: string (nullable = true)
 |-- Pressure9am: string (nullable = true)
 |-- Pressure3pm: string (nullable = true)
 |-- Cloud9am: string (nullable = true)
 |-- Cloud3pm: string (nullable = true)
 |-- Temp9am: string (nullable = true)
 |-- Temp3pm: string (nullable = true)
 |-- RainToday: string (nullable = true)
 |-- RISK_MM: double (nullable = true)
 |-- RainTomorrow: string (nullable = true)



1) Exploring categorical variable names

In [9]:
#converting sttrings to float
df_stf = data.select(*(col(c).cast(FloatType()) if c in ["MinTemp","MaxTemp","Rainfall","Evaporation","WindGustSpeed","WindSpeed9am","Sunshine","WindSpeed3pm", "Humidity9am", "Humidity3pm", "Pressure9am", "Pressure3pm", "Cloud9am", "Cloud3pm", "Temp9am", "Temp3pm", "RISK_MM"
] else col(c) for c in data.columns))
df_stf.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Location: string (nullable = true)
 |-- MinTemp: float (nullable = true)
 |-- MaxTemp: float (nullable = true)
 |-- Rainfall: float (nullable = true)
 |-- Evaporation: float (nullable = true)
 |-- Sunshine: float (nullable = true)
 |-- WindGustDir: string (nullable = true)
 |-- WindGustSpeed: float (nullable = true)
 |-- WindDir9am: string (nullable = true)
 |-- WindDir3pm: string (nullable = true)
 |-- WindSpeed9am: float (nullable = true)
 |-- WindSpeed3pm: float (nullable = true)
 |-- Humidity9am: float (nullable = true)
 |-- Humidity3pm: float (nullable = true)
 |-- Pressure9am: float (nullable = true)
 |-- Pressure3pm: float (nullable = true)
 |-- Cloud9am: float (nullable = true)
 |-- Cloud3pm: float (nullable = true)
 |-- Temp9am: float (nullable = true)
 |-- Temp3pm: float (nullable = true)
 |-- RainToday: string (nullable = true)
 |-- RISK_MM: float (nullable = true)
 |-- RainTomorrow: string (nullable = true)



In [10]:
#printing categorical variables in the dataset
schema = df_stf.schema
string_columns = [field.name for field in schema if isinstance(field.dataType, StringType)]
print("Columns containing string values:", string_columns)


Columns containing string values: ['Location', 'WindGustDir', 'WindDir9am', 'WindDir3pm', 'RainToday', 'RainTomorrow']


2) categorical variables having null values

In [11]:
# Identifying categorical variables with null values
df_cv = df_stf.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            col(c).contains('NA') | \
                            (col(c) == '' ) | \
                            col(c).isNull() | \
                            isnan(c), c
                           )).alias(c)
                    for c in string_columns])
print('Displaying categorical variables with null value counts')
df_cv.show()


Displaying categorical variables with null value counts
+--------+-----------+----------+----------+---------+------------+
|Location|WindGustDir|WindDir9am|WindDir3pm|RainToday|RainTomorrow|
+--------+-----------+----------+----------+---------+------------+
|       0|       9330|     10013|      3778|     1406|           0|
+--------+-----------+----------+----------+---------+------------+



In [12]:
categorical_columns_with_null = [c for c in string_columns if df_cv.select(c).first()[c] > 0]
print(f'Number of categorical columns with null values are: {categorical_columns_with_null},count: {len(categorical_columns_with_null)}')


Number of categorical columns with null values are: ['WindGustDir', 'WindDir9am', 'WindDir3pm', 'RainToday'],count: 4


3) Frequency count of each categorical variable

In [None]:
#Printing Frequency count of categorical variables
for column in string_columns:
    frequency_count = df_cv.groupBy(column).count()
    print(f"\nFrequency count for '{column}':")
    frequency_count.show(100,truncate=False)
    distinct_values_count = df_cv.select(column).distinct().count()
    print(f"Number of datapoints in each distinct value for {column}: {distinct_values_count}")
    print()


Frequency count for 'Location':
+----------------+-----+
|Location        |count|
+----------------+-----+
|Cairns          |2988 |
|NorfolkIsland   |2964 |
|Bendigo         |3034 |
|Canberra        |3418 |
|Cobar           |2988 |
|SydneyAirport   |3005 |
|Wollongong      |2983 |
|Williamtown     |2553 |
|Moree           |2854 |
|Mildura         |3007 |
|Portland        |2996 |
|Brisbane        |3161 |
|Sydney          |3337 |
|Sale            |3000 |
|BadgerysCreek   |2928 |
|Tuggeranong     |2998 |
|Ballarat        |3028 |
|GoldCoast       |2980 |
|MelbourneAirport|3009 |
|Dartmoor        |2943 |
|Nhil            |1569 |
|Albury          |3011 |
|WaggaWagga      |2976 |
|CoffsHarbour    |2953 |
|Melbourne       |2435 |
|Penrith         |2964 |
|NorahHead       |2929 |
|MountGinini     |2907 |
|Townsville      |3033 |
|Newcastle       |2955 |
|Watsonia        |2999 |
|Richmond        |2951 |
|Walpole         |2819 |
|Woomera         |2990 |
|Adelaide        |3090 |
|PerthAirport    

4) First five rows of the dataset

In [15]:
#printing first five rows of the data
df_stf.show(5)



+----------+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+-------+------------+
|      Date|Location|MinTemp|MaxTemp|Rainfall|Evaporation|Sunshine|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|Cloud9am|Cloud3pm|Temp9am|Temp3pm|RainToday|RISK_MM|RainTomorrow|
+----------+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+-------+------------+
|2008-12-01|  Albury|   13.4|   22.9|     0.6|       NULL|    NULL|          W|         44.0|         W|       WNW|        20.0|        24.0|       71.0|       22.0|     1007.7|     1007.1|     8.0|    NULL|   16.9|   21.8|       No|  

5) Avilable columns of the dataset

In [16]:
#printing avilable columns in the dataset
df_stf.columns

['Date',
 'Location',
 'MinTemp',
 'MaxTemp',
 'Rainfall',
 'Evaporation',
 'Sunshine',
 'WindGustDir',
 'WindGustSpeed',
 'WindDir9am',
 'WindDir3pm',
 'WindSpeed9am',
 'WindSpeed3pm',
 'Humidity9am',
 'Humidity3pm',
 'Pressure9am',
 'Pressure3pm',
 'Cloud9am',
 'Cloud3pm',
 'Temp9am',
 'Temp3pm',
 'RainToday',
 'RISK_MM',
 'RainTomorrow']

6) Droping Risk_MM column

In [17]:
#Dropping risk column
df_dr = df_stf.drop("RISK_MM")


7) Summary of the dataset

In [18]:
#summary of the data
df_dr.summary().show()


+-------+--------+------------------+------------------+-----------------+-----------------+------------------+-----------+------------------+----------+----------+-----------------+-----------------+------------------+------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+---------+------------+
|summary|Location|           MinTemp|           MaxTemp|         Rainfall|      Evaporation|          Sunshine|WindGustDir|     WindGustSpeed|WindDir9am|WindDir3pm|     WindSpeed9am|     WindSpeed3pm|       Humidity9am|       Humidity3pm|       Pressure9am|       Pressure3pm|          Cloud9am|         Cloud3pm|          Temp9am|           Temp3pm|RainToday|RainTomorrow|
+-------+--------+------------------+------------------+-----------------+-----------------+------------------+-----------+------------------+----------+----------+-----------------+-----------------+------------------+------------------+--------------

8) First five rows of categrical Variables

In [19]:
#printing first five rows of categrical Variables
df_dr.select(string_columns).show(5, truncate=False)

+--------+-----------+----------+----------+---------+------------+
|Location|WindGustDir|WindDir9am|WindDir3pm|RainToday|RainTomorrow|
+--------+-----------+----------+----------+---------+------------+
|Albury  |W          |W         |WNW       |No       |No          |
|Albury  |WNW        |NNW       |WSW       |No       |No          |
|Albury  |WSW        |W         |WSW       |No       |No          |
|Albury  |NE         |SE        |E         |No       |No          |
|Albury  |W          |ENE       |NW        |No       |No          |
+--------+-----------+----------+----------+---------+------------+
only showing top 5 rows



9) Decomposing date field into year, month, and day fields


In [20]:
#Decomposing date field
df = df_dr.withColumn("Year", year(col("Date"))) \
         .withColumn("Month", month(col("Date"))) \
         .withColumn("Day", dayofmonth(col("Date")))
df_dd = df.drop("Date")
df_dd.show(5)

+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+------------+----+-----+---+
|Location|MinTemp|MaxTemp|Rainfall|Evaporation|Sunshine|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|Cloud9am|Cloud3pm|Temp9am|Temp3pm|RainToday|RainTomorrow|Year|Month|Day|
+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+------------+----+-----+---+
|  Albury|   13.4|   22.9|     0.6|       NULL|    NULL|          W|         44.0|         W|       WNW|        20.0|        24.0|       71.0|       22.0|     1007.7|     1007.1|     8.0|    NULL|   16.9|   21.8|       No|          No|2008|   12| 

10) Number of unique Locations in dataset

In [21]:
#printing number of unique locations in the dataset
df_dd.select("Location").distinct().count()

49

11) Number of times each location appears in the dataset

In [22]:
# no of times each location appears in the dataset
Unique_locations_appears = df_dd.groupBy("Location").count()
Unique_locations_appears.show(Unique_locations_appears.count(), truncate=False)

+----------------+-----+
|Location        |count|
+----------------+-----+
|Cairns          |2988 |
|NorfolkIsland   |2964 |
|Bendigo         |3034 |
|Canberra        |3418 |
|Cobar           |2988 |
|SydneyAirport   |3005 |
|Wollongong      |2983 |
|Williamtown     |2553 |
|Moree           |2854 |
|Mildura         |3007 |
|Portland        |2996 |
|Brisbane        |3161 |
|Sydney          |3337 |
|Sale            |3000 |
|BadgerysCreek   |2928 |
|Tuggeranong     |2998 |
|Ballarat        |3028 |
|GoldCoast       |2980 |
|MelbourneAirport|3009 |
|Dartmoor        |2943 |
|Nhil            |1569 |
|Albury          |3011 |
|WaggaWagga      |2976 |
|CoffsHarbour    |2953 |
|Melbourne       |2435 |
|Penrith         |2964 |
|NorahHead       |2929 |
|MountGinini     |2907 |
|Townsville      |3033 |
|Newcastle       |2955 |
|Watsonia        |2999 |
|Richmond        |2951 |
|Walpole         |2819 |
|Woomera         |2990 |
|Adelaide        |3090 |
|PerthAirport    |3009 |
|Albany          |3016 |


14) Replacing null values of dataset with median value for numerical columns

In [None]:
#Replacing null values of data with median for numericals
schema = df_dd.schema
numerical_columns = [field.name for field in schema if isinstance(field.dataType, FloatType)]
medians = {}
for column in numerical_columns:
    median_value = df_dd.approxQuantile(column, [0.5], 0.01)[0]
    medians[column] = median_value
for column in numerical_columns:
    median_value = medians[column]
    df_mn = df_mn.withColumn(column, when(col(column).isNull(), median_value).otherwise(col(column)))
df_mn.show()

+--------+------------------+------------------+-------------------+-----------------+-----------------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+------------------+------------------+--------+--------+------------------+------------------+---------+------------+----+-----+---+
|Location|           MinTemp|           MaxTemp|           Rainfall|      Evaporation|         Sunshine|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|       Pressure9am|       Pressure3pm|Cloud9am|Cloud3pm|           Temp9am|           Temp3pm|RainToday|RainTomorrow|Year|Month|Day|
+--------+------------------+------------------+-------------------+-----------------+-----------------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+------------------+------------------+--------+--------+------------------+------------------+---------+------------+----+-----+

 Replacing null values of data wuth mode in the categrical columns

In [None]:
#Replacing null values of data with mode for categrical
string_columns = [field.name for field in schema if isinstance(field.dataType, StringType)]
for column in string_columns:
    mode_value = df_mn.groupBy(column).count().orderBy("count", ascending=False).first()[0]
    df_mc = df_mn.fillna({column: mode_value})
df_mc.show()


+--------+------------------+------------------+-------------------+-----------------+-----------------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+------------------+------------------+--------+--------+------------------+------------------+---------+------------+----+-----+---+
|Location|           MinTemp|           MaxTemp|           Rainfall|      Evaporation|         Sunshine|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|       Pressure9am|       Pressure3pm|Cloud9am|Cloud3pm|           Temp9am|           Temp3pm|RainToday|RainTomorrow|Year|Month|Day|
+--------+------------------+------------------+-------------------+-----------------+-----------------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+------------------+------------------+--------+--------+------------------+------------------+---------+------------+----+-----+

12) performing one hot encoding

In [None]:
# OneHotEncoding for string columns
indexers = [StringIndexer(inputCol=column, outputCol=column + "_index", handleInvalid="keep") for column in string_columns]
encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol= indexer.getInputCol() + "_vec") for indexer in indexers]


13) RainTomorrow is the label, and all other fields are features

In [None]:
# Using vec_Assembler to assemble features into a single vector
vec_assembler = VectorAssembler(inputCols=[
    'MinTemp', 'MaxTemp', 'Rainfall', 'Evaporation', 'Sunshine', 'WindGustSpeed', 'WindSpeed9am', 'WindSpeed3pm', 'Humidity9am', 'Humidity3pm',
    'Pressure9am', 'Pressure3pm', 'Cloud9am', 'Cloud3pm', 'Temp9am', 'Temp3pm', 'Year', 'Month', 'Day',
    'Location_vec', 'WindGustDir_vec', 'WindDir9am_vec', 'WindDir3pm_vec', 'RainToday_vec'
], outputCol='features')
label_indexer = StringIndexer(inputCol="RainTomorrow", outputCol="RainTomorrowLabel")

15) Normalization of data

In [None]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

# Initialize MinMaxScaler
scaler = MinMaxScaler(inputCol="features", outputCol="normalized_features")


16) Training with Logistic Regression model on the training dataset (70% 30% split).

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
lr_model = LogisticRegression(featuresCol="normalized_features", labelCol="RainTomorrowLabel")

In [None]:
#Creating pipeline
pipeline = Pipeline(stages=indexers + encoders + [vec_assembler, scaler, label_indexer, lr_model])
pipeline.getStages()

[StringIndexer_dcab8e503e84,
 StringIndexer_d903dd2d7d60,
 StringIndexer_4e7a7d0aade5,
 StringIndexer_7e7c43c27a6d,
 StringIndexer_09fe9a17d25a,
 StringIndexer_39633e5fc6f0,
 OneHotEncoder_06176624f726,
 OneHotEncoder_d28fc6407e1b,
 OneHotEncoder_794b4c34abb7,
 OneHotEncoder_0c2cb50da65f,
 OneHotEncoder_73c53b1f2f50,
 OneHotEncoder_c0ff7184a721,
 VectorAssembler_8098424ee15e,
 MinMaxScaler_b4bdae6bc391,
 StringIndexer_c346ec105569,
 LogisticRegression_319a263e84b0]

In [None]:
#spliting the dataset
train_data, test_data = df_mc.randomSplit([0.7,.3], seed=64)


In [None]:
#printing no of rows in trainig and testing for the dataset
print("Number of rows in training set:", train_data.count())
print("Number of rows in testing set:", test_data.count())
fit_model = pipeline.fit(train_data)
results = fit_model.transform(test_data)


Number of rows in training set: 99610
Number of rows in testing set: 42583


In [None]:
#printing the results of the model
results.show(100)

+--------+--------------------+------------------+-------------------+-----------------+-----------------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+------------------+------------------+--------+--------+-------------------+------------------+---------+------------+----+-----+---+--------------+-----------------+----------------+----------------+---------------+------------------+---------------+---------------+---------------+---------------+-------------+----------------+--------------------+--------------------+-----------------+--------------------+--------------------+----------+
|Location|             MinTemp|           MaxTemp|           Rainfall|      Evaporation|         Sunshine|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|       Pressure9am|       Pressure3pm|Cloud9am|Cloud3pm|            Temp9am|           Temp3pm|RainToday|RainTomorrow|Year|Month|Day|Location_index|

17) Predicting the RainTomorrow for the test set

In [None]:
#performing testing
my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                       labelCol='RainTomorrowLabel')
results.select('RainTomorrowLabel','prediction').show(100)

+-----------------+----------+
|RainTomorrowLabel|prediction|
+-----------------+----------+
|              0.0|       0.0|
|              0.0|       0.0|
|              0.0|       0.0|
|              0.0|       0.0|
|              0.0|       0.0|
|              0.0|       0.0|
|              0.0|       0.0|
|              0.0|       0.0|
|              0.0|       0.0|
|              0.0|       0.0|
|              0.0|       0.0|
|              0.0|       0.0|
|              0.0|       0.0|
|              0.0|       0.0|
|              0.0|       0.0|
|              0.0|       0.0|
|              0.0|       0.0|
|              0.0|       0.0|
|              0.0|       0.0|
|              0.0|       0.0|
|              1.0|       0.0|
|              0.0|       0.0|
|              0.0|       0.0|
|              0.0|       0.0|
|              0.0|       0.0|
|              0.0|       0.0|
|              0.0|       0.0|
|              0.0|       0.0|
|              0.0|       0.0|
|       

18) confusion matrix and TP, TN,
FP,FN and  accuracy and F1 score of your model.

In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics

# Calculate and display the confusion matrix
confusion_matrix = MulticlassMetrics(results.select('prediction', 'RainTomorrowLabel').rdd.map(lambda row: (row.prediction, row.RainTomorrowLabel))).confusionMatrix().toArray()
print("Confusion Matrix:")
print(confusion_matrix)




Confusion Matrix:
[[31414.  1732.]
 [ 4631.  4806.]]


In [None]:
# Calculate key metrics and print
TP, TN, FP, FN = confusion_matrix[0][0], confusion_matrix[1][1], confusion_matrix[0][1], confusion_matrix[1][0]
print("TP:", TP, "\nTN:", TN, "\nFP:", FP, "\nFN:", FN)
accuracy, F1_score = (TP + TN) / (TP + TN + FP + FN), 2 * TP / (2 * TP + FP + FN)
print("Accuracy:", accuracy, "\nF1 Score:", F1_score)

TP: 31414.0 
TN: 4806.0 
FP: 1732.0 
FN: 4631.0
Accuracy: 0.8505741727919592 
F1 Score: 0.9080371724646269
