In [32]:
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler,LabelEncoder
from sklearn.base import BaseEstimator, TransformerMixin
from geopy.distance import geodesic
import numpy as np
from tabulate import tabulate
import joblib
import cloudpickle

In [37]:
#Making a preprocessing pipeline

# Data loading
df_flightdata = pd.read_csv('data/Train.csv')
df_airportdata = pd.read_csv('data/airportdata.csv', index_col=0)

# Defining categorical data and features to drop. Here we use label encoding. Hot encoding is not used.
columns_to_drop = ['id', 'std', 'sta', 'fltid', 'arr_iata', 'dep_iata', 'ac','status']
categorical_columns = ['depstn', 'arrstn',  'arr_country', 'dep_country', 'season', 'airline_code', 'international_flight']

# Join the DataFrames first and then apply the preprocessing pipeline
merged_data = df_flightdata.join(df_airportdata[['iata', 'country', 'elevation', 'lat', 'lon']].add_prefix('dep_'), how='left', on='DEPSTN') \
    .join(df_airportdata[['iata', 'country', 'elevation', 'lat', 'lon']].add_prefix('arr_'), how='left', on='ARRSTN')
    
display(merged_data.T)


Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,107823,107824,107825,107826,107827,107828,107829,107830,107831,107832
ID,train_id_0,train_id_1,train_id_2,train_id_3,train_id_4,train_id_5,train_id_6,train_id_7,train_id_8,train_id_9,...,train_id_107823,train_id_107824,train_id_107825,train_id_107826,train_id_107827,train_id_107828,train_id_107829,train_id_107830,train_id_107831,train_id_107832
DATOP,2016-01-03,2016-01-13,2016-01-16,2016-01-17,2016-01-17,2016-01-17,2016-01-18,2016-01-18,2016-01-18,2016-01-18,...,2018-04-12,2018-02-09,2018-08-08,2018-10-15,2018-12-19,2018-07-05,2018-01-13,2018-11-07,2018-01-23,2018-11-13
FLTID,TU 0712,TU 0757,TU 0214,TU 0480,TU 0338,TU 0283,TU 0514,TU 0716,TU 0752,TU 0996,...,TU 9001,UG 1730,WKL 0000,UG 0011,SGT 0000,WKL 0000,UG 0003,SGT 0000,UG 0010,UG 0002
DEPSTN,CMN,MXP,TUN,DJE,TUN,TLS,TUN,TUN,TUN,TUN,...,MIR,TUN,TUN,DJE,TUN,TUN,DJE,TUN,TUN,TUN
ARRSTN,TUN,TUN,IST,NTE,ALG,TUN,BCN,ORY,FCO,NCE,...,TUN,NAP,TUN,TUN,TUN,TUN,TUN,TUN,DJE,DJE
STD,2016-01-03 10:30:00,2016-01-13 15:05:00,2016-01-16 04:10:00,2016-01-17 14:10:00,2016-01-17 14:30:00,2016-01-17 16:20:00,2016-01-18 07:15:00,2016-01-18 07:35:00,2016-01-18 07:40:00,2016-01-18 07:45:00,...,2018-04-12 17:05:00,2018-02-09 10:00:00,2018-08-08 22:00:00,2018-10-15 19:45:00,2018-12-19 23:45:00,2018-07-05 23:00:00,2018-01-13 08:00:00,2018-11-07 05:00:00,2018-01-23 18:00:00,2018-11-13 06:15:00
STA,2016-01-03 12.55.00,2016-01-13 16.55.00,2016-01-16 06.45.00,2016-01-17 17.00.00,2016-01-17 15.50.00,2016-01-17 18.15.00,2016-01-18 09.00.00,2016-01-18 09.55.00,2016-01-18 09.00.00,2016-01-18 09.15.00,...,2018-04-12 17.40.00,2018-02-09 11.15.00,2018-08-09 01.00.00,2018-10-15 20.45.00,2018-12-20 01.45.00,2018-07-06 02.00.00,2018-01-13 09.00.00,2018-11-07 12.50.00,2018-01-23 18.45.00,2018-11-13 07.05.00
STATUS,ATA,ATA,ATA,ATA,ATA,ATA,ATA,ATA,ATA,ATA,...,ATA,SCH,SCH,SCH,SCH,SCH,SCH,SCH,ATA,SCH
AC,TU 32AIMN,TU 31BIMO,TU 32AIMN,TU 736IOK,TU 320IMU,TU 736IOP,TU 32AIMH,TU 32AIMI,TU 32AIMC,TU 31AIMK,...,TU 736IOL,TU CR9ISA,TU 320IMV,UG AT7LBE,TU 736IOP,TU 32AIML,UG AT7AT7,TU 736IOK,TU CR9ISA,TU CR9ISA
target,260.0,20.0,0.0,0.0,22.0,53.0,10.0,15.0,16.0,21.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [30]:
import json
# Frontend should only use known data(airports, airlines, ...) because label encoding on the api needs known data  
# Unique airports and airlines of the original dataset is extracted

