# Distributed Tuning on Apache Spark for Aircraft Passenger Forecasting

### Loading Libraries

In [1]:
# Update the `conda` package in the `base` environment from the `defaults` channel.
%conda update -n base -c defaults conda

Error while loading conda entry point: anaconda-cloud-auth (cannot import name 'ChannelAuthBase' from 'conda.plugins.types' (/Users/isisromero/anaconda3/lib/python3.11/site-packages/conda/plugins/types.py))
Error while loading conda entry point: anaconda-cloud-auth (cannot import name 'ChannelAuthBase' from 'conda.plugins.types' (/Users/isisromero/anaconda3/lib/python3.11/site-packages/conda/plugins/types.py))
Retrieving notices: ...working... done
Collecting package metadata (current_repodata.json): done
Solving environment: done

## Package Plan ##

  environment location: /Users/isisromero/anaconda3

  added / updated specs:
    - conda


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    aiobotocore-2.12.3         |  py311hca03da5_0         159 KB
    aiohttp-3.9.5              |  py311h80987f9_0         813 KB
    aioitertools-0.7.1         |     pyhd3eb1b0_0          20 KB
    alabast

In [48]:
# Operating System
import os
import uuid
import glob
import shutil
from os import devnull
from functools import partial
from contextlib import contextmanager, redirect_stdout, redirect_stderr

# Core (Python) functionality imports
import copy

# Date & Time
from datetime import datetime
from dateutil.relativedelta import relativedelta

# Math
import math

# Numerical Computing
import numpy as np

# Data Manipulation 
import pandas as pd

# PySpark
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import lit, concat_ws, col, to_date


# Statistical Models
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from statsmodels.tsa.vector_ar.var_model import VAR
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

# Scikit Learn
from sklearn.metrics import explained_variance_score, mean_absolute_error, median_absolute_error, mean_squared_error, r2_score

# Hyperparameter Tuning 
from hyperopt import fmin, hp, tpe, SparkTrials, space_eval, STATUS_OK, Trials
from hyperopt.pyll import scope as ho_scope
from hyperopt.pyll.stochastic import sample as ho_sample

# Data Visualization imports
import seaborn as sns
import matplotlib.pylab as plt
from matplotlib import gridspec as gs

# MLflow 
import mlflow

#### Setting Data

In [49]:
# Print every entry found directly inside the chapter directory.
# NOTE(review): absolute, user-specific path -- this only resolves on the author's machine.
directory = '/Users/isisromero/Desktop/MLEIA/chap_08/'
for file_name in glob.glob(directory + '*'):
    print(file_name)

/Users/isisromero/Desktop/MLEIA/chap_08/nyc.csv
/Users/isisromero/Desktop/MLEIA/chap_08/DTAS.ipynb


In [50]:
# Switch the working directory to the chapter folder (the `curl -O` cell that follows
# saves into the current working directory).
os.chdir('/Users/isisromero/Desktop/MLEIA/chap_08')

In [51]:
# Download the Port Authority of NY/NJ monthly passenger CSV, keeping the remote file name.
!curl -O 'https://raw.githubusercontent.com/alan-turing-institute/TCPD/master/datasets/jfk_passengers/air-passenger-traffic-per-month-port-authority-of-ny-nj-beginning-1977.csv'

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 56665  100 56665    0     0   200k      0 --:--:-- --:--:-- --:--:--  201k


In [52]:
# Show the first 10 lines (header + 9 rows) of the downloaded CSV.
!head --lines=10 '/Users/isisromero/Desktop/MLEIA/chap_08/air-passenger-traffic-per-month-port-authority-of-ny-nj-beginning-1977.csv'

Airport Code,Year,Month,Domestic Passengers,International Passengers,Total Passengers
ACY,2015,Jan,98177,90,98267
ACY,2015,Feb,96431,65,96496
ACY,2015,Mar,116493,197,116690
ACY,2015,Apr,105539,161,105700
ACY,2015,May,103668,425,104093
ACY,2015,Jun,96259,1102,97361
ACY,2015,Jul,109247,1894,111141
ACY,2015,Aug,108700,1932,110632
ACY,2015,Sep,82268,315,82583


In [53]:
# Pure-Python equivalent of the `head` preview: print the first 10 lines of the CSV.
file_path = '/Users/isisromero/Desktop/MLEIA/chap_08/air-passenger-traffic-per-month-port-authority-of-ny-nj-beginning-1977.csv'

with open(file_path, 'r') as file:
    # readline() returns '' past end-of-file, so a shorter file prints blank lines rather than failing.
    for _ in range(10):
        print(file.readline().strip())

Airport Code,Year,Month,Domestic Passengers,International Passengers,Total Passengers
ACY,2015,Jan,98177,90,98267
ACY,2015,Feb,96431,65,96496
ACY,2015,Mar,116493,197,116690
ACY,2015,Apr,105539,161,105700
ACY,2015,May,103668,425,104093
ACY,2015,Jun,96259,1102,97361
ACY,2015,Jul,109247,1894,111141
ACY,2015,Aug,108700,1932,110632
ACY,2015,Sep,82268,315,82583


In [57]:
# Where the CSV lands right after download (same path as `local_path` in the download cell).
LOCAL_DATA_LOCATION = "/Users/isisromero/Desktop/MLEIA/chap_08/air-passenger-traffic-per-month-port-authority-of-ny-nj-beginning-1977.csv"
# Where the CSV is moved to before Spark reads it (same path as `new_file_path` in the move cell).
RAW_DATA_LOCATION = "/Users/isisromero/Desktop/MLEIA/chap_08/nyc.csv"

In [58]:
import os
import subprocess

# Define the file URL and the local path where it should be downloaded
file_url = 'https://raw.githubusercontent.com/alan-turing-institute/TCPD/master/datasets/jfk_passengers/air-passenger-traffic-per-month-port-authority-of-ny-nj-beginning-1977.csv'
local_path = '/Users/isisromero/Desktop/MLEIA/chap_08/air-passenger-traffic-per-month-port-authority-of-ny-nj-beginning-1977.csv'

# Download the file using curl. Passing an argument list avoids building a shell string
# from variables, and --fail makes curl exit non-zero on an HTTP error instead of saving
# the error page as if it were the data.
download = subprocess.run(['curl', '--fail', '-o', local_path, file_url])

# Report success only when curl itself succeeded AND the file is present: a bare
# existence check would also pass for a stale copy left by an earlier run.
if download.returncode == 0 and os.path.exists(local_path):
    print("File downloaded successfully")
else:
    print("File download failed")

File downloaded successfully


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 56665  100 56665    0     0   414k      0 --:--:-- --:--:-- --:--:--  412k


In [59]:
import shutil

# Destination path that the Spark reader picks the raw CSV up from
new_file_path = '/Users/isisromero/Desktop/MLEIA/chap_08/nyc.csv'

# Relocate the downloaded CSV; report (rather than raise) when there is nothing to move
if not os.path.exists(local_path):
    print("File does not exist, cannot move")
else:
    shutil.move(local_path, new_file_path)
    print(f"File moved to {new_file_path}")

