In [0]:
# --- Storage locations on DBFS ---
DATASET_PATH: str = 'dbfs:/bigdata_proj/datasets/historical-hourly-weather/'
MODELS_PATH: str = 'dbfs:/bigdata_proj/models/historical-hourly-weather/'
SAMPLED_DATASET_PATH: str = DATASET_PATH + 'aggregated_sampled_weather_measurements.csv'
ENCODING_PIPELINE_PATH: str = MODELS_PATH + 'data_encoder'

# --- Reproducibility and size limits ---
RANDOM_SEED: int = 35
MAX_TRAIN_SIZE: int = 999_999

# --- Run switches ---
# Read the balanced sample from SAMPLED_DATASET_PATH instead of recomputing it
LOAD_SAMPLED_DATASET: bool = False
# Write the freshly computed balanced sample to SAMPLED_DATASET_PATH
SAVE_COMPUTATIONS: bool = True
# Load the fitted encoder from ENCODING_PIPELINE_PATH instead of fitting it
# (name spelled 'ECONDING' on purpose: later cells reference it this way)
LOAD_ECONDING_PIPELINE: bool = False
# NOTE(review): not referenced in the visible cells
LOAD_PRETRAINED_MODELS: bool = False

In [0]:
import pyspark
import numpy as np
import matplotlib.pyplot as plt

from pyspark import SparkContext, SparkConf
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.functions import mean as _mean
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.tuning import CrossValidatorModel

from typing import *

In [0]:
%sh
# Download the zipped historical-hourly-weather dataset into the local /tmp directory
wget --no-verbose https://github.com/jaydhanani99/big-data-6111-43/raw/main/dataset/historical-hourly-weather-dataset.zip -O /tmp/dataset.zip
# Extract it into /tmp/dataset (-u: only extract files that are new or newer than existing ones);
# the next cell moves the extracted files from file:/tmp/dataset to DATASET_PATH
unzip -u /tmp/dataset.zip -d /tmp/dataset

2024-03-23 23:02:40 URL:https://raw.githubusercontent.com/jaydhanani99/big-data-6111-43/main/dataset/historical-hourly-weather-dataset.zip [12655281/12655281] -> "/tmp/dataset.zip" [1]


Archive:  /tmp/dataset.zip
   creating: /tmp/dataset/aggregated_sampled_weather_measurements.csv/
  inflating: /tmp/dataset/aggregated_sampled_weather_measurements.csv/._committed_7616641238230246128.crc  
  inflating: /tmp/dataset/aggregated_sampled_weather_measurements.csv/.part-00000-tid-5785058191842647654-99694b27-5637-4d82-97fd-79413e3b2b1a-5515-1-c000.csv.crc  
  inflating: /tmp/dataset/aggregated_sampled_weather_measurements.csv/.part-00006-tid-5785058191842647654-99694b27-5637-4d82-97fd-79413e3b2b1a-5521-1-c000.csv.crc  
  inflating: /tmp/dataset/aggregated_sampled_weather_measurements.csv/.part-00002-tid-5785058191842647654-99694b27-5637-4d82-97fd-79413e3b2b1a-5517-1-c000.csv.crc  
  inflating: /tmp/dataset/aggregated_sampled_weather_measurements.csv/.part-00004-tid-5785058191842647654-99694b27-5637-4d82-97fd-79413e3b2b1a-5519-1-c000.csv.crc  
  inflating: /tmp/dataset/aggregated_sampled_weather_measurements.csv/part-00003-tid-5785058191842647654-99694b27-5637-4d82-97fd-79413

In [0]:
# Move every entry extracted under file:/tmp/dataset to DATASET_PATH, keeping its name
for extracted in dbutils.fs.ls('file:/tmp/dataset'):
    target_path = DATASET_PATH + extracted.name
    dbutils.fs.mv(extracted.path, target_path, recurse=True)

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# All raw CSVs are read the same way: first line is the header, column types are inferred
csv_reader = spark.read.options(header=True, inferSchema=True)

# Hourly measurement tables (a datetime column plus one column per city)
weather_conditions_df = csv_reader.csv(f'{DATASET_PATH}weather_description.csv')
humidity_df = csv_reader.csv(f'{DATASET_PATH}humidity.csv')
pressure_df = csv_reader.csv(f'{DATASET_PATH}pressure.csv')
temperature_df = csv_reader.csv(f'{DATASET_PATH}temperature.csv')

# Static per-city attributes (City, Country, Latitude, Longitude)
city_attributes_df = csv_reader.csv(f'{DATASET_PATH}city_attributes.csv')

