In [None]:
#%%capture
#!sudo apt update
#!apt-get install openjdk-8-jdk-headless - qq > /dev/null
#!wget - q https: // dlcdn.apache.org/spark/spark-3.3.0/spark-3.3.0-bin-hadoop2.tgz
#!tar xf spark-3.3.0-bin-hadoop2.tgz
#!pip install - r requirements.txt

import os
from kaggle.api.kaggle_api_extended import KaggleApi
import pandas as pd
import numpy as np
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, FloatType
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id, col, udf, rand
import matplotlib.pyplot as plt
import math
import pyspark.sql as ps
from zlib import crc32
import time as tm
from datetime import datetime as dt
from dateutil import parser
import itertools
from collections import defaultdict
from dataclasses import dataclass
from pyspark.sql import functions as F
from pyspark.rdd import RDD
from pyspark.broadcast import Broadcast
import findspark


In [None]:
path = './data'
worker_nodes = "*"
problem_to_solve = 'CANCELLED'

dataset_limit = 100000
use_all_dataset_frames = True
fold_number = 10
load_cached = True

def print_and_save_time(s: str):
  print(s)

# DA SCRIVERE
- perche' usiamo i dataframe invece degli rdd nella prima parte
- aggiungere k fold cross validation
- aggiungere griglia parametri
- aggiungere label stratification -> OK
- aggiungere performance modello pyspark
- aggiungere check e info extra su dataset di base (es sbilanciamento)
- auroc, auprc, f1, 
- confronto con tree classifier
- confrontare ogni pezzo con MLLib
- perche' sgd invece di no?

## Data Download

In [None]:
os.environ['KAGGLE_USERNAME'] = "davidetricella"
os.environ['KAGGLE_KEY'] = "e1ab3aae4a07f36b37a3a8bace74d9df"


dataset = 'yuanyuwendymu/airline-delay-and-cancellation-data-2009-2018'
path = './data'

In [None]:
def download_dataset():
    if not os.path.isdir(path):
        os.mkdir(path)
    if not os.listdir(path):
        try:
            api = KaggleApi()
            api.authenticate()
            api.dataset_download_files(dataset, path, unzip=True, quiet=False)
        except:
            print("Error downloading the dataset")

download_dataset()

## Data Loading

In [None]:
dataframe_schema = StructType([
    StructField('FL_DATE', StringType(), True),
    StructField('OP_CARRIER', StringType(), True),
    StructField('ORIGIN', StringType(), True),
    StructField('DEST', StringType(), True),
    StructField('CRS_DEP_TIME', StringType(), True),
    StructField('CRS_ARR_TIME', StringType(), True),
    StructField('CANCELLED', StringType(), True),
    StructField('DIVERTED', StringType(), True),
    StructField('CRS_ELAPSED_TIME', StringType(), True),
    StructField('DISTANCE', StringType(), True)
])

columns_to_get = [
    'FL_DATE',
    'OP_CARRIER',
    'ORIGIN',
    'DEST',
    'CRS_DEP_TIME',
    'CRS_ARR_TIME',
    'CANCELLED',
    'DIVERTED',
    'CRS_ELAPSED_TIME',
    'DISTANCE'
]

#Rounded max distance between airports found in the dataset
max_distance = 4970

findspark.init()
findspark.find()

spark = SparkSession.builder \
.appName("Airline Departure") \
.master('local[' + worker_nodes + ']') \
.getOrCreate()

context = spark.sparkContext

#The rows are loaded fom all files or only the first one with a limit value
def get_dataset(limit: float = -1, allFrames: bool = True):
    files = os.listdir(path)
    big_frame = spark.createDataFrame(
        spark.sparkContext.emptyRDD(), schema=dataframe_schema)
    if not allFrames:
        files = [files[0]]

    for f in files:
        if f.endswith('.csv'):
            frame = spark.read.option("header", True).csv(path + '/' + f)
            frame = frame.select(columns_to_get)
            frame = frame.orderBy(rand())

            if limit != -1:
                frame = frame.limit(limit)

            big_frame = frame.union(big_frame)

    big_frame = big_frame.withColumn(
        "id", monotonically_increasing_id()).orderBy(rand())
    big_frame.count()

    return big_frame

## Preprocessing

#### Chart Plotting Definitions

In [None]:
#These are the defintions of the charts plotted during data analysis, the main chart used is the bar charts, to have a grasp of the 
#various distributions of problematic flights regarding the most important columns chosen to train the model.
#During charts plotting, the data has been collected, taking advantage of the really low numbers of problematic flights in the dataset.

