In [1]:
import numpy as np 
import pandas as pd 

import os
# Lazily enumerate the full path of every file found under the Kaggle input
# directory, then print them one per line.
input_paths = (
    os.path.join(folder, name)
    for folder, _, names in os.walk('/kaggle/input')
    for name in names
)
for path in input_paths:
    print(path)

/kaggle/input/weather-prediction/weather_prediction_dataset.csv
/kaggle/input/weather-prediction/metadata.txt
/kaggle/input/weather-prediction/weather_prediction_bbq_labels.csv


In this notebook, we use the Spark machine-learning library (Spark ML) to work on this dataset. A related notebook that explores the same dataset using ML techniques from sklearn and ANN/CNN models from tensorflow can be found here:

https://www.kaggle.com/code/lorresprz/ann-cnn-randomforest-predicting-nice-weather



In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l- \ done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l- \ | done
[?25h  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=a36f7a9688cbbf00a1d815c3e46eeabb92788adc5fbb831bd02f8af5dbcf80b7
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [3]:
import pyspark

# Load dataset for weather prediction

In [4]:
from pyspark.sql import SparkSession

# Reuse an already-running session if there is one; otherwise start a new
# session registered under the application name 'weather_pred'.
session_builder = SparkSession.builder.appName('weather_pred')
spark = session_builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/08 07:45:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Both CSV files start with a header row; column types are inferred by Spark.
csv_read_options = {'header': True, 'inferSchema': True}

# df1: weather measurements, df2: BBQ-weather labels.
df1 = spark.read.csv(
    '/kaggle/input/weather-prediction/weather_prediction_dataset.csv',
    **csv_read_options,
)
df2 = spark.read.csv(
    '/kaggle/input/weather-prediction/weather_prediction_bbq_labels.csv',
    **csv_read_options,
)

                                                                                

There are 165 columns covering 18 cities in Europe. For each city, multiple measurements were recorded, including wind gust, wind speed, cloud cover, humidity, pressure, global radiation, precipitation, sunshine, minimum temperature, and maximum temperature. Furthermore, the set of measurements varies from city to city (for example, some cities have wind speed in place of cloud cover, and some have wind gust in place of wind speed). In this notebook, we focus on Maastricht, a beautiful Dutch city. The objective is to predict whether the weather is suitable for an outdoor barbecue.

In [6]:
# Print the column names and inferred types of the weather-measurement dataframe.
df1.printSchema()

root
 |-- DATE: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- BASEL_cloud_cover: integer (nullable = true)
 |-- BASEL_humidity: double (nullable = true)
 |-- BASEL_pressure: double (nullable = true)
 |-- BASEL_global_radiation: double (nullable = true)
 |-- BASEL_precipitation: double (nullable = true)
 |-- BASEL_sunshine: double (nullable = true)
 |-- BASEL_temp_mean: double (nullable = true)
 |-- BASEL_temp_min: double (nullable = true)
 |-- BASEL_temp_max: double (nullable = true)
 |-- BUDAPEST_cloud_cover: integer (nullable = true)
 |-- BUDAPEST_humidity: double (nullable = true)
 |-- BUDAPEST_pressure: double (nullable = true)
 |-- BUDAPEST_global_radiation: double (nullable = true)
 |-- BUDAPEST_precipitation: double (nullable = true)
 |-- BUDAPEST_sunshine: double (nullable = true)
 |-- BUDAPEST_temp_mean: double (nullable = true)
 |-- BUDAPEST_temp_max: double (nullable = true)
 |-- DE_BILT_cloud_cover: integer (nullable = true)
 |-- DE_BILT_wind_speed: d

In [7]:
# Print the column names and inferred types of the BBQ-label dataframe.
df2.printSchema()

root
 |-- DATE: integer (nullable = true)
 |-- BASEL_BBQ_weather: boolean (nullable = true)
 |-- BUDAPEST_BBQ_weather: boolean (nullable = true)
 |-- DE_BBQ_weather: boolean (nullable = true)
 |-- DRESDEN_BBQ_weather: boolean (nullable = true)
 |-- DUSSELDORF_BBQ_weather: boolean (nullable = true)
 |-- HEATHROW_BBQ_weather: boolean (nullable = true)
 |-- KASSEL_BBQ_weather: boolean (nullable = true)
 |-- LJUBLJANA_BBQ_weather: boolean (nullable = true)
 |-- MAASTRICHT_BBQ_weather: boolean (nullable = true)
 |-- MALMO_BBQ_weather: boolean (nullable = true)
 |-- MONTELIMAR_BBQ_weather: boolean (nullable = true)
 |-- MUENCHEN_BBQ_weather: boolean (nullable = true)
 |-- OSLO_BBQ_weather: boolean (nullable = true)
 |-- PERPIGNAN_BBQ_weather: boolean (nullable = true)
 |-- SONNBLICK_BBQ_weather: boolean (nullable = true)
 |-- STOCKHOLM_BBQ_weather: boolean (nullable = true)
 |-- TOURS_BBQ_weather: boolean (nullable = true)



# Choose a subset of the data

In [8]:
# Pick the columns corresponding to Maastricht.
# Select them by name prefix rather than by position (previously the magic
# slice [80:91]) so the result does not depend on the CSV's column order.
[name for name in df1.columns if name.startswith('MAASTRICHT_')]

['MAASTRICHT_cloud_cover',
 'MAASTRICHT_wind_speed',
 'MAASTRICHT_wind_gust',
 'MAASTRICHT_humidity',
 'MAASTRICHT_pressure',
 'MAASTRICHT_global_radiation',
 'MAASTRICHT_precipitation',
 'MAASTRICHT_sunshine',
 'MAASTRICHT_temp_mean',
 'MAASTRICHT_temp_min',
 'MAASTRICHT_temp_max']

In [9]:
# DATE (used later as the join key) plus every Maastricht measurement column.
maastricht_measurement_columns = [
    'DATE',
    'MAASTRICHT_cloud_cover',
    'MAASTRICHT_wind_speed',
    'MAASTRICHT_wind_gust',
    'MAASTRICHT_humidity',
    'MAASTRICHT_pressure',
    'MAASTRICHT_global_radiation',
    'MAASTRICHT_precipitation',
    'MAASTRICHT_sunshine',
    'MAASTRICHT_temp_mean',
    'MAASTRICHT_temp_min',
    'MAASTRICHT_temp_max',
]
dfM = df1.select(*maastricht_measurement_columns)

In [10]:
# Keep only the date and the Maastricht BBQ label, then preview the rows.
dfM2 = df2.select('DATE', 'MAASTRICHT_BBQ_weather')
dfM2.show()

+--------+----------------------+
|    DATE|MAASTRICHT_BBQ_weather|
+--------+----------------------+
|20000101|                 false|
|20000102|                 false|
|20000103|                 false|
|20000104|                 false|
|20000105|                 false|
|20000106|                 false|
|20000107|                 false|
|20000108|                 false|
|20000109|                 false|
|20000110|                 false|
|20000111|                 false|
|20000112|                 false|
|20000113|                 false|
|20000114|                 false|
|20000115|                 false|
|20000116|                 false|
|20000117|                 false|
|20000118|                 false|
|20000119|                 false|
|20000120|                 false|
+--------+----------------------+
only showing top 20 rows



In [11]:
import pyspark.sql.functions as F
from functools import reduce

# Convert the Boolean True/False labels to integers 1/0.
cols = ["MAASTRICHT_BBQ_weather"]


def _bool_to_int(df, c):
    """Return `df` with column `c` replaced by an integer 0/1 column.

    The value is first cast to int (false -> 0, true -> 1) and then mapped
    with when/otherwise, so the same expression also works when the column
    has already been converted: re-running this cell leaves the labels
    unchanged. As in the original expression, any value that is not
    false/0 (including null) becomes 1.
    """
    return df.withColumn(c, F.when(F.col(c).cast('int') == 0, 0).otherwise(1))


# Fold over the running dataframe `df` (the accumulator), not the outer
# `dfM2`: otherwise every step restarts from the unconverted frame and only
# the last column in `cols` would end up converted.
dfM2 = reduce(_bool_to_int, cols, dfM2)


In [12]:
# Preview the label dataframe after the Boolean-to-integer conversion.
dfM2.show()

+--------+----------------------+
|    DATE|MAASTRICHT_BBQ_weather|
+--------+----------------------+
|20000101|                     0|
|20000102|                     0|
|20000103|                     0|
|20000104|                     0|
|20000105|                     0|
|20000106|                     0|
|20000107|                     0|
|20000108|                     0|
|20000109|                     0|
|20000110|                     0|
|20000111|                     0|
|20000112|                     0|
|20000113|                     0|
|20000114|                     0|
|20000115|                     0|
|20000116|                     0|
|20000117|                     0|
|20000118|                     0|
|20000119|                     0|
|20000120|                     0|
+--------+----------------------+
only showing top 20 rows



In [13]:
# Combine the 0/1 labels with the Maastricht measurements, matching rows on
# their shared DATE column (inner join), then inspect the merged schema.
dfM2 = dfM2.join(dfM, on="DATE", how="inner")
dfM2.printSchema()

root
 |-- DATE: integer (nullable = true)
 |-- MAASTRICHT_BBQ_weather: integer (nullable = false)
 |-- MAASTRICHT_cloud_cover: integer (nullable = true)
 |-- MAASTRICHT_wind_speed: double (nullable = true)
 |-- MAASTRICHT_wind_gust: double (nullable = true)
 |-- MAASTRICHT_humidity: double (nullable = true)
 |-- MAASTRICHT_pressure: double (nullable = true)
 |-- MAASTRICHT_global_radiation: double (nullable = true)
 |-- MAASTRICHT_precipitation: double (nullable = true)
 |-- MAASTRICHT_sunshine: double (nullable = true)
 |-- MAASTRICHT_temp_mean: double (nullable = true)
 |-- MAASTRICHT_temp_min: double (nullable = true)
 |-- MAASTRICHT_temp_max: double (nullable = true)



# Convert data to Spark ML format

In [14]:
#The data format required by spark ML is (features, label) so we need to assemble all features into a feature vector. 
from pyspark.ml.feature import VectorAssembler



In [15]:
# Create an instance of the assembler, which takes a list of columns to be
# used as features and packs them into a single vector column 'features'.
#
# Each input column is listed exactly once. The previous list repeated every
# Maastricht measurement, so the feature vector carried each one twice.
#
# NOTE(review): DATE is kept as a feature to match the original notebook, but
# it is a raw yyyymmdd integer; consider whether it should be a model input.
feature_columns = [
    'DATE',
    'MAASTRICHT_cloud_cover',
    'MAASTRICHT_wind_speed',
    'MAASTRICHT_wind_gust',
    'MAASTRICHT_humidity',
    'MAASTRICHT_pressure',
    'MAASTRICHT_global_radiation',
    'MAASTRICHT_precipitation',
    'MAASTRICHT_sunshine',
    'MAASTRICHT_temp_mean',
    'MAASTRICHT_temp_min',
    'MAASTRICHT_temp_max',
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

In [16]:
# Calling the 'transform' method on the dataframe returns a new dataframe that contains the newly created 'features' column.

data = assembler.transform(dfM2)

In [17]:
# Keep only the assembled feature vector and the label column, then preview.
model_input_columns = ('features', 'MAASTRICHT_BBQ_weather')
final_data = data.select(*model_input_columns)
final_data.show()

23/12/08 07:45:25 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+--------------------+----------------------+
|            features|MAASTRICHT_BBQ_weather|
+--------------------+----------------------+
|[2.0000101E7,8.0,...|                     0|
|[2.0000102E7,7.0,...|                     0|
|[2.0000103E7,7.0,...|                     0|
|[2.0000104E7,8.0,...|                     0|
|[2.0000105E7,4.0,...|                     0|
|[2.0000106E7,6.0,...|                     0|
|[2.0000107E7,6.0,...|                     0|
|[2.0000108E7,7.0,...|                     0|
|[2.0000109E7,6.0,...|                     0|
|[2.000011E7,7.0,1...|                     0|
|[2.0000111E7,3.0,...|                     0|
|[2.0000112E7,5.0,...|                     0|
|[2.0000113E7,8.0,...|                     0|
|[2.0000114E7,8.0,...|                     0|
|[2.0000115E7,8.0,...|                     0|
|[2.0000116E7,8.0,...|                     0|
|[2.0000117E7,8.0,...|                     0|
|[2.0000118E7,8.0,...|                     0|
|[2.0000119E7,7.0,...|            

# Spark ML tree-based methods: DecisionTree, RandomForest, GBT

In [18]:
from pyspark.ml.classification import DecisionTreeClassifier,GBTClassifier,RandomForestClassifier
from pyspark.ml import Pipeline

In [19]:
# 70/30 train/test split. A fixed seed makes the random row assignment
# repeatable, so the reported accuracies can be reproduced on a re-run.
train_data,test_data = final_data.randomSplit([0.7,0.3], seed=42)

In [20]:
# All three estimators read the same label and feature columns.
column_config = {
    'labelCol': 'MAASTRICHT_BBQ_weather',
    'featuresCol': 'features',
}
dtc = DecisionTreeClassifier(**column_config)
rfc = RandomForestClassifier(**column_config)
gbt = GBTClassifier(**column_config)

In [21]:
# Fit the decision tree, random forest and GBT (in that order) on the
# training split. There are three models, so this might take some time.
dtc_model, rfc_model, gbt_model = (
    estimator.fit(train_data) for estimator in (dtc, rfc, gbt)
)

In [22]:
# Apply each fitted model to the test split; every 'transform' call yields a
# new dataframe, one per model.
dtc_predictions, rfc_predictions, gbt_predictions = (
    fitted.transform(test_data) for fitted in (dtc_model, rfc_model, gbt_model)
)

In [23]:
# Let's look at one of the newly created dataframes: the random forest's output on the test split.
rfc_predictions.show()

+--------------------+----------------------+--------------------+--------------------+----------+
|            features|MAASTRICHT_BBQ_weather|       rawPrediction|         probability|prediction|
+--------------------+----------------------+--------------------+--------------------+----------+
|[2.0000104E7,8.0,...|                     0|[19.9803464359417...|[0.99901732179708...|       0.0|
|[2.0000107E7,6.0,...|                     0|[19.8906189481361...|[0.99453094740680...|       0.0|
|[2.0000108E7,7.0,...|                     0|[19.9823033831042...|[0.99911516915521...|       0.0|
|[2.000011E7,7.0,1...|                     0|[19.9823033831042...|[0.99911516915521...|       0.0|
|[2.0000111E7,3.0,...|                     0|[19.8906189481361...|[0.99453094740680...|       0.0|
|[2.0000116E7,8.0,...|                     0|[19.9823033831042...|[0.99911516915521...|       0.0|
|[2.0000121E7,8.0,...|                     0|[19.9823033831042...|[0.99911516915521...|       0.0|
|[2.000012

# Accuracy evaluation

In [24]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [25]:
# Evaluator that scores the 'prediction' column against the true label
# column using the accuracy metric.
acc_evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="MAASTRICHT_BBQ_weather",
)

In [26]:
# Score each model's test-split predictions with the shared evaluator.
dtc_acc, rfc_acc, gbt_acc = (
    acc_evaluator.evaluate(scored)
    for scored in (dtc_predictions, rfc_predictions, gbt_predictions)
)

In [27]:
# Report each model's accuracy as a percentage, with a rule line before
# every result.
rule = '-'*80
result_lines = [
    'A single decision tree had an accuracy of: {0:2.2f}%'.format(dtc_acc*100),
    'A random forest ensemble had an accuracy of: {0:2.2f}%'.format(rfc_acc*100),
    'A ensemble using GBT had an accuracy of: {0:2.2f}%'.format(gbt_acc*100),
]

print("Here are the results!")
for result_line in result_lines:
    print(rule)
    print(result_line)

Here are the results!
--------------------------------------------------------------------------------
A single decision tree had an accuracy of: 100.00%
--------------------------------------------------------------------------------
A random forest ensemble had an accuracy of: 100.00%
--------------------------------------------------------------------------------
A ensemble using GBT had an accuracy of: 100.00%


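All three methods reach an accuracy of 1 on the test split (this is the same result obtained using RandomForestClassifier from sklearn.ensemble in the other notebook). A perfect score from every model suggests that the BBQ label may be a deterministic function of the same-day measurements used as features; this has not been verified here.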