wind_direction_df = csv_reader.csv(f'{DATASET_PATH}wind_direction.csv')
wind_speed_df = csv_reader.csv(f'{DATASET_PATH}wind_speed.csv')

In [0]:
# Column names of the long-format weather measurements DataFrame
DATETIME_COL = 'datetime'

# Hourly measurements
HUMIDITY_COL, PRESSURE_COL, TEMPERATURE_COL = 'humidity', 'pressure', 'temperature'
WIND_DIRECTION_COL, WIND_SPEED_COL = 'wind_direction', 'wind_speed'

# Static city attributes
LATITUDE_COL, LONGITUDE_COL = 'latitude', 'longitude'
CITY_COL, COUNTRY_COL = 'city', 'country'

# Classification target
WEATHER_CONDITION_COL = 'weather_condition'

In [0]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col

def filter_dataframe_by_city_column(dataframe: DataFrame,
                                    city_name: str,
                                    new_column_name: str) -> DataFrame:
    '''
    Extract a single city's measurements from a wide (one column per city) table.

    Args:
        - dataframe: a `DataFrame` with a datetime column and n cities columns,
                     where the records are the related hourly measurements
        - city_name: city name between the ones in the dataframe; it is used verbatim
                     as a column name, so names containing '.' or '`' are supported
        - new_column_name: name to replace the city name

    Returns:
        a new `DataFrame` with:
            - the datetime column
            - a single column of measurements related to the `city_name`
              and renamed as `new_column_name`
    '''
    # Backtick-quote the column reference: an unquoted name containing '.' would be
    # parsed by `col` as nested struct-field access and fail to resolve.
    # Literal backticks inside the name are escaped by doubling them.
    quoted_city_column = '`' + city_name.replace('`', '``') + '`'
    return dataframe.select(DATETIME_COL, col(quoted_city_column).alias(new_column_name))

In [0]:
def join_dataframes(dataframes: List[DataFrame], column_name: str) -> DataFrame:
    '''
    Inner-join a list of DataFrames, left to right, on a shared key column.

    Args:
        - dataframes: the `DataFrame`s to combine; must contain at least one element
        - column_name: name of the key column present in every DataFrame

    Returns:
        ((dataframes[0] JOIN dataframes[1]) JOIN dataframes[2]) ... on `column_name`;
        a single-element list returns that element unchanged
    '''
    result = dataframes[0]
    for other in dataframes[1:]:
        # Joining on a list of names keeps a single copy of the key column
        result = result.join(other, on=[column_name])
    return result

In [0]:
from pyspark.sql import Row

# Each source table holds one column per city; pair it with the name its
# per-city column should get in the long-format result.
measurement_sources = [
    (humidity_df, HUMIDITY_COL),
    (pressure_df, PRESSURE_COL),
    (temperature_df, TEMPERATURE_COL),
    (wind_direction_df, WIND_DIRECTION_COL),
    (wind_speed_df, WIND_SPEED_COL),
    (weather_conditions_df, WEATHER_CONDITION_COL),
]

# Bring the city attributes to the driver: the loop below runs once per city
city_attributes_list = city_attributes_df.collect()

# One DataFrame per city: datetime + the six measurements + the city's static attributes
per_city_dfs = []
for city_row in city_attributes_list:
    city_measurements = [
        filter_dataframe_by_city_column(source_df, city_row.City, column_name)
        for source_df, column_name in measurement_sources
    ]
    per_city_dfs.append(
        join_dataframes(city_measurements, DATETIME_COL)
        .withColumn(CITY_COL, lit(city_row.City))
        .withColumn(COUNTRY_COL, lit(city_row.Country))
        .withColumn(LATITUDE_COL, lit(city_row.Latitude))
        .withColumn(LONGITUDE_COL, lit(city_row.Longitude))
    )

# Stack the per-city frames; `union` is positional, and every frame above is built
# with the same column order.
weather_measurements_df = None
for city_df in per_city_dfs:
    if weather_measurements_df is None:
        weather_measurements_df = city_df
    else:
        weather_measurements_df = weather_measurements_df.union(city_df)

In [0]:
# Keep only rows where every column (all measurements and attributes) is non-null
not_null_weather_measurements_df = weather_measurements_df.na.drop()