# Get unique values for 'DEPSTN' and 'ARRSTN'
unique_depstn = df_flightdata['DEPSTN'].unique()
unique_arrstn = df_flightdata['ARRSTN'].unique()

# Combine unique values into a single DataFrame and make it unique
unique_airports = pd.concat([pd.Series(unique_depstn), pd.Series(unique_arrstn)], ignore_index=True).unique()

# Create a dictionary for airports
org_airports = {"airports": unique_airports.tolist()}

# Save airports data to a JSON file
with open('./data/airports_in_org_dataset.json', 'w') as airports_json_file:
    json.dump(org_airports, airports_json_file)


# Get values for the airline code out of flight id
unique_airline_code = (df_flightdata['FLTID'].str[:2]).unique()

# Create a dictionary for airline codes
org_airline_codes = {"airline_code": unique_airline_code.tolist()}

# Save airports data to a JSON file
with open('./data/airline_codes_in_org_dataset.json', 'w') as airline_codes_json_file:
    json.dump(org_airline_codes, airline_codes_json_file)

In [40]:
df_flightdata.head()

Unnamed: 0,ID,DATOP,FLTID,DEPSTN,ARRSTN,STD,STA,STATUS,AC,target
0,train_id_0,2016-01-03,TU 0712,CMN,TUN,2016-01-03 10:30:00,2016-01-03 12.55.00,ATA,TU 32AIMN,260.0
1,train_id_1,2016-01-13,TU 0757,MXP,TUN,2016-01-13 15:05:00,2016-01-13 16.55.00,ATA,TU 31BIMO,20.0
2,train_id_2,2016-01-16,TU 0214,TUN,IST,2016-01-16 04:10:00,2016-01-16 06.45.00,ATA,TU 32AIMN,0.0
3,train_id_3,2016-01-17,TU 0480,DJE,NTE,2016-01-17 14:10:00,2016-01-17 17.00.00,ATA,TU 736IOK,0.0
4,train_id_4,2016-01-17,TU 0338,TUN,ALG,2016-01-17 14:30:00,2016-01-17 15.50.00,ATA,TU 320IMU,22.0


In [34]:


class FixColumnNames(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        X.columns = X.columns.str.replace(' ', '_').str.lower().str.replace('-', '_')
        return X
    
    def get_state(self):
        # Return a dictionary with any essential attributes
        return {}

    @classmethod
    def from_state(cls, state):
        # Create an instance of the class using the state dictionary
        return cls()

class DropColumns(BaseEstimator, TransformerMixin):
    def __init__(self, columns_to_drop):
        self.columns_to_drop = columns_to_drop

    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        X.drop(self.columns_to_drop, axis=1, inplace=True)
        return X

    def get_state(self):
        # Return a dictionary with any essential attributes
        return {'columns_to_drop': self.columns_to_drop}

    @classmethod
    def from_state(cls, state):
        # Create an instance of the class using the state dictionary
        return cls(columns_to_drop=state['columns_to_drop'])
    
class LabelEncoderTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, columns):
        self.columns = columns
        self.label_encoders = {}

    def fit(self, X, y=None):
        for col in self.columns:
            label_encoder = LabelEncoder()
            label_encoder.fit(X[col])
            self.label_encoders[col] = label_encoder
        return self

    def transform(self, X):
        X_encoded = X.copy()
        for col, label_encoder in self.label_encoders.items():
            X_encoded[col] = label_encoder.transform(X_encoded[col])
        return X_encoded
    
    def get_state(self):
        state = {
            'columns': self.columns,
            'label_encoders': {col: label_encoder.classes_.tolist() for col, label_encoder in self.label_encoders.items()}
        }
        return state

    @classmethod
    def from_state(cls, state):
        columns = state['columns']
        label_encoders = {col: LabelEncoder() for col in columns}

        for col, classes in state['label_encoders'].items():
            label_encoder = label_encoders[col]
            label_encoder.classes_ = classes

        instance = cls(columns=columns)
        instance.label_encoders = label_encoders

        return instance
    