File moved to /Users/isisromero/Desktop/MLEIA/chap_08/nyc.csv


In [60]:

# spark.stop()

In [64]:
# Build (or reuse) a Spark session configured with the Delta Lake package, SQL extension and catalog.
spark = SparkSession.builder \
    .appName("AirportDataProcessing") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.2.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Explicit schema for the CSV: Year and Month stay strings because they are concatenated
# into a date string below; the three passenger counts are integers.
airport_schema = StructType([
    StructField('Airport_Code', StringType(), True),
    StructField('Year', StringType(), True),
    StructField('Month', StringType(), True),
    StructField('Domestic_Passengers', IntegerType(), True),
    StructField('International_Passengers', IntegerType(), True),
    StructField('Total_Passengers', IntegerType(), True)
])

# Read the file into a Spark DataFrame and add a constant day-of-month ("1") column
raw_data = spark.read.csv(new_file_path, header=True, schema=airport_schema).withColumn("day", lit("1"))

# Assemble "Year-Month-day", parse it with the 'yyyy-MMM-d' pattern (abbreviated month names
# such as "Jan") into a `date` column, then drop the helper columns.
formatted_data = (raw_data
                  .withColumn("date_raw", concat_ws("-", col("Year"), col("Month"), col("day")))
                  .withColumn("date", to_date(col("date_raw"), 'yyyy-MMM-d'))
                  .drop("Year", "Month", "day", "date_raw"))

# Define Delta table save location and names
delta_save_location = '/Users/isisromero/Desktop/MLEIA/chap_08/airport'
delta_table_nm = 'airport'
delta_database_nm = 'ben_demo'
delta_full_nm = "{}.{}".format(delta_database_nm, delta_table_nm)

# Write the data in Delta format, overwriting existing data and schema, partitioned by airport
formatted_data.write.format("delta").mode("overwrite").option("mergeSchema", "true").option("overwriteSchema", "true").partitionBy("Airport_Code").save(delta_save_location)

# Create the database if needed and register a table over the Delta files written above
spark.sql(f"CREATE DATABASE IF NOT EXISTS {delta_database_nm}")
spark.sql(f"CREATE TABLE IF NOT EXISTS {delta_full_nm} USING DELTA LOCATION '{delta_save_location}'")

# Clean up the raw data file if it exists
# NOTE(review): after this runs, re-executing the cell fails at the CSV read unless the
# download and move cells are re-run first.
if os.path.exists(new_file_path):
    os.remove(new_file_path)

In [None]:
# Preview the registered Delta table. In a plain Jupyter kernel, `display` on a Spark DataFrame
# shows only its schema repr (unless eager evaluation is enabled), so convert a bounded
# sample to pandas to get an actual table without collecting the whole dataset.
display(spark.table(delta_full_nm).limit(10).toPandas())

In [None]:
# Column holding the airport identifier; used to filter and enumerate airports.
AIRPORT_FIELD = 'Airport_Code'
# Pandas frequency alias for the series (month start).
SERIES_FREQ = 'MS'
# Name of the trial-id column in the hyperopt trial tables.
TID_COL = 'TestID'
# Font sizes shared by the plotting helpers.
BIG_FONT = 22
MED_FONT = 16
SMALL_FONT = 14

In [None]:

@contextmanager
def suppress_annoying_prints():
    """Silence stdout and stderr for the duration of a ``with`` block.

    Both streams are redirected to the OS null device; the handle is closed
    and the original streams are restored when the block exits.

    Yields:
        tuple: ``(stdout_target, stderr_target)`` -- the objects the two
        redirects were pointed at (the same null-device handle).
    """
    sink = open(devnull, 'w')
    with sink:
        with redirect_stdout(sink) as chatter:
            with redirect_stderr(sink) as noisy_errors:
                yield (chatter, noisy_errors)

In [None]:
def apply_index_freq(data, freq):
    """Conform ``data`` to a fixed index frequency.

    Args:
        data: pandas Series/DataFrame with a datetime-like index.
        freq: pandas frequency alias (e.g. ``'MS'``).

    Returns:
        The result of ``data.asfreq(freq)``; index positions that were missing
        at that frequency are inserted with missing values.
    """
    conformed = data.asfreq(freq)
    return conformed

def get_raw_data(delta_full_name):
  """Load a registered Spark table into pandas, indexed and sorted by date.

  Args:
      delta_full_name: table name resolvable by ``spark.table``.

  Returns:
      pandas.DataFrame indexed by a ``DatetimeIndex`` built from the ``date``
      column (carrying whatever frequency pandas could infer), sorted ascending.
  """
  frame = spark.table(delta_full_name).toPandas().copy(deep=True)
  frame['date'] = pd.to_datetime(frame['date'])
  frame = frame.set_index('date')
  # Rebuild the index from raw values so the inferred frequency (possibly None) is attached.
  frame.index = pd.DatetimeIndex(frame.index.values, freq=frame.index.inferred_freq)
  return frame.sort_index()
  
def get_airport_data_from_df(full_file, freq, airport):
    """Return one airport's rows from ``full_file``, conformed to ``freq``.

    Args:
        full_file: pandas DataFrame containing the ``AIRPORT_FIELD`` column.
        freq: pandas frequency alias handed to ``apply_index_freq``.
        airport: airport code to keep.
    """
    airport_mask = full_file[AIRPORT_FIELD] == airport
    return apply_index_freq(full_file.loc[airport_mask], freq)

def get_all_airports_from_df(full_file):
    """Return the distinct values of the ``AIRPORT_FIELD`` column as a sorted list."""
    airport_codes = list(full_file[AIRPORT_FIELD].unique())
    airport_codes.sort()
    return airport_codes

def generate_splits_by_months(data, months):
    """Split a time-ordered series/frame into train and hold-out parts.

    The last ``months`` rows become the test set and everything before them the
    training set. Gaps are filled separately inside each part (forward fill,
    then backward fill for leading gaps), so no value crosses the split.

    Args:
        data: pandas Series or DataFrame ordered by time.
        months: number of trailing rows to hold out. ``0`` (or less) keeps every
            row in train and returns an empty test set; a value of at least
            ``len(data)`` puts every row in test.

    Returns:
        tuple: ``(train, test)``.
    """
    row_count = len(data)
    # Compute the cut explicitly: slicing with ``-months`` silently flips the
    # split when months == 0, because ``data[:-0]`` is empty.
    if months <= 0:
        split_at = row_count
    elif months >= row_count:
        split_at = 0
    else:
        split_at = row_count - months
    # .ffill()/.bfill() replace fillna(method=...), which pandas has deprecated.
    train = data.iloc[:split_at].ffill().bfill()
    test = data.iloc[split_at:].ffill().bfill()
    return train, test

def validate_data_counts(data, split_count):
    """Check that ``data`` is long enough to hold out ``split_count`` rows.

    Returns:
        bool: ``True`` when ``split_count / 0.2`` is strictly below 80% of
        ``len(data)``.
    """
    required_rows = split_count / 0.2
    usable_rows = len(data) * 0.8
    return required_rows < usable_rows