In [0]:
def get_weather_conditions_aggregation_dict(weather_conditions: Iterable[str]) -> Dict[str, str]:
    '''
    Map raw weather-condition descriptions onto a small set of coarse categories.

    Args:
        - weather_conditions: an iterable collection of string weather conditions to be aggregated

    Returns:
        a dictionary that maps each original weather condition to one of:
            - thunderstorm  (description contains 'squall' or 'thunderstorm')
            - rainy         (contains 'drizzle' or 'rain')
            - snowy         (contains 'sleet' or 'snow')
            - cloudy        (contains 'cloud')
            - sunny         (contains 'clear' or 'sun')
        Matching is case-insensitive and the first matching rule wins, in the order above
        (e.g. 'thunderstorm with light rain' -> 'thunderstorm', 'light rain and snow' -> 'rainy').
        Conditions that match none of the keywords (e.g. 'mist', 'fog', 'haze') are left out
        of the result entirely; there is no 'foggy' category.
    '''
    # Ordered (category, keywords) rules; order matters because the first match wins
    category_rules = (
        ('thunderstorm', ('squall', 'thunderstorm')),
        ('rainy', ('drizzle', 'rain')),
        ('snowy', ('sleet', 'snow')),
        ('cloudy', ('cloud',)),
        ('sunny', ('clear', 'sun')),
    )

    aggregated = {}
    for condition in weather_conditions:
        lowered = condition.lower()
        for category, keywords in category_rules:
            if any(keyword in lowered for keyword in keywords):
                aggregated[condition] = category
                break

    return aggregated

In [0]:
# Distinct raw weather descriptions, pulled to the driver as a plain Python list
distinct_condition_rows = not_null_weather_measurements_df \
    .select(WEATHER_CONDITION_COL) \
    .distinct() \
    .collect()
weather_conditions_all = [condition_row[0] for condition_row in distinct_condition_rows]

In [0]:
# raw description -> coarse category (descriptions matching no category are absent)
weather_conditions_dict = get_weather_conditions_aggregation_dict(weather_conditions=weather_conditions_all)

In [0]:
# Replace raw descriptions by their coarse category. The replacement is restricted to the
# weather-condition column: without `subset`, Spark applies the mapping to every string
# column (e.g. city and country), which could silently rewrite unrelated values.
weather_measurements_aggregated_df = not_null_weather_measurements_df.replace(
    weather_conditions_dict, subset=[WEATHER_CONDITION_COL]
)

In [0]:
# The coarse categories actually produced by the mapping
WEATHER_CONDITIONS = set(weather_conditions_dict.values())

# Drop rows whose raw description matched no category (those still hold the original text)
weather_measurements_aggregated_df = weather_measurements_aggregated_df.where(
    col(WEATHER_CONDITION_COL).isin(WEATHER_CONDITIONS)
)

In [0]:
def count_weather_condition_occurrences(dataframe: DataFrame, class_name: str) -> int:
    '''
    Count the rows of one weather-condition class.

    Args:
        - dataframe: a `DataFrame` containing the `WEATHER_CONDITION_COL` column
        - class_name: the weather-condition value to count

    Returns:
        the number of rows of `dataframe` whose `WEATHER_CONDITION_COL` equals `class_name`
    '''
    is_requested_class = dataframe[WEATHER_CONDITION_COL] == class_name
    matching_rows = dataframe.where(is_requested_class)
    return matching_rows.count()

In [0]:
def get_undersampling_fracs(dataframe: DataFrame,
                            classes: Sequence[str] = ('rainy', 'snowy', 'sunny', 'cloudy', 'thunderstorm')
                            ) -> Dict[str, float]:
    '''
    Compute per-class sampling fractions that undersample every class to the minority class size.

    Args:
        - dataframe: a `DataFrame` of weather measurements which contains a column `WEATHER_CONDITION_COL`
        - classes: the weather-condition classes to compute a fraction for (defaults to the
                   five categories produced by `get_weather_conditions_aggregation_dict`)

    Returns:
        a dictionary from each class in `classes` to the fraction of its rows that should be
        sampled so that, in expectation, every class keeps as many rows as the smallest
        non-empty class. Classes with no rows map to 0. If no class has any row, every
        fraction is 0.
    '''
    # A single aggregation job instead of one filtered count() per class
    counts_in_data = {
        row[WEATHER_CONDITION_COL]: row['count']
        for row in dataframe.groupBy(WEATHER_CONDITION_COL).count().collect()
    }
    class_counts = {name: counts_in_data.get(name, 0) for name in classes}

    # Ignore empty classes when picking the minority size: otherwise one missing class
    # makes the minimum 0 and turns every fraction into 0, dropping all rows.
    non_empty_counts = [cnt for cnt in class_counts.values() if cnt > 0]
    if not non_empty_counts:
        return {name: 0 for name in classes}
    minority_class_cnt = min(non_empty_counts)

    return {
        name: (minority_class_cnt / cnt if cnt != 0 else 0)
        for name, cnt in class_counts.items()
    }