def plot_balancing_chart(data: ps.DataFrame, label: str):
  total_positives = data.filter(col(label) == 1).count()
  total_negatives = data.filter(col(label) == 0).count()
  fig, ax = plt.subplots()

  labels = ['REGULAR', label]
  counts = [total_negatives, total_positives]
  bar_colors = ['tab:blue', 'tab:red']

  ax.bar(labels, counts, color=bar_colors)

  ax.set_ylabel('Counts')
  ax.set_title('Regular flights and problematic flights counts')

  plt.show()


def plot_problematic_flights_per_carrier_chart(data: ps.DataFrame, label: str):
  problematic_flights = data.filter(col(label) == 1).collect()
  fig, ax = plt.subplots()

  flights_dict = defaultdict(int)
  labels = []
  counts = []

  for row in problematic_flights:
    flights_dict[row["OP_CARRIER"]] += 1

  for key in flights_dict:
    single_carrier_total_flights = data.filter(col("OP_CARRIER") == key).count()
    flights_dict[key] = flights_dict[key] / single_carrier_total_flights * 100

  mean = sum(flights_dict.values()) / len(flights_dict)

  for key in flights_dict:
    if flights_dict[key] > mean:
      labels.append(key)
      counts.append(flights_dict[key])
  labels.append("Mean")
  counts.append(mean)

  ax.bar(labels, counts)

  ax.set_ylabel('Percentages')
  ax.set_title('Problematic flights per carrier')

  plt.show()

def plot_problematic_flights_per_origin_chart(data: ps.DataFrame, label: str):
  problematic_flights = data.filter(col(label) == 1).collect()
  fig, ax = plt.subplots()
  fig.set_size_inches(18.5, 10.5)

  flights_dict = defaultdict(int)
  labels = []
  counts = []

  for row in problematic_flights:
    flights_dict[row["ORIGIN"]] += 1

  for key in flights_dict:
    single_origin_total_flights = data.filter(col("ORIGIN") == key).count()
    flights_dict[key] = flights_dict[key] / single_origin_total_flights * 100

  for key in flights_dict:
    condition = 5 if problem_to_solve == 'CANCELLED' else 1
    if flights_dict[key] > condition:
      labels.append(key)
      counts.append(flights_dict[key])
  labels.append("Majority")
  counts.append(1)

  ax.bar(labels, counts)

  ax.set_ylabel('Percentages')
  ax.set_title('Problematic flights percentage per origin')

  plt.show()


def plot_problematic_flights_per_month_chart(data: ps.DataFrame, label: str):
  problematic_flights = data.filter(col(label) == 1).collect()
  fig, ax = plt.subplots()

  flights_dict = defaultdict(int)
  labels = []
  counts = []

  for row in problematic_flights:
    month = parser.parse(row["FL_DATE"]).month
    flights_dict[month] += 1

  for key in flights_dict:
    condition_string = ("-0" + str(key) + "-") if key < 10 else ("-" + str(key) + "-")
    single_month_total_flights = data.filter(col("FL_DATE").contains(condition_string)).count()
    flights_dict[key] = flights_dict[key] / single_month_total_flights * 100

  for key in sorted(flights_dict):
    labels.append(key)
    counts.append(flights_dict[key])

  ax.bar(labels, counts)

  ax.set_ylabel('Percentages')
  ax.set_title('Problematic flights percentage per month')

  plt.show()


def plot_problematic_flights_per_weekday_chart(data: ps.DataFrame, label: str):
  problematic_flights = data.filter(col(label) == 1).collect()
  fig, ax = plt.subplots()

  flights_dict = defaultdict(int)
  labels = []
  counts = []

  for row in problematic_flights:
    weekday = parser.parse(row["FL_DATE"]).isoweekday()
    flights_dict[weekday] += 1    

  dates_frame = data.select("FL_DATE").collect()

  for key in flights_dict:
    single_weekday_total_flights = 0

    for row in dates_frame:
      parsed_value = parser.parse(row["FL_DATE"]).isoweekday()
      if parsed_value == key:
        single_weekday_total_flights += 1

    flights_dict[key] = flights_dict[key] / single_weekday_total_flights * 100

  for key in sorted(flights_dict):
    labels.append(key)
    counts.append(flights_dict[key])

  ax.bar(labels, counts)

  ax.set_ylabel('Percentages')
  ax.set_title('Problematic flights percentage per weekday')

  plt.show()