In [None]:
def mape(y_true, y_pred):
    """Mean absolute percentage error, in percent.

    Observations whose actual value is exactly zero are excluded. The error is
    taken relative to the *absolute* actual value, so negative actuals add a
    positive percentage instead of cancelling positive ones.

    Args:
        y_true: actual values (numpy array or pandas Series).
        y_pred: predicted values, aligned with ``y_true``.

    Returns:
        float: mean of ``|y_true - y_pred| / |y_true|`` over non-zero actuals, times 100.
    """
    nonzero = y_true != 0
    # Zero actuals are masked out below; silence the divide-by-zero noise they would cause here.
    with np.errstate(divide='ignore', invalid='ignore'):
        pct_error = np.fabs(y_true - y_pred) / np.fabs(y_true)
    return pct_error[nonzero].mean() * 100

def aic(n, mse, param_count):
    """Akaike-style criterion computed from an MSE: ``n * ln(mse) + 2 * param_count``."""
    complexity_penalty = 2 * param_count
    fit_term = n * np.log(mse)
    return fit_term + complexity_penalty

def bic(n, mse, param_count):
    """Bayesian-style criterion computed from an MSE: ``n * ln(mse) + param_count * ln(n)``."""
    fit_term = n * np.log(mse)
    complexity_penalty = param_count * np.log(n)
    return fit_term + complexity_penalty

def calculate_errors(y_true, y_pred, param_count):
    """Score a forecast against actuals with a fixed set of metrics.

    Any sklearn metric that raises ``ValueError`` is replaced by a sentinel:
    ``1e12`` for the error metrics (mse, mae, medae) and ``-1e4`` for the
    goodness-of-fit scores (explained variance, r2).

    Args:
        y_true: actual values.
        y_pred: forecast values; its length is the ``n`` used for AIC/BIC.
        param_count: number of model parameters used for the AIC/BIC penalty.

    Returns:
        dict with keys ``mae``, ``medae``, ``mape``, ``mse``, ``rmse``, ``aic``,
        ``bic``, ``explained_var`` and ``r2``.
    """
    def _score_or(metric_fn, fallback):
        # One place for the "metric or sentinel" pattern used by every sklearn score.
        try:
            return metric_fn(y_true, y_pred)
        except ValueError:
            return fallback

    n_obs = len(y_pred)
    mse = _score_or(mean_squared_error, 1e12)
    scores = {
        'mae': _score_or(mean_absolute_error, 1e12),
        'medae': _score_or(median_absolute_error, 1e12),
        'mape': mape(y_true, y_pred),
        'mse': mse,
        'rmse': math.sqrt(mse),
        'aic': aic(n_obs, mse, param_count),
        'bic': bic(n_obs, mse, param_count),
        'explained_var': _score_or(explained_variance_score, -1e4),
        'r2': _score_or(r2_score, -1e4),
    }
    return scores
     

In [None]:
def extract_param_count_hwes(config):
    """Count the entries under the ``'model'`` and ``'fit'`` sections of ``config``."""
    model_keys = config['model'].keys()
    fit_keys = config['fit'].keys()
    return len(model_keys) + len(fit_keys)

def extract_individual_trial_params(hpopt_config, run):
    """Resolve one hyperopt trial record into concrete hyperparameter values.

    Args:
        hpopt_config: the search-space definition passed to ``space_eval``.
        run: a trial record exposing ``run['misc']['vals']``.

    Returns:
        Whatever ``space_eval`` returns for the trial's sampled values.
    """
    sampled = {}
    for label, picks in run['misc']['vals'].items():
        # Labels with an empty value list are skipped.
        if picks:
            sampled[label] = picks[0]
    return space_eval(hpopt_config, sampled)

def extract_metric(run, metric_name):
    """Tabulate the loss of every trial in ``run``.

    Returns:
        pandas.DataFrame with a ``TID_COL`` column (trial ``tid`` + 1) and a
        ``metric_name`` column holding each trial's ``result['loss']``.
    """
    rows = [(trial['tid'] + 1, trial['result']['loss']) for trial in run]
    return pd.DataFrame(rows, columns=[TID_COL, metric_name])

def collapse_dict(trial_params):
    """Flatten a nested dict into a single level, keeping only leaf keys.

    Parent keys are dropped; when two leaves share a key, the one visited later wins.
    """
    flat = {}
    for key, value in trial_params.items():
        if not isinstance(value, dict):
            flat[key] = value
            continue
        flat.update(collapse_dict(value))
    return flat

def extract_hyperopt_trials(trials_run, trials_configuration, metric_name):
    """Build one table with every trial's loss and flattened hyperparameters.

    Args:
        trials_run: hyperopt trials object (iterable, with a ``.trials`` list).
        trials_configuration: the search space the trials were sampled from.
        metric_name: column name to use for the loss.

    Returns:
        pandas.DataFrame joining the loss table and the parameter table on ``TID_COL``.
    """
    flattened = []
    for trial in trials_run.trials:
        resolved = extract_individual_trial_params(trials_configuration, trial)
        flattened.append(collapse_dict(resolved))
    params_df = pd.DataFrame(flattened)
    params_df[TID_COL] = [trial['tid'] + 1 for trial in trials_run]
    metric_df = extract_metric(trials_run, metric_name)
    return metric_df.merge(params_df, on=TID_COL)

In [None]:
def plot_predictions(y_true, y_pred, param_count, time_series_name, value_name, 
                     image_name, style='seaborn', plot_size=(16, 12)):
    """Plot test data against the forecast, annotate the error metrics and save a PNG.

    Args:
        y_true: actual values (pandas Series; its index name labels the x axis).
        y_pred: forecast values.
        param_count: model parameter count, forwarded to ``calculate_errors``.
        time_series_name: label used in the legend and title.
        value_name: y-axis label.
        image_name: path the PNG is written to.
        style: Matplotlib style name. The legacy ``'seaborn*'`` names are mapped to
            their ``'seaborn-v0_8*'`` successors when only those are installed.
        plot_size: figure size in inches.

    Returns:
        dict: ``{'errors': <metric dict>, 'plot': <matplotlib Figure>}``. The figure
        is left open; the caller owns it.
    """
    # Matplotlib 3.6 renamed its bundled seaborn styles to 'seaborn-v0_8*' and later
    # removed the old names; translate only when the requested name is really missing.
    if isinstance(style, str) and style not in plt.style.available and style.startswith('seaborn'):
        renamed_style = 'seaborn-v0_8' + style[len('seaborn'):]
        if renamed_style in plt.style.available:
            style = renamed_style
    validation_output = {}
    error_values = calculate_errors(y_true, y_pred, param_count)
    validation_output['errors'] = error_values
    text_str = '\n'.join((
        'mae = {:.3f}'.format(error_values['mae']),
        'medae = {:.3f}'.format(error_values['medae']),
        'mape = {:.3f}'.format(error_values['mape']),
        'aic = {:.3f}'.format(error_values['aic']),
        'bic = {:.3f}'.format(error_values['bic']),
        'mse = {:.3f}'.format(error_values['mse']),
        'rmse = {:.3f}'.format(error_values['rmse']),
        'explained var = {:.3f}'.format(error_values['explained_var']),
        'r squared = {:.3f}'.format(error_values['r2']),
    ))
    with plt.style.context(style=style):
        fig, axes = plt.subplots(1, 1, figsize=plot_size)
        axes.plot(y_true, 'b-o', label='Test data for {}'.format(time_series_name))
        axes.plot(y_pred, 'r-o', label='Forecast data for {}'.format(time_series_name))
        axes.legend(loc='upper left', fontsize=MED_FONT)
        axes.set_title('Raw and Predicted data trend for {}'.format(time_series_name))
        axes.set_ylabel(value_name)
        axes.set_xlabel(y_true.index.name)
        for tick_label in (axes.get_xticklabels() + axes.get_yticklabels()):
            tick_label.set_fontsize(SMALL_FONT)
        for heading in [axes.title, axes.xaxis.label, axes.yaxis.label]:
            heading.set_fontsize(BIG_FONT)
        props = dict(boxstyle='round', facecolor='oldlace', alpha=0.5)
        axes.text(0.05, 0.9, text_str, transform=axes.transAxes, fontsize=MED_FONT, 
                  verticalalignment='top', bbox=props)
        # Lay out first, then save: saving before tight_layout writes the un-adjusted figure.
        fig.tight_layout()
        fig.savefig(image_name, format='png')
        validation_output['plot'] = fig
    return validation_output


