In [1]:
# data_preprocessing.py
from datetime import date, datetime, timedelta
import polars as pl
from deltalake import DeltaTable
import json
from gcsfs import GCSFileSystem
import pyarrow

# custom imports
import sys
sys.path.append('..')
from src.gcp import GCPClient
from src.io import DataLoader, DataWriter


In [2]:

class TrainingDataPreprocessor():
    '''
    Preprocesses `flight event data` for training, creates target variable

    The frame passed to the constructor is stored untouched; every step takes a
    frame and returns a new one, and `process_data` chains the steps in order.
    Only frame operations such as `group_by`, `join`, `with_columns`, `sort`
    and `filter` are used.
    '''

    def __init__(self, df):
        '''
        Store the input frame.

        df: flight event frame. The steps below read the columns `fa_flight_id`,
            `actual_in`, `event_ts`, `crt_ts` and `event_type`.
        '''
        self.df = df

    def create_target(self, df):
        '''
        Create target variable, time to landing

        Adds three columns:
          - `actual_in_filled`: the maximum `actual_in` of the row's flight
            (`fa_flight_id`), repeated on every row of that flight
          - `duration_to_arrival_minutes`: `actual_in_filled - event_ts`,
            expressed in minutes via `.dt.minutes()`
          - `target`: a copy of `duration_to_arrival_minutes`

        Returns the frame sorted by `actual_in_filled`, then `crt_ts`.
        '''
        # One row per flight holding its latest known arrival time
        landing_times = df.group_by('fa_flight_id').agg(pl.max('actual_in').alias('actual_in_filled'))
        # Default join strategy: rows whose flight id has no match are not kept
        df = df.join(landing_times, on='fa_flight_id')
        df = df.with_columns((
                                (pl.col('actual_in_filled') - pl.col('event_ts')).dt.minutes()
                              ).alias('duration_to_arrival_minutes')
                             )

        df = df.with_columns(pl.col('duration_to_arrival_minutes').alias('target'))

        df = df.sort(['actual_in_filled', 'crt_ts'])
        return df

    def create_target_v2(self, df):
        '''
        An alternative target variable which represents minutes to next event.

        Adds three columns:
          - `event_seq_number_sched`: position of `event_type` in the scheduled
            order of events (1 = `scheduled_out` ... 5 = `actual_in`)
          - `next_event_ts`: `event_ts` of the following event of the same
            flight in that order; null for the last event of a flight
          - `duration_to_next_event_minutes`: `next_event_ts - event_ts` in minutes

        The row order of the input frame is preserved.
        '''
        event_sched_order = {
            'scheduled_out': 1,
            'actual_out': 2,
            'actual_off': 3,
            'actual_on': 4,
            'actual_in': 5
        }
        # Scratch column used to restore the caller's row order; removed before returning
        row_nr = '__row_nr__'

        df = df.with_columns(
            pl.col('event_type').map_dict(event_sched_order, return_dtype=pl.Int8)
                .alias('event_seq_number_sched')
        )

        # A window expression writes its per-group result back into the group's
        # rows in their existing order. Sorting only `event_ts` inside the window
        # (`sort_by(...).shift(-1).over(...)`) therefore attaches the shifted
        # values to the wrong rows whenever a flight's rows are not already in
        # sequence order. Instead, order the whole frame first so that a plain
        # windowed shift lines up with the rows, then put the rows back.
        df = (
            df.with_row_count(row_nr)
              # the row number breaks ties deterministically in original order
              .sort(['event_seq_number_sched', row_nr])
              .with_columns(
                  pl.col('event_ts').shift(-1).over('fa_flight_id')
                      .alias('next_event_ts')
              )
              .sort(row_nr)
              .drop(row_nr)
        )

        df = df.with_columns(
            (pl.col('next_event_ts') - pl.col('event_ts')).dt.minutes()
                .alias('duration_to_next_event_minutes')
        )

        return df

    def remove_incomplete_flights(self, df):
        '''
        Remove flights that haven't landed yet, i.e. rows whose
        `actual_in_filled` is null. Requires `create_target` to have run.
        '''
        return df.filter(pl.col('actual_in_filled').is_not_null())

    def removed_arrival_events(self, df):
        '''
        Drop rows whose `event_type` is `actual_in`.
        Arrival events are not useful for training.
        '''
        return df.filter(pl.col('event_type') != 'actual_in')

    def process_data(self):
        '''
        Run the full pipeline on the stored frame and return the result:
        both targets are created first, then incomplete flights and arrival
        events are filtered out.
        '''
        df = self.df
        df = self.create_target(df)
        df = self.create_target_v2(df)
        df = self.remove_incomplete_flights(df)
        df = self.removed_arrival_events(df)
        return df