def plot_problematic_flights_per_origin_box(data: ps.DataFrame, label: str):
  problematic_flights = data.filter(col(label) == 1).collect()
  fig, ax = plt.subplots()

  flights_dict = defaultdict(int)
  labels = []
  counts = []

  for row in problematic_flights:
    flights_dict[row["ORIGIN"]] += 1

  for key in flights_dict:
    single_origin_total_flights = data.filter(col("ORIGIN") == key).count()
    flights_dict[key] = flights_dict[key] / single_origin_total_flights * 100

  for key in flights_dict:
    labels.append(key)
    counts.append(flights_dict[key])

  ax.boxplot(counts)

  ax.set_ylabel('Percentages')
  ax.set_title('Problematic flights percentage per origin')

  plt.show()


#### Dataset Reading

In [None]:
start_time = tm.time()
data = get_dataset(dataset_limit, use_all_dataset_frames).cache()

finish_time = tm.time() - start_time
print_and_save_time("Dataset reading concluded: " +
                    str(finish_time) + " seconds")

#### Null Rows Dropping

In [None]:
#Various tests showed that the rows with null values are extremely rare, so removing them form the dataset will not bring any significative loss of informations.

common_start_time = tm.time()

data = data.dropna(how='any')
print("Dataframe rows after NaN dropping: " + str(data.count()))

null_removal_finish_time = tm.time() - common_start_time
print_and_save_time("Null values removal concluded: " +
                    str(null_removal_finish_time) + " seconds")

#### Data Analysis

In [None]:
plot_balancing_chart(data, problem_to_solve)

In [None]:
plot_problematic_flights_per_carrier_chart(data, problem_to_solve)

In [None]:
plot_problematic_flights_per_origin_box(data, problem_to_solve)

In [None]:
#Based on the box plot, for the CANCELLED problem, we decided to aggregate in one column the airports with a percentage lower than 5 to improve chart readability.
#For the DIVERTED problem, the value 1 is enough.

plot_problematic_flights_per_origin_chart(data, problem_to_solve)

In [None]:
plot_problematic_flights_per_month_chart(data, problem_to_solve)

In [None]:
plot_problematic_flights_per_weekday_chart(data, problem_to_solve)

From the various charts plotted, we can see that, despite some outliers, the features don't seem to show any significant correlation with the cancellation or diversion of the flights.
The model will probably have a hard time differentiating between problematic and regular flights, due to the scarce impact of the features to determine the result of a flight.

#### Dataframe Balancing

In [None]:
#Due to the really low amount of problematic flights found in the dataset, the training set would be really
#unbalanced and probably lead to poor results, so to balance it we proceeded using the undersampling technique, limiting the number of normal flights to be taken.

start_time = tm.time()
irregular_flights = data.filter(col(problem_to_solve) == 1)

regular_flights = data.filter(col(problem_to_solve) == 0).limit(irregular_flights.count())

flight_ids = irregular_flights.rdd.map(lambda x: x.id).collect() + \
    regular_flights.rdd.map(lambda x: x.id).collect()

data = data.filter(data.id.isin(flight_ids)).orderBy(rand())
print("Balanced dataframe rows: " + str(data.count()))

finish_time = tm.time() - start_time
print_and_save_time("Dataset balancing concluded: " +
                    str(finish_time) + " seconds")

In [None]:
plot_balancing_chart(data, problem_to_solve)

#### Column Conversions

In [None]:
#The string and timestamps columns have been converted into float numbers, to keep the values between 0 and 1 various multipliers have been used.

columns_start_time = tm.time()

@udf(returnType=DoubleType())
def str_to_float(s: str):
  encoding = "utf-8"
  b = s.encode(encoding)
  return float(crc32(b) & 0xffffffff) / 2**32

date_multiplier: float = 1 / 365
@udf(returnType=DoubleType())
def date_to_day_of_year(date_string) -> float:
  date = dt.strptime(date_string, "%Y-%m-%d")
  day = date.timetuple().tm_yday - 1
  return day * date_multiplier

@udf(returnType=DoubleType())
def time_to_interval(time) -> float:
  t = int(float(time))
  h = t // 100
  m = t % 100
  t = h * 60 + m
  return float(t / 1140)

distance_multiplier = float(1) / float(max_distance)

data = data.select(
  (data.CANCELLED.cast('double')).alias("CANCELLED"),
  (data.DIVERTED.cast('double')).alias("DIVERTED"),
  str_to_float(data.OP_CARRIER).alias("OP_CARRIER"),
  str_to_float(data.ORIGIN).alias("ORIGIN"),
  str_to_float(data.DEST).alias("DEST"),
  date_to_day_of_year(data.FL_DATE).alias("FL_DATE"),
  time_to_interval(data.CRS_DEP_TIME).alias("CRS_DEP_TIME"),
  time_to_interval(data.CRS_ARR_TIME).alias("CRS_ARR_TIME"),
  time_to_interval(data.CRS_ELAPSED_TIME).alias("CRS_ELAPSED_TIME"),
  (data.DISTANCE.cast('double') * distance_multiplier).alias("DISTANCE"),
  data.id
)