def annotate_num(x, y, z, metric, param, ax):
    """Annotate the lowest-metric point of a numeric-parameter scatter plot.

    Args:
        x: parameter values (array-like or pandas Series).
        y: metric values; must provide ``.min()``.
        z: trial ids aligned with ``x``/``y``.
        metric: metric name shown in the label.
        param: parameter name shown in the label.
        ax: Matplotlib axes to annotate.
    """
    best_pos = np.argmin(y)
    # np.argmin yields a *position*. Indexing a pandas Series with an integer is
    # label-based, which only matches the position for a default 0..n-1 index,
    # so look the values up on plain arrays instead.
    best_x = np.asarray(x)[best_pos]
    best_y = y.min()
    best_iteration = np.asarray(z)[best_pos]
    text_value = "Best Model\n{}={:.4f} \niteration={} \n{}={:.3f}".format(param, best_x, best_iteration, metric, best_y)
    bbox_config = dict(boxstyle='round,pad=0.5', fc='ivory', ec='grey', lw=0.8)
    arrow = dict(facecolor='darkblue', shrink=0.01, connectionstyle='angle3,angleA=90,angleB=45')
    conf = dict(xycoords='data',textcoords='axes fraction',arrowprops=arrow,
                bbox=bbox_config,ha='left', va='center', fontsize=MED_FONT)
    ax.annotate(text_value, xy=(best_x,best_y), xytext=(0.3,0.8), **conf)
    
def annotate_str(x, y, data, metric, param, ax):
    """Annotate the lowest-metric point of one categorical group.

    Args:
        x: x positions of the plotted points (positionally indexable).
        y: metric values of the group; must provide ``.min()``.
        data: frame whose first row supplies the ``param`` and ``TID_COL`` values for the label.
        metric: metric name shown in the label.
        param: parameter name shown in the label.
        ax: Matplotlib axes to annotate.
    """
    lowest = y.min()
    anchor_x = x[np.argmin(y)]
    label = "Best Model\n{}={} \niteration={} \n{}={:.3f}".format(
        param, data[param].values[0], data[TID_COL].values[0], metric, lowest)
    ax.annotate(
        label,
        xy=(anchor_x, lowest),
        xytext=(0.3, 0.8),
        xycoords='data',
        textcoords='axes fraction',
        arrowprops=dict(facecolor='darkblue', shrink=0.01, connectionstyle='angle3,angleA=90,angleB=45'),
        bbox=dict(boxstyle='round,pad=0.5', fc='ivory', ec='grey', lw=0.8),
        ha='left',
        va='center',
        fontsize=MED_FONT,
    )

def generate_hyperopt_report(hpopt_df, metric, plot_name, image_name, fig_size=(16, 36)):
    """Plot every tuned hyperparameter against the loss and save the grid as a PNG.

    Parameters with more than six distinct values are drawn as a scatter plot;
    the rest get one box plot per value (restricted to trials below the 90th
    percentile of the metric) with the individual trials jittered on top.
    Points are coloured by trial id and the best trial is annotated.

    Args:
        hpopt_df: trial table with ``TID_COL``, ``metric`` and one column per parameter.
        metric: name of the loss column.
        plot_name: figure title.
        image_name: path the PNG is written to.
        fig_size: figure size in inches.

    Returns:
        matplotlib.figure.Figure: the report figure (left open).
    """
    params = [name for name in list(hpopt_df) if name not in [TID_COL, metric]]
    COLS = 2
    ROWS = int(math.ceil(len(params)/COLS))
    # Matplotlib 3.6 renamed 'seaborn' to 'seaborn-v0_8' and later removed the old name.
    style_name = 'seaborn'
    if style_name not in plt.style.available and 'seaborn-v0_8' in plt.style.available:
        style_name = 'seaborn-v0_8'
    with plt.style.context(style=style_name):
        u_filter = hpopt_df[metric].quantile(0.9)
        grid = gs.GridSpec(ROWS, COLS)
        fig = plt.figure(figsize=fig_size)
        # The best trial does not depend on the parameter being drawn; find it once.
        min_metric_row = hpopt_df[hpopt_df[metric] == hpopt_df[metric].min()]
        for panel, column in enumerate(params):
            unique_vals = sorted(hpopt_df[column].unique())
            ax = fig.add_subplot(grid[panel])
            if len(unique_vals) > 6:
                x = hpopt_df[column]
                y = hpopt_df[metric]
                im = ax.scatter(x=x, y=y, c=hpopt_df[TID_COL], marker='o', s=80, cmap=plt.cm.coolwarm, alpha=0.6)
                fig.colorbar(im, ax=ax, orientation='vertical')
                annotate_num(x, y, hpopt_df[TID_COL], metric, column, ax)
            else:
                sp = None
                for slot, level in enumerate(unique_vals, start=1):
                    level_rows = hpopt_df[hpopt_df[column] == level]
                    kept_rows = level_rows[(level_rows[metric] < u_filter)]
                    y = kept_rows[metric]
                    ax.boxplot(y, positions=[slot], widths=0.4)
                    # Jitter around the box position. Centring on the category *value*
                    # (as before, for non-string values) only lines up for booleans.
                    x = np.random.normal(slot, 0.05, size=len(y))
                    sp = ax.scatter(x=x, y=y, c=kept_rows[TID_COL], marker='o', alpha=0.6, s=80, 
                                    cmap=plt.cm.coolwarm)
                    if min_metric_row[metric].values[0] in kept_rows[metric].values:
                        annotate_str(x, y, min_metric_row, metric, column, ax)
                if sp is not None:
                    fig.colorbar(sp, ax=ax, orientation='vertical')
                ax.set_xticklabels(unique_vals)
            ax.set_title('Hyperopt trials {} vs. {}'.format(column, metric))
            ax.set_ylabel(metric)
            ax.set_xlabel(column)
            for tick_label in (ax.get_xticklabels() + ax.get_yticklabels()):
                tick_label.set_fontsize(SMALL_FONT)
            for heading in [ax.title, ax.xaxis.label, ax.yaxis.label]:
                heading.set_fontsize(MED_FONT)
        fig.suptitle(plot_name, size=BIG_FONT)
        fig.tight_layout()
        fig.subplots_adjust(top=0.96)
        fig.savefig(image_name, format='png')
    return fig