In [3]:

# Delta table locations: the processed stream is the input, the training table the output
gcs_path_prefix = 'gs://datalake-flight-dev-1'
table_name_in = 'flightsummary-delta-processed-stream'
table_name_out = 'flightsummary-delta-processed-training'

table_path_in = f'{gcs_path_prefix}/{table_name_in}'
table_path_out = f'{gcs_path_prefix}/training/{table_name_out}'
table_path_out_local = f'../data/{table_name_out}'

# Lookback window handed to the loader: 1000 days on `crt_ts_date`
# (presumably the loader filters on that column -- confirm in src.io.DataLoader)
lookback_params = dict(lookback_days=1000, lookback_date_column='crt_ts_date')

client = GCPClient()
dl = DataLoader(client=client)
dl.load_delta_table(lookback_params=lookback_params, path=table_path_in)

# Fetch the loaded table (a lazy polars frame, going by the method name) and
# run the preprocessing pipeline on it
df = TrainingDataPreprocessor(dl.return_as_polars_df_lazy()).process_data()


In [4]:

# Wrap the processed frame in a DataWriter and store it in GCS in 'delta' format
dw = DataWriter(df)
dw.write_gcs(client=client, path=table_path_out, file_type_out='delta')

# Optional local copies, currently disabled:
# dw.write_local(path=table_path_out_local, file_type_out='csv')
# dw.write_local(path=table_path_out_local, file_type_out='delta')

Writing data to gs://datalake-flight-dev-1/training/flightsummary-delta-processed-training


## Scratch

In [5]:
# Below: scratch work for further processing of the raw (ingested) flight status table, which can be transformed into 'scheduled' flight data

In [6]:
# df.groupby('fa_flight_id').agg(
#     pl.count('fa_flight_id').alias('transaction_count'),
#     pl.n_unique('scheduled_out').alias('n_unique_scheduled_out'),
# ).filter(pl.col('n_unique_scheduled_out') > 1)

In [7]:
# df.filter(pl.col('last_scheduled_out_ts').is_null() ).shape

In [8]:
# df.filter(
#     (pl.col('last_scheduled_out_ts').cast(str) != pl.col('scheduled_out').cast(str).str.slice(0,19)) 
#     | pl.col('last_scheduled_out_ts').is_null() 
#     & (pl.col('crt_ts').str.strptime(pl.Datetime, "%Y-%m-%d %H:%M:%S") < pl.col('scheduled_out')) 
#     )

In [9]:
# To create training data for the schedule-based model, we need to capture snapshots of each scheduled flight at a given time before the scheduled departure time.

# To do so, we can filter on the following conditions:
# 1. crt_ts < scheduled_out
# 2. scheduled_out != last_scheduled_out_ts

In [10]:
        # This logic only works on the 'scheduled' flight event table, where all event times are added retroactively.
        # The primary flight event data is not suitable for this logic, as it is meant to be used in real time.

        # df = (
        # df.with_columns(
        #     pl.when(pl.col('event_type') == 'scheduled_out')
        #     .then(pl.col('actual_out') - pl.col('scheduled_out'))

        #     .when(pl.col('event_type') == 'actual_out')
        #     .then(pl.col('actual_off') - pl.col('actual_out'))

        #     .when(pl.col('event_type') == 'actual_off')
        #     .then(pl.col('actual_on') - pl.col('actual_off'))

        #     .when(pl.col('event_type') == 'actual_on')
        #     .then(pl.col('actual_in') - pl.col('actual_on'))

        #     .otherwise(pl.lit(None))
        #     .dt.minutes()
        #     .alias('duration_to_next_event_minutes')
        #     )
        # )