data.count()

columns_finish_time = tm.time() - columns_start_time
print_and_save_time("Columns conversion concluded: " +
                    str(columns_finish_time) + " seconds")

#### Z Score Normalization

In [None]:
#The various columns have been normalized using the z-score method, subtracting every value for the mean of its column and dividing by the column standard deviation.

z_start_time = tm.time()
column_list = data.columns
column_mean_dict = dict()
column_stddv_dict = dict()

for c in column_list:
    column_mean_dict[c] = data.agg({c: 'mean'}).head()[0]
    column_stddv_dict[c] = data.agg({c: 'stddev'}).head()[0]

data = data.select(
  problem_to_solve,

  ((data.OP_CARRIER - column_mean_dict["OP_CARRIER"]) / column_stddv_dict["OP_CARRIER"]).alias('OP_CARRIER'),

  ((data.ORIGIN - column_mean_dict["ORIGIN"]) / column_stddv_dict["ORIGIN"]).alias('ORIGIN'),

  ((data.DEST - column_mean_dict["DEST"]) / column_stddv_dict["DEST"]).alias('DEST'),

  ((data.FL_DATE - column_mean_dict["FL_DATE"]) / column_stddv_dict["FL_DATE"]).alias('FL_DATE'),

  ((data.CRS_DEP_TIME - column_mean_dict["CRS_DEP_TIME"]) / column_stddv_dict["CRS_DEP_TIME"]).alias('CRS_DEP_TIME'),

  ((data.CRS_ARR_TIME - column_mean_dict["CRS_ARR_TIME"]) /  column_stddv_dict["CRS_ARR_TIME"]).alias('CRS_ARR_TIME'),

  ((data.CRS_ELAPSED_TIME - column_mean_dict["CRS_ELAPSED_TIME"]) / column_stddv_dict["CRS_ELAPSED_TIME"]).alias('CRS_ELAPSED_TIME'),

  ((data.DISTANCE - column_mean_dict["DISTANCE"]) / column_stddv_dict["DISTANCE"]).alias('DISTANCE'),

  data.id
)

data.count()

z_finish_time = tm.time() - z_start_time
print_and_save_time("Z score normalization concluded: " +
                    str(z_finish_time) + " seconds")

#### Preprocessed dataset Saving/Loading

In [None]:
def save_dataset(data):
  data.write.format('csv').option('header', True).mode('overwrite').option(
      'sep', ',').save('./preprocessed/' + problem_to_solve)
  print('Preprocessed dataset saved')

save_dataset(data)

In [None]:
def load_dataset():
    data = spark.read.format("csv") \
        .option("header", True) \
        .load('./preprocessed/' + problem_to_solve)

    print('Preprocessed dataset loaded')
    return data


if load_cached:
    data = load_dataset().cache()

#### Splitting into Folds

In [None]:
#During the main phases of preprocessing the Dataframe structure have been preferred, to use the columns name notation and the sql functions to perform transformations.
#The training part though found the RDD data structures to be more suitable for the tasks required, so a conversion has been accomplished.

start_time = tm.time()
folds: list[RDD] = []

def format_rdd(rdd: RDD) -> RDD:
    return rdd.map(lambda x: (float(x[0]), [float(y) for y in x[1:]]))

temp = data

k_elements_half_number = math.floor((temp.count() / fold_number) / 2)

i = 0
while i < fold_number:
    k_positives = temp.where(
        col(problem_to_solve) == 1).limit(k_elements_half_number)

    k_negatives = temp.where(
        col(problem_to_solve) == 0).limit(k_elements_half_number)

    k_ids = k_positives.rdd.map(lambda x: x.id).collect() + \
        k_negatives.rdd.map(lambda x: x.id).collect()

    k_sample = temp.filter(temp.id.isin(k_ids))
    k_sample = k_sample.drop(k_sample.id)

    k_sample = format_rdd(k_sample.rdd)

    folds.append(k_sample)
    temp = temp.filter(~temp.id.isin(k_ids))

    print("Split " + str(i + 1) + " of " + str(fold_number) + " completed")
    print("Dataframe rows: " + str(temp.count()))
    i += 1

finish_time = tm.time() - start_time
print_and_save_time("Dataset splitting concluded: " +
                    str(finish_time) + " seconds")


#### Bonus: Pandas

In [None]:
#To make a comparison between PySpark parallel processing and a normal sequential data processing, dataset loading and preprocessing has been performed also with the Pandas library.

