# Import libraries

In [17]:
import os
import numpy as np
import matplotlib.pyplot as plt

import findspark
findspark.init()
findspark.find() 

import databricks.koalas as ks
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql import SparkSession

In [2]:
DATASET_PATH = 'datasets/historical-hourly-weather-dataset/'
AGGREGATED_DATASET_PATH = 'datasets/historical-hourly-weather-dataset/aggregated_sampled_weather_measurements'

# Pre-processing dataset

### Load dataset into Koalas `DataFrame` objects

In [None]:
city_attributes_df = ks.read_csv(os.path.join(DATASET_PATH, 'city_attributes.csv'))
humidity_df = ks.read_csv(os.path.join(DATASET_PATH, 'humidity.csv'))
pressure_df = ks.read_csv(os.path.join(DATASET_PATH, 'pressure.csv'))
temperature_df = ks.read_csv(os.path.join(DATASET_PATH, 'temperature.csv'))
weather_description_df = ks.read_csv(os.path.join(DATASET_PATH, 'weather_description.csv'))
wind_direction_df = ks.read_csv(os.path.join(DATASET_PATH, 'wind_direction.csv'))
wind_speed_df = ks.read_csv(os.path.join(DATASET_PATH, 'wind_speed.csv'))

### Combine all data into one `DataFrame`

# Machine Learning pipeline

### Load data

In [3]:
# Get all the csv files in the aggregated dataset folder
csv_files = [file for file in os.listdir(AGGREGATED_DATASET_PATH) if file.endswith('.csv')]

# Read each CSV file into a Koalas DataFrame and store them in a list
dfs = [ks.read_csv(os.path.join(AGGREGATED_DATASET_PATH, file)) for file in csv_files]

# Combine the DataFrames using the concat function
data = ks.concat(dfs, ignore_index = True)

your 131072x1 screen size is bogus. expect trouble
23/11/10 16:39:20 WARN Utils: Your hostname, DASH resolves to a loopback address: 127.0.1.1; using 172.28.34.48 instead (on interface eth0)
23/11/10 16:39:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/10 16:39:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

### Pre-processing

Select relevant features and label column

In [4]:
# Select relevant features
numerical_cols = [
    'humidity',
    'pressure',
    'temperature',
    'wind_direction',
    'wind_speed',
    'latitude',
    'longitude'
]
nominal_cols = []
# Select the label column
label_col = 'weather_condition'
# Select the features and the label
df_selected = data[numerical_cols + [label_col]]

In [5]:
spark = SparkSession.builder.getOrCreate()
df_selected = df_selected.to_spark()

Train-Test split

In [6]:
train_data, test_data = df_selected.randomSplit([0.8, 0.2], seed = 42)

Encode

In [7]:
def encode(
    df,
    numerical_cols = [],
    nominal_cols = [],
    label_col = '',
    with_std = True,
    with_mean = True,
):
    # Convert categorical label to numerical label
    label_indexer = StringIndexer(
        inputCol = label_col,
        outputCol = 'label',
        handleInvalid = 'keep'
    )
    
    # Assemble features into a vector
    feature_cols = numerical_cols + nominal_cols
    vector_assembler = VectorAssembler(
        inputCols = feature_cols,
        outputCol = 'raw_features'
    )
    
    # Scale the features
    scaler = StandardScaler(
        inputCol = 'raw_features',
        outputCol = 'scaled_features',
        withStd = with_std,
        withMean = with_mean
    )
    
    stages = [label_indexer, vector_assembler, scaler]
    pipeline = Pipeline(stages = stages)
    
    transformer = pipeline.fit(df)
    
    return transformer

data_encoder = encode(
    df = df_selected,
    numerical_cols = numerical_cols,
    nominal_cols = nominal_cols,
    label_col = label_col
)

                                                                                

### Build a `PySpark` Machine Learning pipeline

Define the classifier

In [32]:
mode = 'LogisticRegression'

In [33]:
if mode == 'RandomForest':
    classifier = RandomForestClassifier(
        featuresCol = 'scaled_features',
        labelCol = 'label',
        numTrees = 30
    )
elif mode == 'LogisticRegression':
    classifier = LogisticRegression(
        featuresCol = 'scaled_features',
        labelCol = 'label'
    )

Define the pipeline with the encoding and classifier stages

In [34]:
pipeline = Pipeline(stages = [data_encoder, classifier])

Define the evaluator

In [35]:
evaluator = MulticlassClassificationEvaluator(
    labelCol = 'label',
    predictionCol = 'prediction',
    metricName = 'accuracy'
)

Define hyperparameter tuning (optional)

In [36]:
# Define the grid of hyperparameters
param_grid = ParamGridBuilder().build()

# Set up the cross validator for model training and hyperparameter tuning
cross_validator = CrossValidator(
    estimator = pipeline,
    estimatorParamMaps = param_grid,
    evaluator = evaluator,
    numFolds = 5
)

### Start training

Fit the model using the training data

In [37]:
model = cross_validator.fit(train_data)

23/11/10 16:47:02 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


Make predictions on the test data

In [38]:
predictions = model.transform(test_data)

Evaluate the model performance

In [39]:
accuracy = evaluator.evaluate(predictions)
print('Accuracy:', accuracy)

Accuracy: 0.48947995236204844