def generate_forecast_plots(forecast_data, **plot_conf):
    """Draw one history-plus-forecast figure per airport and save each as a PNG.

    Args:
        forecast_data: frame with ``'Airport'`` and ``'is_future'`` columns plus the
            target and forecast columns named in ``plot_conf``.
        **plot_conf: ``target_col``, ``forecast_col``, ``figsize`` and
            ``image_base_name`` (appended to the airport code to form the file name).

    Returns:
        list: the Matplotlib figures, one per airport (left open).
    """
    target_col = plot_conf['target_col']
    forecast_col = plot_conf['forecast_col']
    # Matplotlib 3.6 renamed 'seaborn' to 'seaborn-v0_8' and later removed the old name.
    style_name = 'seaborn'
    if style_name not in plt.style.available and 'seaborn-v0_8' in plt.style.available:
        style_name = 'seaborn-v0_8'
    images = []
    for airport in forecast_data['Airport'].unique():
        filtered = forecast_data[forecast_data['Airport'] == airport]
        real_data = filtered[target_col]
        forecast_historic = filtered[filtered['is_future'] == False][forecast_col]
        forecast_future = filtered[filtered['is_future'] == True][forecast_col]
        min_scale = np.min([filtered[forecast_col].min(), filtered[target_col].min()])
        # Last non-future row marks where history ends and the forecast begins.
        forecast_boundary = filtered[filtered['is_future'] != True].index[-1]
        with plt.style.context(style=style_name):
            fig, ax = plt.subplots(1,1,figsize=plot_conf['figsize'])
            ax.plot(real_data, 'b-o', label='Historic Data for {} at {}'.format(
                target_col, airport))
            ax.plot(forecast_historic, 'r--o', label='Forecast during historic for {} at {}'.format(
                target_col, airport))
            ax.plot(forecast_future, 'r:o', label='Future forecast for {} at {}'.format(
                target_col, airport))
            ax.legend(loc='upper left', fontsize=MED_FONT)
            ax.set_title('Raw, Predicted, and Forecast data for {}'.format(airport))
            ax.set_ylabel(target_col)
            ax.set_xlabel('Date')
            ax.axvline(forecast_boundary, color='black')
            bbox_conf = dict(boxstyle='round,pad=0.5', fc='ivory', ec='k', lw=0.8)
            ax.text(forecast_boundary - relativedelta(months=1), 
                    min_scale, 
                    "<-- Historic Data", 
                    bbox=bbox_conf, 
                    fontsize=MED_FONT,
                    ha='right')
            ax.text(forecast_boundary + relativedelta(months=1), 
                    min_scale, 
                    "Forecast Data -->", 
                    bbox=bbox_conf, 
                    fontsize=MED_FONT)
            for tick_label in (ax.get_xticklabels() + ax.get_yticklabels()):
                tick_label.set_fontsize(SMALL_FONT)
            for heading in [ax.title, ax.xaxis.label, ax.yaxis.label]:
                heading.set_fontsize(BIG_FONT)
            fig.tight_layout()
            fig.savefig('{}{}'.format(airport, plot_conf['image_base_name']), format='png')
            images.append(fig)
    return images
     

#### First Major Changes

In [None]:
def exp_smoothing_raw(train, test, selected_hp_values):
    """Fit a Holt-Winters model on ``train`` and forecast across the span of ``test``.

    Written against the statsmodels >= 0.12 API: ``damped_trend`` and
    ``use_boxcox`` are constructor arguments, the damping value is passed as
    ``damping_trend``, and basin hopping is selected through ``method``. The
    hyperparameter dict keeps its existing keys (``damped``, ``damping_slope``,
    ``use_basinhopping``), so existing search spaces work unchanged.

    Args:
        train: pandas Series of training observations with a datetime index.
        test: pandas Series covering the hold-out period.
        selected_hp_values: dict with a ``'model'`` section (``trend``, ``seasonal``,
            ``seasonal_periods``, ``damped``) and a ``'fit'`` section
            (``smoothing_level``, ``smoothing_seasonal``, ``damping_slope``,
            ``use_brute``, ``use_boxcox``, ``use_basinhopping``, ``remove_bias``).

    Returns:
        dict: ``{'model': <fitted results>, 'forecast': <predictions over the test span>}``.
    """
    model_hp = selected_hp_values['model']
    fit_hp = selected_hp_values['fit']
    damped = model_hp['damped']
    model = ExponentialSmoothing(train,
                                 trend=model_hp['trend'],
                                 seasonal=model_hp['seasonal'],
                                 # hp.quniform samples floats (12.0, 24.0, ...); a period is a whole number
                                 seasonal_periods=int(model_hp['seasonal_periods']),
                                 damped_trend=damped,
                                 use_boxcox=fit_hp['use_boxcox']
                                )
    model_fit = model.fit(smoothing_level=fit_hp['smoothing_level'],
                          smoothing_seasonal=fit_hp['smoothing_seasonal'],
                          # A damping value only applies when the trend is damped.
                          damping_trend=fit_hp['damping_slope'] if damped else None,
                          use_brute=fit_hp['use_brute'],
                          method='basinhopping' if fit_hp['use_basinhopping'] else None,
                          remove_bias=fit_hp['remove_bias']
                         )
    # Predict from the last training timestamp through the end of the test window,
    # then drop that first (in-sample) point so the forecast lines up with ``test``.
    forecast = model_fit.predict(train.index[-1], test.index[-1])
    return {'model': model_fit, 'forecast': forecast[1:]}

def hwes_minimization_function(selected_hp_values, train, test, loss_metric):
    """Hyperopt objective: fit one candidate configuration and report its loss.

    Fits via ``exp_smoothing_raw``, scores the forecast against ``test``, logs
    the hyperparameters and all metrics to MLflow, and returns the metric
    selected by ``loss_metric`` as the loss.

    Returns:
        dict: ``{'loss': <float>, 'status': STATUS_OK}``.
    """
    fitted = exp_smoothing_raw(train, test, selected_hp_values)
    n_params = extract_param_count_hwes(selected_hp_values)
    scores = calculate_errors(test, fitted['forecast'], n_params)
    mlflow.log_params(selected_hp_values)
    mlflow.log_metrics(scores)
    return dict(loss=scores[loss_metric], status=STATUS_OK)