def pandas_save_dataset(data):
    data.to_csv(path_or_buf=path + '/' + 'preprocessed.csv', index=False)
    print('Preprocessed dataset saved')

# Data Load

files = os.listdir(path)
data = pd.DataFrame()

for f in files:
    if f.endswith('.csv'):
        frame = pd.read_csv(filepath_or_buffer=path +
                            '/' + f, usecols=columns_to_get)
        frame.sample(frac=1)
        frame = frame.head(dataset_limit)
        data = pd.concat([data, frame])

data = data.dropna(how='any', axis='index')
print("Dataset acquisition completed")

# Problem Selection

irregulars = data.loc[data[problem_to_solve] == 1]
regulars = data.loc[data[problem_to_solve] == 0]

data = pd.concat([regulars.sample(len(irregulars)), irregulars]).sample(frac=1)

oppositeIndex = 'DIVERTED' if problem_to_solve == 'CANCELLED' else 'CANCELLED'
data = data.drop(oppositeIndex, axis=1)
print("Dataset balancing completed")

# Names Conversion

def str_to_float(s: str):
    encoding = "utf-8"
    b = s.encode(encoding)
    return float(crc32(b) & 0xffffffff) / 2**32

for c in ['OP_CARRIER', 'ORIGIN', 'DEST']:
    data[c] = data[c].apply(str_to_float)

# Dates Conversion

multiplier: float = 1 / 365

def date_to_day_of_year(date_string) -> float:
    date = dt.strptime(date_string, "%Y-%m-%d")
    day = date.timetuple().tm_yday - 1
    return day * multiplier

data["FL_DATE"] = data["FL_DATE"].apply(date_to_day_of_year)

# Time Conversion
    
def time_to_interval(time) -> float:
    t = int(float(time))
    h = t // 100
    m = t % 100
    t = h * 60 + m
    return float(t / 1140)

for c in ["CRS_DEP_TIME", "CRS_ARR_TIME", "CRS_ELAPSED_TIME"]:
    data[c] = data[c].apply(time_to_interval)

# Distance Conversion
    
multiplier: float = float(1) / float(max_distance)

data["DISTANCE"] = data["DISTANCE"].apply(lambda x: x * multiplier)

print("Dataset conversions completed")

#Z-score normalization

def z_score_normalize(x, m, s) -> float:
    return (x - m) / s

column_list = list(data)
column_list.remove(problem_to_solve)

for c in column_list:
    column_mean = data[c].mean()
    column_stddv = data[c].std()
    data[c] = data[c].apply(z_score_normalize, args=(column_mean, column_stddv))

print("Dataset normalization completed")
# Create Folds

folds = []

data = data.drop_duplicates()

irregulars = data.loc[data[problem_to_solve] == 1]
regulars = data.loc[data[problem_to_solve] == 0]

k_elements_half_number = round((len(data) / fold_number) / 2)

for i in range(1, fold_number + 1):
    k_irregulars_sample = irregulars.head(k_elements_half_number)
    k_regulars_sample = regulars.head(k_elements_half_number)
    k_sample = pd.concat([k_irregulars_sample, k_regulars_sample])

    folds.append(k_sample)
    irregulars = irregulars.drop(k_irregulars_sample.index)
    regulars = regulars.drop(k_regulars_sample.index)



## Models

### Generic Functions

In [None]:
def sigmoid(x):
    '''
    Calculates the sigmoid of the given data
    '''
    g = 1.0 / (1.0 + np.exp(-x))
    return g

def binary_cross_entropy(y, y_label, w, l2):
    '''
    Calculates the binary cross entropy loss of the calculated y and the given y_label
    '''
    loss = -np.mean(y_label*(np.log(y)) + (1-y_label)
                    * np.log(1-y)) + regularize(w, l2)
    return loss

def regularize(W, l2):
    '''
    Calculates the regularization term for the loss
    '''
    return (l2 / 2) * np.sum(np.square(W))

### Parallel Model

In [None]:
@dataclass
class ParallelLogisticRegression:
    iterations: int
    learning_rate: float
    batch_size: int
    l2: float
    W: Broadcast
    b: float

def parallel_initialize(self: ParallelLogisticRegression, feature_number: int):
    self.W = context.broadcast(np.random.rand(feature_number))
    self.b = np.random.rand()