class CalculateFlightDistance(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        def calculate_distance(row):
            dep_coords = (row['dep_lat'], row['dep_lon'])
            arr_coords = (row['arr_lat'], row['arr_lon'])
            distance = geodesic(dep_coords, arr_coords).kilometers
            return int(round(distance, 0))

        X['flight_distance_in_km'] = X.apply(calculate_distance, axis=1)
        return X

    def get_state(self):
        # Return a dictionary with any essential attributes
        return {}

    @classmethod
    def from_state(cls, state):
        # Create an instance of the class using the state dictionary
        return cls()

class AddAdditionalFlightDataFeatures(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        X['sta'] = pd.to_datetime(X['sta'], format='%Y-%m-%d %H.%M.%S')
        X['std'] = pd.to_datetime(X['std'], format='%Y-%m-%d %H:%M:%S')
        X['datop'] = pd.to_datetime(X['datop'], format='%Y-%m-%d')
        X['std_time'] = X['std'].dt.time
        X['sta_time'] = X['sta'].dt.time
        X['std_time'] = X['std_time'].astype(str).str.replace(':', '').astype(int)
        X['sta_time'] = X['sta_time'].astype(str).str.replace(':', '').astype(int)
        
        X['elevation_dif'] = (X['arr_elevation'] - X['dep_elevation'])
        X['flight_time_in_min'] = (X['sta'] - X['std']).dt.total_seconds() / 60
        X['average_flight_speed_km_h'] = (X['flight_distance_in_km'] * 60 / X['flight_time_in_min']).round().astype(int)
        X['international_flight'] = np.where(X['arr_country'] != X['dep_country'], 'international', 'domestic')
        X['airline_code'] = X['fltid'].str[:2]
        # Extract year, month, and day components
        X['year'] = X['datop'].dt.year
        X['month'] = X['datop'].dt.month
        X['day'] = X['datop'].dt.day
        X['datop'] = X['datop'].astype(str).str.replace('-', '').astype(int)
        
        # Create the seasons column
        X.loc[(X['month'] < 3) | (X['month'] == 12), 'season'] = 'winter'
        X.loc[(X['month'] >= 3) & (X['month'] < 6), 'season'] = 'spring' 
        X.loc[(X['month'] >= 6) & (X['month'] < 9), 'season'] = 'summer' 
        X.loc[(X['month'] >= 9) & (X['month'] < 12), 'season'] = 'autumn'
        
        return X
    
    def get_state(self):
        # Return a dictionary with any essential attributes
        return {}

    @classmethod
    def from_state(cls, state):
        # Create an instance of the class using the state dictionary
        return cls()


# Define the preprocessing steps in the pipeline
preprocessing_steps = [
    ('column_name_fixer', FixColumnNames()),
    ('calculate_flight_distance', CalculateFlightDistance()),
    ('add_additional_flight_data_features', AddAdditionalFlightDataFeatures()),
    ('drop_columns', DropColumns(columns_to_drop)),
    ('encode_labels', LabelEncoderTransformer(categorical_columns))

]

# Create the pipeline
preprocessing_pipeline = Pipeline(steps=preprocessing_steps)
# df_processed contains the preprocessed data
df_processed = preprocessing_pipeline.fit_transform(merged_data)



In [35]:
# Serialization for ColumnNameFixer
def serialize_fix_column_names(transformer, filename):
    state = transformer.get_state()
    joblib.dump(state, filename)

# Serialization for CalculateDistance
def serialize_calculate_flight_distance(transformer, filename):
    state = transformer.get_state()
    joblib.dump(state, filename)

# Serialization for CustomFeaturesAdder
def serialize_add_additional_flight_data_features(transformer, filename):
    state = transformer.get_state()
    joblib.dump(state, filename)

# Serialization for DropColumns
def serialize_drop_columns(transformer, filename):
    state = transformer.get_state()
    joblib.dump(state, filename)

# Serialization for LabelEncoderTransformer
def serialize_label_encoder_transformer(transformer, filename):
    state = transformer.get_state()
    joblib.dump(state, filename)

# Serialize all custom classes
serialize_fix_column_names(preprocessing_pipeline.named_steps['column_name_fixer'], 'data/pipelines/classes/column_name_fixer.pkl')
serialize_calculate_flight_distance(preprocessing_pipeline.named_steps['calculate_flight_distance'], 'data/pipelines/classes/calculate_flight_distance.pkl')
serialize_add_additional_flight_data_features(preprocessing_pipeline.named_steps['add_additional_flight_data_features'], 'data/pipelines/classes/add_additional_flight_data_features.pkl')
serialize_drop_columns(preprocessing_pipeline.named_steps['drop_columns'], 'data/pipelines/classes/drop_columns.pkl')
serialize_label_encoder_transformer(preprocessing_pipeline.named_steps['encode_labels'], 'data/pipelines/classes/label_encoder_transformer.pkl')


In [36]:
# Fit and save the preprocessing pipeline
joblib.dump(preprocessing_pipeline, 'data/pipelines/preprocessing_pipeline.pkl')

['data/pipelines/preprocessing_pipeline.pkl']

In [None]:
# Manually serialize each custom class
custom_classes = {
    'column_name_fixer': preprocessing_pipeline.named_steps['column_name_fixer'],
    'calculate_distance': preprocessing_pipeline.named_steps['calculate_distance'],
    'custom_features_adder': preprocessing_pipeline.named_steps['custom_features_adder'],
    'label_encoding': preprocessing_pipeline.named_steps['label_encoding'],
    'drop_columns': preprocessing_pipeline.named_steps['drop_columns']
}

for name, custom_class in custom_classes.items():
    custom_class_state = custom_class.get_state()  # Define a method to get the class state
    with open(f'{name}_state.pkl', 'wb') as file:
        joblib.dump(custom_class_state, file)

In [27]:
# Serialize the pipeline using cloudpickle: this doesnt work
# with open('data/preprocessing_pipeline_cp.joblib', 'wb') as f:
#     cloudpickle.dump(preprocessing_pipeline, f)
# Saving using cloudpickle - allows lambda function, classes  etc. , Joblib causing errors on the API 
# def dump_pickle(data,path):
#     path = str(path)
#     with open(path,"wb") as f:
#         cloudpickle.dump(data,f)
        
# dump_pickle(preprocessing_pipeline,'data/preprocessing_pipeline_cp.joblib')

IndexError: tuple index out of range

In [1]:

# Defining categorical data and features to drop. Here we use label encoding. Hot encoding is not used.
categorical_columns = ['depstn', 'arrstn', 'status', 'arr_country', 'dep_country', 'season', 'airline_code', 'international_flight','ac','dep_iata','arr_iata','fltid']
columns_to_drop = ['id', 'std', 'sta', 'fltid', 'arr_iata', 'dep_iata', 'ac']
# Importing the custom classes from a file so it also works on the api
# https://stackoverflow.com/questions/57888291/how-to-properly-pickle-sklearn-pipeline-when-using-custom-transformer
from custom_transformer_classes import ColumnNameFixer,CalculateDistance,CustomFeaturesAdder,DropColumns,LabelEncoderTransformer
# Define the preprocessing steps in the pipeline
preprocessing_steps = [
    ('column_name_fixer', ColumnNameFixer()),
    ('calculate_distance', CalculateDistance()),
    ('custom_features_adder', CustomFeaturesAdder()),
    ('label_encoding', LabelEncoderTransformer(categorical_columns)),
    ('drop_columns', DropColumns(columns_to_drop))

]

# Create the pipeline
preprocessing_pipeline = Pipeline(steps=preprocessing_steps)
preprocessing_pipeline.fit_transform(merged_data)
# Save the preprocessing pipeline to a file
joblib.dump(preprocessing_pipeline, 'data/preprocessing_pipeline_imported_classes.joblib')


NameError: name 'BaseEstimator' is not defined

In [13]:
import joblib

# Load the joblib file
loaded_pipeline = joblib.load('data/preprocessing_pipeline.joblib')

# Check if the custom classes are included in the loaded pipeline
print(loaded_pipeline.named_steps)


{'column_name_fixer': ColumnNameFixer(), 'calculate_distance': CalculateDistance(), 'custom_features_adder': CustomFeaturesAdder(), 'label_encoding': LabelEncoderTransformer(columns=['depstn', 'arrstn', 'status', 'arr_country',
                                 'dep_country', 'season', 'airline_code',
                                 'international_flight', 'ac', 'dep_iata',
                                 'arr_iata', 'fltid']), 'drop_columns': DropColumns(columns_to_drop=['id', 'std', 'sta', 'fltid', 'arr_iata',
                             'dep_iata', 'ac'])}


In [77]:
# tabulate library to compare the df's

# Extract the headers of merged_data
headers_merged = merged_data.columns.tolist()

# Initialize an empty list to store rows
headers_list = []
combined_df = pd.DataFrame()

# Iterate over the first ten rows in both DataFrames and add them to the table_data
for index, header_merged in enumerate(headers_merged):
    if header_merged in df_processed.columns:
        # Headers match, compare data
        header_processed = header_merged
        row_processed = df_processed.head(10).iloc[:, df_processed.columns.get_loc(header_merged)]
        row_merged = merged_data.head(10).iloc[:, index]
    else:
        # Headers don't match, create an empty column
        header_processed = f'{header_merged} (dropped)'
        row_processed = pd.Series([np.nan] * 10)
        row_merged = merged_data.head(10).iloc[:, index]

    # Combine values from both DataFrames in a single cell
    combined_row = []
    for val_processed, val_merged in zip(row_processed, row_merged):
        combined_row.append(f'{val_processed} | {val_merged}')
    headers_list.append(header_processed)
    combined_df[header_processed] = combined_row

# Create the table
# Convert the DataFrame to a table and print it
combined_table = tabulate(combined_df, headers='keys', tablefmt='pretty')
print("\nTable Representation of Combined DataFrame:")
print(combined_table)


Table Representation of Combined DataFrame:
+---+------------------+---------------------+-----------------+-----------+-----------+---------------------------+---------------------------+---------+-----------------+---------------+--------------------+-------------+---------------+-------------------------------+-------------------------------+--------------------+-------------+---------------+-------------------------------+-------------------------------+-----------------------+-----------------+-----------------+-----------------+--------------------+---------------------------+----------------------+--------------+-------------+-------+---------+------------+
|   |   id (dropped)   |        datop        | fltid (dropped) |  depstn   |  arrstn   |       std (dropped)       |       sta (dropped)       | status  |  ac (dropped)   |    target     | dep_iata (dropped) | dep_country | dep_elevation |            dep_lat            |            dep_lon            | arr_iata (dropped) | a