In [None]:
def run_tuning(train, test, **params):
    """Tune one series with hyperopt ``SparkTrials``, refit the best configuration and report on it.

    Everything runs inside one MLflow run named ``PARENT_RUN_<airport>_<run_number>``;
    the validation plot and the hyperopt report are logged to it as artifacts.

    Args:
        train: training series.
        test: hold-out series.
        **params: run settings -- ``tuning_space``, ``minimization_function``,
            ``forecast_algo``, ``loss_metric``, ``hpopt_algo``, ``iterations``,
            ``parallelism``, ``timeout``, ``airport_name``, ``run_number``, ``name``,
            ``target_name``, ``image_name``, ``hyperopt_title``, ``hyperopt_image_name``,
            ``future_forecast_periods`` and ``train_split_cutoff_months``.

    Returns:
        dict with keys ``best_hp_params``, ``best_model``, ``hyperopt_trials_data``,
        ``hyperopt_trials_visualization``, ``forecast_data``, ``series_prediction``
        and ``plot_data``.
    """
    param_count = extract_param_count_hwes(params['tuning_space'])
    spark_trials = SparkTrials(parallelism=params['parallelism'], timeout=params['timeout'])
    parent_run_name = 'PARENT_RUN_{}_{}'.format(params['airport_name'], params['run_number'])
    with mlflow.start_run(run_name=parent_run_name, nested=True):
        mlflow.set_tag('airport', params['airport_name'])
        objective = partial(params['minimization_function'],
                            train=train,
                            test=test,
                            loss_metric=params['loss_metric'])
        best_point = fmin(objective,
                          params['tuning_space'],
                          algo=params['hpopt_algo'],
                          max_evals=params['iterations'],
                          trials=spark_trials,
                          show_progressbar=False)

        # Resolve the best point back into concrete hyperparameter values, then refit with them.
        best_run = space_eval(params['tuning_space'], best_point)
        generated_model = params['forecast_algo'](train, test, best_run)
        extracted_trials = extract_hyperopt_trials(spark_trials, params['tuning_space'], params['loss_metric'])
        trials_figure = generate_hyperopt_report(extracted_trials,
                                                 params['loss_metric'],
                                                 params['hyperopt_title'],
                                                 params['hyperopt_image_name'])
        future_forecast = build_future_forecast(generated_model['model'],
                                                params['airport_name'],
                                                params['future_forecast_periods'],
                                                params['train_split_cutoff_months'],
                                                params['target_name'])
        validation = plot_predictions(test,
                                      generated_model['forecast'],
                                      param_count,
                                      params['name'],
                                      params['target_name'],
                                      params['image_name'])
        mlflow.log_artifact(params['image_name'])
        mlflow.log_artifact(params['hyperopt_image_name'])

    return {
        'best_hp_params': best_run,
        'best_model': generated_model['model'],
        'hyperopt_trials_data': extracted_trials,
        'hyperopt_trials_visualization': trials_figure,
        'forecast_data': generated_model['forecast'],
        'series_prediction': future_forecast,
        'plot_data': validation,
    }

def run_all_models(**config):
    """Tune and fit a model for every airport that has enough history.

    Args:
        **config: ``source_data`` (table name), ``series_freq``, ``forecast_field``,
            ``train_split_cutoff_months``, ``future_forecast_periods`` and
            ``base_config`` (the per-run settings forwarded to ``run_tuning``).

    Returns:
        dict: airport code -> output of ``run_tuning``. Airports rejected by
        ``validate_data_counts`` are absent.
    """
    all_data = get_raw_data(config['source_data'])
    model_outputs = {}
    base = config['base_config']
    split_months = config['train_split_cutoff_months']
    target_field = config['forecast_field']
    for airport in get_all_airports_from_df(all_data):
        data = get_airport_data_from_df(all_data, config['series_freq'], airport)
        # Use the split size passed to THIS call; the previous code read the global
        # `all_model_config`, silently ignoring the caller's configuration.
        if not validate_data_counts(data, split_months):
            continue
        print("Starting tuning of Airport {}".format(airport))
        run_config = {'minimization_function': base['minimization_function'],
                      'tuning_space': base['tuning_space'],
                      'forecast_algo': base['forecast_algo'],
                      'loss_metric': base['loss_metric'],
                      'hpopt_algo': base['hpopt_algo'],
                      'iterations': base['iterations'],
                      'parallelism': base['parallelism'],
                      'timeout': base['timeout'],
                      'name': '{} {}'.format(base['base_name'], airport),
                      'target_name': base['target_name'],
                      'run_number': base['run_number'],
                      'image_name': '{}_{}.png'.format(base['fit_base_image_name'], airport),
                      'airport_name': airport,
                      'future_forecast_periods': config['future_forecast_periods'],
                      'train_split_cutoff_months': split_months,
                      'hyperopt_title': '{}_Hyperopt Training Report'.format(airport),
                      'hyperopt_image_name': '{}_{}.png'.format(base['tuning_base_image_name'], airport),
                      'verbose': base['verbose']
                     }
        train, test = generate_splits_by_months(data, split_months)
        train_series = train[target_field]
        test_series = test[target_field]
        if base['verbose']:
            model_outputs[airport] = run_tuning(train=train_series, test=test_series, **run_config)
        else:
            # Quiet mode: swallow everything printed to stdout/stderr during tuning.
            with suppress_annoying_prints():
                model_outputs[airport] = run_tuning(train=train_series, test=test_series, **run_config)
    return model_outputs

def build_forecast_dataset(run_data, **run_config):
    """Combine every airport's forecast with its held-out actuals into one frame.

    Args:
        run_data: dict of airport code -> ``run_tuning`` output (only
            ``'series_prediction'`` is read).
        **run_config: ``source_data``, ``series_freq``, ``forecast_field`` and
            ``train_split_cutoff_months``.

    Returns:
        pandas.DataFrame: the per-airport forecast frames stacked, each
        left-joined on its index with the test-period values of ``forecast_field``.

    Raises:
        ValueError: from ``pd.concat`` when ``run_data`` is empty.
    """
    target_field = run_config['forecast_field']
    prediction_columns = ['{}_pred'.format(target_field), 'Airport', 'is_future']
    # Read the source table once instead of once per airport.
    all_data = get_raw_data(run_config['source_data'])
    frames = []
    for airport in run_data.keys():
        forecast_df = pd.DataFrame(run_data[airport]['series_prediction'], 
                                   columns=prediction_columns)
        data = get_airport_data_from_df(all_data, run_config['series_freq'], airport)
        # Only the hold-out part carries actuals that overlap the forecast index.
        _, test = generate_splits_by_months(data, run_config['train_split_cutoff_months'])
        frames.append(forecast_df.merge(test[target_field], 
                                        how='left', right_index=True, left_index=True))
    return pd.concat(frames)