def parallel_train(self: ParallelLogisticRegression, data: RDD):

    if self.batch_size != 0:
        num_chunks = data.count() // self.batch_size
        chunk_percent = 1/num_chunks
        batches = data.randomSplit([chunk_percent] * num_chunks)
    else:
        batches = [data]

    losses = []
    gradients = []

    for _ in range(self.iterations):
        _losses = []
        _gradients = []

        for batch in batches:
            batch = batch.cache()
            Y = parallel_evaluate(self, batch)
            _losses.append(parallel_binary_cross_entropy(self, batch.map(lambda x: x[0]).zip(Y)))
            (dW, db) = parallel_gradient(self, batch, Y)
            _gradients.append(dW)
            parallel_update(self, dW, db)
        losses.append(np.mean(_losses))
        gradients.append(np.mean(_gradients))

    return (losses, gradients)


def parallel_evaluate(self: ParallelLogisticRegression, X: RDD) -> RDD:
    Z: RDD = X.map(lambda x: np.dot(x[1], self.W.value))
    Z = Z.map(lambda x: sigmoid(x))
    return Z

def parallel_binary_cross_entropy(self: ParallelLogisticRegression, X_Y: RDD)-> float:
    L: RDD = X_Y.map(lambda y: y[0] * np.log(y[1]) + (1 - y[0]) * np.log(1 -  y[1]))
    return -L.reduce(lambda a, b: a + b)/L.count() + regularize(self.W.value, self.l2)

def parallel_gradient(self: ParallelLogisticRegression, X: RDD, Y: RDD)-> tuple[np.ndarray, np.ndarray]:
    m = X.count()
    dw = X.zip(Y).map(lambda x: np.dot((x[1] - x[0][0]), x[0][1])).reduce(lambda a, b: (a + b) * 1/m + self.W.value * self.l2)
    db = X.zip(Y).map(lambda x: x[1] - x[0][0]).reduce(lambda a, b: a + b) * 1/m
    return dw, db

def parallel_update(self: ParallelLogisticRegression, dW: list[float], db: float):
    self.W = context.broadcast(self.W.value - self.learning_rate * dW)
    self.b = self.b - self.learning_rate * db

### Serial Model