In [0]:
if LOAD_SAMPLED_DATASET:
    # Reuse the previously saved balanced sample. The undersampling fractions (which
    # trigger Spark jobs over the full dataset) are not computed in this case.
    sampled_weather_measurements_df = spark.read.csv(SAMPLED_DATASET_PATH, header=True, inferSchema=True)
else:
    # Stratified undersampling: each weather condition keeps, in expectation,
    # as many rows as the minority class
    sampled_weather_measurements_df = weather_measurements_aggregated_df.sampleBy(
        WEATHER_CONDITION_COL,
        fractions=get_undersampling_fracs(weather_measurements_aggregated_df),
        seed=RANDOM_SEED,
    )

In [0]:
# Persist the freshly computed balanced sample (skipped when it was itself loaded from disk)
should_persist_sample = SAVE_COMPUTATIONS and not LOAD_SAMPLED_DATASET
if should_persist_sample:
    sampled_weather_measurements_df.write \
        .mode('overwrite') \
        .option('header', True) \
        .csv(SAMPLED_DATASET_PATH)

In [0]:
# 80% train / 20% test, seeded for reproducibility
train_df, test_df = sampled_weather_measurements_df.randomSplit(weights=[0.8, 0.2], seed=RANDOM_SEED)

In [0]:
# Cap the number of training rows at MAX_TRAIN_SIZE (set in the configuration cell)
train_df = train_df.limit(num=MAX_TRAIN_SIZE)

In [0]:
# Report the number of rows in each split
train_size, test_size = train_df.count(), test_df.count()
print(f'Train set size:  {train_size} instances')
print(f'Test set size:   {test_size} instances')

Train set size:  43155 instances
Test set size:   10956 instances


In [0]:

# Model inputs: every numeric measurement plus the city coordinates
NUMERICAL_FEATURES = [
    HUMIDITY_COL, PRESSURE_COL, TEMPERATURE_COL,
    WIND_DIRECTION_COL, WIND_SPEED_COL,
    LATITUDE_COL, LONGITUDE_COL,
]

# No categorical inputs are used (city and country are not model features)
CATEGORICAL_FEATURES = []

# Target: the coarse weather category
TARGET_VARIABLE_COL = WEATHER_CONDITION_COL
PREDICTED_TARGET_VARIABLE_COL = 'predicted_' + TARGET_VARIABLE_COL

# Column names used by the Spark ML stages
LABEL_COL, PREDICTION_COL = 'label', 'prediction'
FEATURES_COL = 'features'
SCALED_FEATURES_COL = 'scaled_' + FEATURES_COL

In [0]:

def save_pyspark_model(model, path: str, append_datetime: bool = True) -> None:
    '''
    Persist a Spark ML model/pipeline, overwriting anything already stored at the target path.

    Args:
        - model: an object exposing `write()` (e.g. a fitted `PipelineModel`); its writer is
                 switched to overwrite mode before saving
        - path: destination path
        - append_datetime: if True (default), append a `-YYYYMMDD-HHMMSS` local-time suffix
                           to `path` so each save gets its own timestamped location
    '''
    from datetime import datetime

    if append_datetime:
        # Year-month-day order so that timestamped paths sort chronologically
        path = f"{path}-{datetime.now().strftime('%Y%m%d-%H%M%S')}"

    model.write().overwrite().save(path)

In [0]:
def encoding_pipeline(dataframe: DataFrame,
                      numerical_features: List[str],
                      categorical_features: List[str],
                      target_variable: str,
                      with_std: bool = True,
                      with_mean: bool = False) -> PipelineModel:
    '''
    Build and fit the feature/label encoding pipeline.

    Stages, in order:
        1. StringIndexer: `target_variable` -> numeric `LABEL_COL`
        2. one StringIndexer per categorical feature -> `<feature>_indexed`
           (unseen values are kept via handleInvalid='keep')
        3. OneHotEncoder: every `<feature>_indexed` -> `<feature>_indexed_encoded`
           (handleInvalid='keep')
        4. VectorAssembler: encoded categoricals followed by `numerical_features` -> `FEATURES_COL`
        5. StandardScaler: `FEATURES_COL` -> `SCALED_FEATURES_COL`

    Args:
        - dataframe: the `DataFrame` the pipeline is fitted on
        - numerical_features: column names of the numerical features
        - categorical_features: column names of the categorical features
        - target_variable: column name of the target variable
        - with_std: scale the features to unit standard deviation (True by default)
        - with_mean: center the features on their mean before scaling (False by default)

    Returns:
        the fitted `PipelineModel`
    '''
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler

    target_indexer = StringIndexer(inputCol=target_variable, outputCol=LABEL_COL)

    feature_indexers = []
    for feature_name in categorical_features:
        feature_indexers.append(
            StringIndexer(inputCol=feature_name, outputCol=feature_name + '_indexed', handleInvalid='keep')
        )

    indexed_columns = [indexer.getOutputCol() for indexer in feature_indexers]
    one_hot = OneHotEncoder(inputCols=indexed_columns,
                            outputCols=[name + '_encoded' for name in indexed_columns],
                            handleInvalid='keep')

    assembler = VectorAssembler(inputCols=one_hot.getOutputCols() + numerical_features,
                                outputCol=FEATURES_COL)

    scaler = StandardScaler(inputCol=FEATURES_COL,
                            outputCol=SCALED_FEATURES_COL,
                            withStd=with_std,
                            withMean=with_mean)

    all_stages = [target_indexer, *feature_indexers, one_hot, assembler, scaler]
    return Pipeline(stages=all_stages).fit(dataframe)