def build_future_forecast(model, airport, future_periods, test_periods, forecast_column):
    """Forecast across the test window and beyond, flagging the truly future rows.

    Args:
        model: fitted model exposing ``forecast(steps)``.
        airport: value written to the ``'Airport'`` column.
        future_periods: number of steps past the test window.
        test_periods: number of leading steps that overlap the test window.
        forecast_column: base name; the prediction column is ``'<forecast_column>_pred'``.

    Returns:
        pandas.DataFrame with columns ``'<forecast_column>_pred'``, ``'Airport'``
        and ``'is_future'`` (``True`` for rows after the last test-window row).
    """
    horizon = test_periods + future_periods
    prediction_column = '{}_pred'.format(forecast_column)
    forecast_df = pd.DataFrame(model.forecast(horizon), columns=[prediction_column])
    forecast_df['Airport'] = airport
    # Index value of the final row that still belongs to the test window.
    last_test_stamp = forecast_df.index.values[:test_periods][-1]
    forecast_df['is_future'] = forecast_df.index > last_test_stamp
    return forecast_df

In [None]:
# Run counter; it becomes part of the parent MLflow run name built in run_tuning.
RUN_NUMBER = 1
# Hyperopt search space. 'model' entries are read when the ExponentialSmoothing model is
# constructed and 'fit' entries when it is fitted (see exp_smoothing_raw).
hpopt_space = {
    'model': {
          'trend': hp.choice('trend', ['add', 'mul']),
          'seasonal': hp.choice('seasonal', ['add', 'mul']),
          # Quantized in steps of 12 between 12 and 120.
          'seasonal_periods': hp.quniform('seasonal_periods', 12, 120, 12),
          'damped': hp.choice('damped', [True, False])
    },
    'fit': {
          'smoothing_level': hp.uniform('smoothing_level', 0.01, 0.99),
          'smoothing_seasonal': hp.uniform('smoothing_seasonal', 0.01, 0.99),
          'damping_slope': hp.uniform('damping_slope', 0.01, 0.99),
          'use_brute': hp.choice('use_brute', [True, False]),
          'use_boxcox': hp.choice('use_boxcox', [True, False]),
          'use_basinhopping': hp.choice('use_basinhopping', [True, False]),
          'remove_bias': hp.choice('remove_bias', [True, False])
    }
}
# Per-run settings that run_all_models copies into each airport's run_tuning call.
base_config = {
              'minimization_function': hwes_minimization_function,
              'tuning_space': hpopt_space,
              'forecast_algo': exp_smoothing_raw,
              # Key of the calculate_errors result that hyperopt minimizes.
              'loss_metric': 'bic',
              'hpopt_algo': tpe.suggest,
              # Passed to fmin as max_evals.
              'iterations': 1000,
              # Passed to SparkTrials as parallelism / timeout.
              'parallelism': 32,
              'timeout': 3600,
              'base_name': 'Total Passengers HPOPT',
              'target_name': 'Total_Passengers',
              'run_number': RUN_NUMBER,
              # File-name prefixes for the saved PNGs; '_<airport>.png' is appended.
              'fit_base_image_name': 'total_passengers_validation',
              'tuning_base_image_name': 'total_passengers_hpopt',
              # When False, run_all_models silences stdout/stderr during tuning.
              'verbose': True
}
# Top-level settings passed to run_all_models as keyword arguments.
all_model_config = {
    'source_data': delta_full_nm,
    # Number of trailing rows held out as the test set.
    'train_split_cutoff_months': 12,
    # Forecast steps generated beyond the test window.
    'future_forecast_periods': 36,
    'series_freq': 'MS',
    'forecast_field': 'Total_Passengers',
    'base_config': base_config
}
# Settings read by generate_forecast_plots.
plot_conf = {
    'forecast_col': 'Total_Passengers_pred',
    'target_col': 'Total_Passengers',
    # Appended to the airport code to form each image file name.
    'image_base_name': '_forecast.png',
    'figsize': (16,12)
}

### Execution of the Run using SparkTrials

In [None]:
# Tune and fit every eligible airport; the result maps airport code -> run_tuning output.
all_airports = run_all_models(**all_model_config)

#### Alternative Approach: Distributing the Airport Modeling Tasks to Workers

In [None]:
def validate_data_counts_udf(data, split_count):
    """List the airports that have enough rows to hold out ``split_count`` periods.

    Spark counterpart of ``validate_data_counts``: an airport passes when
    ``split_count / 0.2`` is strictly below 80% of its row count.

    Args:
        data: Spark DataFrame with an ``Airport_Code`` column.
        split_count: number of periods to be held out.

    Returns:
        list: the ``Airport_Code`` values that pass the check.
    """
    has_enough_rows = (lit(split_count) / 0.2) < (col('count') * 0.8)
    rows_per_airport = data.groupBy(col('Airport_Code')).count()
    flagged = rows_per_airport.withColumn('check', when(has_enough_rows, True).otherwise(False))
    passing = flagged.filter(col('check')).select('Airport_Code').toPandas()
    return list(passing['Airport_Code'])

def exp_smoothing_raw_udf(train, test, selected_hp_values):
    """Fit and forecast a Holt-Winters model for the distributed (per-airport) workflow.

    This was a line-for-line copy of ``exp_smoothing_raw``; it now delegates to
    it so the two code paths cannot drift apart.

    Args:
        train: training series.
        test: hold-out series.
        selected_hp_values: hyperparameter dict with ``'model'`` and ``'fit'`` sections.

    Returns:
        dict: ``{'model': <fitted results>, 'forecast': <predictions over the test span>}``.
    """
    return exp_smoothing_raw(train, test, selected_hp_values)

def hwes_minimization_function_udf(
    selected_hp_values,
    train,
    test,
    loss_metric,
    airport,
    experiment_name,
    param_count,
    name,
    target_name,
    image_name,
    trial,
):
    """Objective function for one hyperopt trial, logged as an mlflow run.

    Fits the model with ``exp_smoothing_raw_udf``, scores the forecast against
    ``test`` with ``calculate_errors``, and records tags, hyperparameters,
    metrics and the validation plot in a new mlflow run.

    Args:
        selected_hp_values: Sampled hyperparameters with ``'model'`` and
            ``'fit'`` sub-dicts.
        train: Training series.
        test: Holdout series.
        loss_metric: Key of the error metric returned as the loss.
        airport: Airport identifier used in the run name and the ``airport`` tag.
        experiment_name: Used in the run name and the ``parent_run`` tag.
        param_count: Passed to ``plot_predictions`` (the error calculation
            derives its own count from ``selected_hp_values``).
        name: Passed to ``plot_predictions``.
        target_name: Passed to ``plot_predictions``.
        image_name: Passed to ``plot_predictions`` and logged as an artifact.
        trial: Trials object; ``len(trial.results)`` is appended to the run name.

    Returns:
        dict: ``{'loss': <errors[loss_metric]>, 'status': STATUS_OK}``.
    """
    fitted = exp_smoothing_raw_udf(train, test, selected_hp_values)
    errors = calculate_errors(
        test,
        fitted['forecast'],
        extract_param_count_hwes(selected_hp_values),
    )

    fit_keys = (
        'damping_slope',
        'remove_bias',
        'smoothing_level',
        'smoothing_seasonal',
        'use_basinhopping',
        'use_boxcox',
        'use_brute',
    )
    model_keys = ('damped', 'seasonal', 'seasonal_periods', 'trend')

    run_label = '{}_{}_{}_{}'.format(
        airport, experiment_name, str(uuid.uuid4())[:8], len(trial.results)
    )
    with mlflow.start_run(run_name=run_label):
        mlflow.set_tag('airport', airport)
        mlflow.set_tag('parent_run', experiment_name)
        mlflow.log_param('id', mlflow.active_run().info.run_id)
        for key in fit_keys:
            mlflow.log_param(key, selected_hp_values['fit'][key])
        for key in model_keys:
            mlflow.log_param(key, selected_hp_values['model'][key])
        mlflow.log_metrics(errors)
        # The return value is not used; presumably the call writes image_name
        # to disk so it can be logged as an artifact below - TODO confirm.
        plot_handle = plot_predictions(
            test,
            fitted['forecast'],
            param_count,
            name,
            target_name,
            image_name,
        )
        mlflow.log_artifact(image_name)
    return {'loss': errors[loss_metric], 'status': STATUS_OK}