In [None]:
class SerialLogisticRegression():
    def __init__(self, iterations: int, learning_rate: float, batch_size: int, l2: float):
        self.iterations = iterations
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.l2 = l2

    def initialize(self, columns_number):
        self.W = np.random.rand(columns_number)
        self.b = np.random.rand()

    def evaluate(self, X):
        Z = np.dot(X, self.W) + self.b
        Z = sigmoid(Z)
        return Z

    def gradient(self, X, Y, Y_label):
        '''
        Calculates the gradient w.r.t weights and bias
        '''

        # Number of training examples.
        m = X.shape[0]

        # Gradient of loss w.r.t weights with regularization
        dw = (1/m)*np.dot(X.T, (Y - Y_label)) + self.l2 * self.W

        # Gradient of loss w.r.t bias with regularization
        db = (1/m)*np.sum((Y - Y_label))

        return dw, db

    def update(self, dW, db):
        self.W = self.W - self.learning_rate * dW
        self.b = self.b - self.learning_rate * db

    def train(self, X, Y_labels):
        losses = []
        gradients = []

        if self.batch_size == 0:
            self.batch_size = X.shape[0]

        for _ in range(self.iterations):
            _losses = []
            _gradients = []
            for b in range(X.shape[0]//self.batch_size):
                b_X = X[b*self.batch_size:b*self.batch_size+self.batch_size, :]
                b_Y_labels = Y_labels[b*self.batch_size:b *
                                      self.batch_size+self.batch_size]
                Y = self.evaluate(b_X)
                _losses.append(binary_cross_entropy(
                    Y, b_Y_labels, self.W, self.l2))
                (dW, db) = self.gradient(b_X, Y, b_Y_labels)
                _gradients.append(dW)
                self.update(dW, db)
            losses.append(np.mean(_losses))
            gradients.append(np.mean(_gradients))

        return (losses, gradients)

## Experiments


In [None]:
def describe(labels: RDD, predictions: RDD):
    zipped = labels.zip(predictions)
    tn = zipped.filter(lambda x: round(x[0]) == 0 and round(x[1]) == 0).count()
    tp = zipped.filter(lambda x: round(x[0]) == 1 and round(x[1]) == 1).count()
    fn = zipped.filter(lambda x: round(x[0]) == 1 and round(x[1]) == 0).count()
    fp = zipped.filter(lambda x: round(x[0]) == 0 and round(x[1]) == 1).count()
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    f1 = 2 * (precision * recall) / (precision + recall)
    acc = (tp + tn)/(tp + tn + fn + fp)
    specificity = tn/(tn+fp)
    return pd.DataFrame([[tn, tp, fn, fp, precision, recall, f1, acc, specificity]], columns=['TN', 'TP', 'FN', 'FP', 'Precision', 'Recall', 'F1', 'Accuracy', 'Specificity'])

In [None]:
def make_roc(labels: RDD, results: RDD):
    labels = labels.collect()
    results = results.collect()
    labels_and_results = sorted(list(zip(labels, results)), key=lambda x: x[1])

    labels_by_weights = np.array([k for (k, _) in labels_and_results])

    length = labels_by_weights.size

    true_positives = labels_by_weights.cumsum()

    num_positive = true_positives[-1]

    false_positives = np.arange(1.0, length + 1, 1.) - true_positives

    true_positives_rate = true_positives / num_positive
    false_positives_rate = false_positives / (length - num_positive)

    fig, ax = plt.subplots()
    ax.set_xlim(-.05, 1.05), ax.set_ylim(-.05, 1.05)
    ax.set_ylabel('True Positive Rate (Sensitivity)')
    ax.set_xlabel('False Positive Rate (1 - Specificity)')
    plt.plot(false_positives_rate, true_positives_rate,
             color='#8cbfd0', linestyle='-', linewidth=3.)
    plt.plot((0., 1.), (0., 1.), linestyle='--',
             color='#d6ebf2', linewidth=2.)
    plt.show()

def plot_loss_gradient(iterations, train_losses, gradients):
    fig, ax = plt.subplots()
    ax.set_xlabel('Iterations')
    ax.set_ylabel('Loss/Gradient')
    ax.plot(range(iterations), train_losses, label='Loss')
    ax.plot(range(iterations), gradients, label='Gradient')
    ax.grid()
    ax.legend()

    fig.clear()
    plt.close()

### Hyperparameters Tuning

In [None]:
grid: dict[str, list] = { 'iter': [10, 20, 50], 'lr': [0.001, 0.01, 0.1], 'l2': [1, 0.1, 0.01]}

#### K-Fold Cross Validation
The following code defines a base class with train and evaluation methods to apply the K-Fold Cross Validation to each model

In [None]:
import itertools

class Tuner:
    def __init__(self, grid: dict, model_factory) -> None:
        self.params = list(itertools.product(*grid.values()))
        self.model_factory = model_factory

    def search(self, input: list[RDD]):
        models = dict[float, (list, KFoldTrainer)]()
        for i in self.params:
            model: KFoldTrainer = self.model_factory(*i)
            test_loss, train_losses = model.train(input)
            print("Completed configuration " + '|'.join(map(lambda x: str(x), i)))
            models[test_loss] = (train_losses, model)
        lowest = min(list(models.keys()))
        return (lowest, *models.get(lowest))

class KFoldTrainer:
    def __init__(self, iterations, lr, l2, batch_size):
        self.iterations = iterations
        self.lr = lr
        self.l2 = l2
        self.batch_size = batch_size

    def train(self, data: list[RDD]):
        test_set = data[0]
        rest = data[1:]
        train_losses = []
        for i, fold in enumerate(rest):
            evaluation_set = fold
            remaining_folds= rest[:i] + rest[i + 1:]
            train_set = remaining_folds[0]
            for train_fold in remaining_folds[1:]:
                train_set = train_set.union(train_fold)
            self.train_impl(train_fold)
            loss = self.test_impl(evaluation_set)
            train_losses.append(loss)
        test_loss = self.test_impl(test_set)
        return test_loss, train_losses

In [None]:
# COMPARARE WEIGHTS

In [None]:
class ParallelModelEvaluator(KFoldTrainer):
    def __init__(self, iterations, lr, l2, batch_size):
        super().__init__(iterations, lr, l2, batch_size)
        self.model = ParallelLogisticRegression(self.iterations, self.lr, self.batch_size, self.l2, None, None)
        parallel_initialize(self.model, 8)

    def train_impl(self, train_data: RDD):
        return parallel_train(self.model, train_data)
    
    def test_impl(self, test_data: RDD):
        value: RDD = parallel_evaluate(self.model, test_data)
        return parallel_binary_cross_entropy(self.model, test_data.map(lambda x: x[0]).zip(value))

In [None]:
data = [np.array([x+1, x+2, x+3, x+12]) for x in [1,2,3,4]]
data = pd.DataFrame(data, columns= ['n_iters', 'lr', 'alpha', 'cost'])

import plotly.graph_objects as go

fig = go.Figure(data=[go.Scatter3d(
    x=data['lr'],
    y=data['l2'],
    z=data['iterations'],
    mode='markers',
    text=('lr', 'l2', 'iterations'),
    marker=dict(
        color=data['cost'], 
        colorscale='Viridis', 
        #opacity=1.0,
        #size=14, colorbar=dict(thickness=20)
    )
)])


# tight layout
fig.update_layout(
    autosize=False,
    width=600, 
    height=600,
    title='Hyperparameters space', 
    margin=dict(l=0, r=0, b=0, t=0),
    scene=dict(
        xaxis_title='learning rate',
        yaxis_title='alpha',
        zaxis_title='no. iters',
    ),
)

fig.show()

### Sequential Implementation

In [None]:
class SequentialEvaluator(KFoldTrainer):
    def __init__(self, iterations, lr, l2, batch_size):
        super().__init__(iterations, lr, l2, batch_size)
        self.model = SerialLogisticRegression(self.iterations, self.lr, self.batch_size, self.l2)
        self.model.initialize(8)

    def train_impl(self, train_data: RDD):
        return self.model.train(np.array(train_data.map(lambda x: x[1]).collect()), np.array(train_data.map(lambda x: x[0]).collect()))

    def test_impl(self, test_data: RDD):
        res = self.model.evaluate(np.array(test_data.map(lambda x: x[1]).collect()))
        return binary_cross_entropy(res, np.array(test_data.map(lambda x: x[0]).collect()), self.model.W, self.l2)


In [None]:
e: SequentialEvaluator = SequentialEvaluator(10, 0.1, 0.1, 0)
e.train(folds)

In [None]:
t = Tuner(grid, lambda it, lr, l2: SequentialEvaluator(it, lr, l2, 0))
test_loss, train_losses, best_model = t.search(folds)
print("Iterations {} Lr {} L2 {} Batch {} Loss {}".format(best_model.iterations, best_model.lr, best_model.l2, best_model.batch_size, test_loss))

In [None]:
t = Tuner(grid, lambda it, lr, l2: ParallelModelEvaluator(it, lr, l2, 0))
test_loss, train_losses, best_model = t.search(folds)
print("Iterations {} Lr {} L2 {} Batch {} Loss {}".format(best_model.iterations, best_model.lr, best_model.l2, best_model.batch_size, test_loss))

In [None]:
labels = folds[0].map(lambda x: x[0]).coalesce(1)
input = np.array(folds[0].collect())
predictions = context.parallelize(e.model.evaluate(input)).coalesce(1)
describe(labels, predictions)

In [None]:
make_roc(labels, predictions)

# Pyspark ML Implementations
The following cells use some state-of-the-art models provided by the Pyspark ML library, in particular their implementation of the Logistic Regression and Decision Tree models.

They should be most performant implementations of a distributed machine learning model

In [None]:
from pyspark.ml.feature import VectorAssembler

ml_data = data.select([col(c).cast('float').alias(c) if c != problem_to_solve else col(c).cast('float').alias('label') for c in data.columns[:-1]])

assembler = VectorAssembler(inputCols=ml_data.columns[1:-1], outputCol='features')

ml_data = assembler.transform(ml_data).select(['features', 'label'])

train_set, test_set = ml_data.randomSplit([0.9, 0.1])

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

class SparkMLTuner:
    def __init__(self, estimator, grid_builder: ParamGridBuilder, num_fold: int):
        self.evaluator = BinaryClassificationEvaluator()
        self.cv = CrossValidator(estimator=estimator, estimatorParamMaps=grid_builder.build(), evaluator=self.evaluator, numFolds=num_fold)

    def train(self, data: ps.DataFrame):
        self.model = self.cv.fit(data)
        return self.model.bestModel.params

    def test(self, data: ps.DataFrame):
        predictions = self.model.transform(data)
        return predictions, self.evaluator.evaluate(predictions)

### Logistic Regression

In [None]:
from pyspark.ml.classification import LogisticRegression

e = LogisticRegression(featuresCol='features', labelCol='label')
g = ParamGridBuilder().addGrid(e.regParam, grid['l2']).addGrid(e.maxIter, grid['iter'])
t1 = SparkMLTuner(e, g, 10)
t1.train(train_set)
predictions, loss = t1.test(test_set)

labels = test_set.select(col('label')).rdd.map(lambda x: float(x[0]))
predictions = predictions.select(col('prediction')).rdd.map(lambda x: float(x[0]))

describe(labels, predictions)

In [None]:
make_roc(labels, predictions)

### Decision Tree

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
e = DecisionTreeClassifier(featuresCol='features', labelCol='label')
g = ParamGridBuilder()
t = SparkMLTuner(e, g, 10)
t.train(train_set)
predictions, loss = t.test(test_set)

labels = test_set.select(col('label')).rdd.map(lambda x: float(x[0]))
predictions = predictions.select(col('prediction')).rdd.map(lambda x: float(x[0]))

describe(labels, predictions)

In [None]:
make_roc(labels, predictions)