In [0]:

if LOAD_ECONDING_PIPELINE:
    # Reuse a previously saved, already fitted encoder
    data_encoder = PipelineModel.load(ENCODING_PIPELINE_PATH)
else:
    # Fit a new encoder on the training split only
    data_encoder = encoding_pipeline(dataframe=train_df,
                                     numerical_features=NUMERICAL_FEATURES,
                                     categorical_features=CATEGORICAL_FEATURES,
                                     target_variable=TARGET_VARIABLE_COL)

In [0]:
# Apply the same fitted encoder to both splits
encoded_train_df, encoded_test_df = [data_encoder.transform(split_df) for split_df in (train_df, test_df)]


In [0]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Baseline: a single decision tree with default hyperparameters, trained on the
# scaled feature vector produced by `data_encoder`.
decision_tree = DecisionTreeClassifier(featuresCol=SCALED_FEATURES_COL,
                                       labelCol=LABEL_COL,
                                       predictionCol=PREDICTION_COL)

# Fit on the encoded training split
dt_model = decision_tree.fit(encoded_train_df)

# Predict on the encoded test split
dt_predictions = dt_model.transform(encoded_test_df)

# Quick look at a few predictions
dt_predictions.select(PREDICTION_COL, LABEL_COL, SCALED_FEATURES_COL).show(5)

# Accuracy on the held-out test split; other metrics (e.g. 'f1', 'weightedPrecision',
# 'weightedRecall') are available by changing `metricName`.
evaluator = MulticlassClassificationEvaluator(labelCol=LABEL_COL,
                                              predictionCol=PREDICTION_COL,
                                              metricName="accuracy")
accuracy = evaluator.evaluate(dt_predictions)
print(f"Test Accuracy = {accuracy}")


+----------+-----+--------------------+
|prediction|label|     scaled_features|
+----------+-----+--------------------+
|       1.0|  0.0|[3.65643383571563...|
|       4.0|  0.0|[3.89698869332850...|
|       4.0|  4.0|[3.89698869332850...|
|       4.0|  2.0|[4.4743203515994,...|
|       2.0|  4.0|[4.4743203515994,...|
+----------+-----+--------------------+
only showing top 5 rows

Test Accuracy = 0.5545819642205184


In [0]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Hyperparameter grid: 3 x 3 x 3 = 27 decision-tree configurations
paramGrid = ParamGridBuilder() \
    .addGrid(decision_tree.maxDepth, [5, 10, 15]) \
    .addGrid(decision_tree.maxBins, [32, 40, 50]) \
    .addGrid(decision_tree.minInstancesPerNode, [1, 5, 10]) \
    .build()

# Model selection criterion (same metric as the baseline)
evaluator = MulticlassClassificationEvaluator(labelCol=LABEL_COL,
                                              predictionCol=PREDICTION_COL,
                                              metricName="accuracy")

# 5-fold cross-validation over the grid; seeding the fold assignment with RANDOM_SEED
# makes the selected hyperparameters reproducible across runs.
crossval = CrossValidator(estimator=decision_tree,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5,
                          seed=RANDOM_SEED)

# Run cross-validation; the returned model wraps the best configuration refitted on all training data
cvModel = crossval.fit(encoded_train_df)

# Evaluate the selected model on the held-out test split
cvPredictions = cvModel.transform(encoded_test_df)
testAccuracy = evaluator.evaluate(cvPredictions)
print(f"Test Accuracy: {testAccuracy}")

Test Accuracy: 0.5953815261044176