#### The Big Change...

In [None]:
def run_udf_tuning(train, test, **params):
    """Tune hyperparameters with hyperopt, refit the best model, and forecast.

    Args:
        train: Training series.
        test: Holdout series.
        **params: Run configuration. Keys read here: ``tuning_space``,
            ``minimization_function``, ``loss_metric``, ``airport_name``,
            ``experiment_name``, ``name``, ``target_name``, ``image_name``,
            ``hpopt_algo``, ``iterations``, ``forecast_algo``,
            ``future_forecast_periods`` and ``train_split_cutoff_months``.

    Returns:
        The result of ``build_future_forecast`` with its index reset and the
        resulting ``index`` column renamed to ``date``.
    """
    search_space = params['tuning_space']
    param_count = extract_param_count_hwes(search_space)
    trial_run = Trials()

    # Bind everything except the sampled hyperparameters, which fmin supplies.
    objective = partial(
        params['minimization_function'],
        train=train,
        test=test,
        loss_metric=params['loss_metric'],
        airport=params['airport_name'],
        experiment_name=params['experiment_name'],
        param_count=param_count,
        name=params['name'],
        target_name=params['target_name'],
        image_name=params['image_name'],
        trial=trial_run,
    )
    tuning = fmin(
        objective,
        search_space,
        algo=params['hpopt_algo'],
        max_evals=params['iterations'],
        trials=trial_run,
        show_progressbar=False,
    )

    # fmin returns indices for hp.choice entries; space_eval maps them back to
    # the actual values in the search space.
    best_run = space_eval(search_space, tuning)
    generated_model = params['forecast_algo'](train, test, best_run)
    forecasted_data = build_future_forecast(
        generated_model['model'],
        params['airport_name'],
        params['future_forecast_periods'],
        params['train_split_cutoff_months'],
        params['target_name'],
    )

    # The plot for the best model is still produced, but its return value is
    # not part of this function's result.
    plot_predictions(
        test,
        generated_model['forecast'],
        param_count,
        params['name'],
        params['target_name'],
        params['image_name'],
    )

    return forecasted_data.reset_index().rename(columns={'index': 'date'})

#### The pandas_udf

In [None]:
# Schema of the DataFrame returned by the forecast_airports pandas UDF.
output_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ('date', DateType()),
        ('Total_Passengers_pred', IntegerType()),
        ('Airport', StringType()),
        ('is_future', BooleanType()),
    )
])

@pandas_udf(output_schema, PandasUDFType.GROUPED_MAP)
def forecast_airports(airport_df):
    """Grouped-map pandas UDF: tune and forecast passengers for one airport.

    Builds the hyperopt search space and run configuration, indexes the data
    by date, splits it into train and test, and delegates to
    ``run_udf_tuning``.

    Reads the module-level ``RUN_NAME``, which must be defined before the UDF
    is executed.

    Args:
        airport_df: pandas DataFrame for one group, with ``Airport_Code``,
            ``date`` and ``Total_Passengers`` columns.

    Returns:
        The DataFrame produced by ``run_udf_tuning``.
    """
    # Positional access: a label lookup with [0] fails whenever the incoming
    # frame's index does not contain the label 0.
    airport = airport_df['Airport_Code'].iloc[0]
    hpopt_space = {
        'model': {
            'trend': hp.choice('trend', ['add', 'mul']),
            'seasonal': hp.choice('seasonal', ['add', 'mul']),
            'seasonal_periods': hp.quniform('seasonal_periods', 12, 120, 12),
            'damped': hp.choice('damped', [True, False])
        },
        'fit': {
            'smoothing_level': hp.uniform('smoothing_level', 0.01, 0.99),
            'smoothing_seasonal': hp.uniform('smoothing_seasonal', 0.01, 0.99),
            'damping_slope': hp.uniform('damping_slope', 0.01, 0.99),
            'use_brute': hp.choice('use_brute', [True, False]),
            'use_boxcox': hp.choice('use_boxcox', [True, False]),
            'use_basinhopping': hp.choice('use_basinhopping', [True, False]),
            'remove_bias': hp.choice('remove_bias', [True, False])
        }
    }

    run_config = {
        'minimization_function': hwes_minimization_function_udf,
        'tuning_space': hpopt_space,
        # Refit the winning hyperparameters with the same routine the
        # objective function uses during tuning.
        'forecast_algo': exp_smoothing_raw_udf,
        'loss_metric': 'bic',
        'hpopt_algo': tpe.suggest,
        'iterations': 600,
        'experiment_name': RUN_NAME,
        'name': '{} {}'.format('Total Passengers HPOPT', airport),
        'target_name': 'Total_Passengers',
        'image_name': '{}_{}.png'.format('total_passengers_validation', airport),
        'airport_name': airport,
        'future_forecast_periods': 36,
        'train_split_cutoff_months': 12,
        'hyperopt_title': '{}_Hyperopt Training Report'.format(airport),
        'hyperopt_image_name': '{}_{}.png'.format('total_passengers_hpopt', airport),
        'verbose': True
    }

    # Work on a copy so the frame handed in by Spark is never mutated.
    airport_data = airport_df.copy(deep=True)
    airport_data['date'] = pd.to_datetime(airport_data['date'])
    airport_data = airport_data.set_index('date')
    airport_data.index = pd.DatetimeIndex(
        airport_data.index.values, freq=airport_data.index.inferred_freq
    )
    asc = airport_data.sort_index()
    asc = apply_index_freq(asc, 'MS')

    train, test = generate_splits_by_months(asc, run_config['train_split_cutoff_months'])

    tuning = run_udf_tuning(train['Total_Passengers'], test['Total_Passengers'], **run_config)

    return tuning


In [None]:
# Driver cell: keep only the airports that pass validate_data_counts_udf,
# then apply the forecast_airports UDF to each Airport_Code group.
RUN_NAME = 'PANDAS_UDF_RUN_1'
raw_data = spark.table(delta_full_nm)
filtered_data = (
    raw_data
    .where(col('Airport_Code').isin(validate_data_counts_udf(raw_data, 12)))
    .repartition('Airport_Code')
)
grouped_apply = (
    filtered_data
    .groupBy('Airport_Code')
    .apply(forecast_airports)
)
display(grouped_apply)
     