# Flugpreis Vorhersage - Kaufen oder Warten?
## Projektarbeit Data Mining
___
### Wintersemester 2021/22
### Gruppe G:
Max Grundmann - s0559326
### Inhalte
1. Exploratory Data Analysis (EDA)
### 2. Datenvorbereitung
3. Modelauswahl
4. Testing
___
Dieses Notebook implementiert die nötigen Schritte zur Transformation der Daten, Feature Engineering und -generierung und wendet sie auf die Trainings- und Testdaten an, um sie für das Training vorzubereiten.
___

Benötigte Bibliotheken importieren

In [89]:
import pandas as pd
import numpy as np
import os
from numpy import asarray
from datetime import datetime, timedelta
import logging
import tqdm

from sklearn.impute import SimpleImputer
from sklearn.preprocessing import MinMaxScaler
import joblib

Dateipfade festlegen 

In [90]:
logging.basicConfig(level=logging.INFO, format='%(levelname)s -  %(asctime)s: %(message)s')

dirname = os.getcwd()

In [91]:
def get_price_history(df: pd.DataFrame) -> pd.DataFrame:
    """Generiert die Preishistorie für alle Flüge in einem Dataframe und verkettet sie in einem Feld.

    Args:
        df (pd.DataFrame): Input Dataframe

    Returns:
        pd.DataFrame: Output Dataframe mit Preishistorie.
    """
    temp = df.copy()
    temp = temp.sort_values(by='Request_Date', axis=0, ascending=True)
    temp['Price_In_Eur'] = temp['Price_In_Eur'].astype(str)

    price_history = temp.groupby(['flight_unique_id'])['Price_In_Eur'].apply(','.join).reset_index()
    return price_history

In [92]:
def merge_price_history(df: pd.DataFrame) -> pd.DataFrame:
    """Führt die Daten mit der generierten Preishistorie zusammen.

    Args:
        df (pd.DataFrame): Input Dataframe

    Returns:
        pd.DataFrame: Output Dataframe mit Preishistorie
    """
    prices = get_price_history(df)
    temp = df.copy()
    
    split_to_columns = prices
    split_to_columns['flight_unique_id'] = prices['flight_unique_id']

    return pd.merge(left=temp, right=split_to_columns, how='left', on='flight_unique_id')

In [93]:
def get_previous_requests(df: pd.DataFrame) -> pd.DataFrame:
    """Findet die Anzahl vorheriger Anfragen für eine gegebene FlightID

    Args:
        df (pd.DataFrame): Input Dataframe

    Returns:
        pd.DataFrame: Output Dataframe mit Spalte Anzahl vorheriger Anfragen
    """
    temp = df.copy()
    temp = temp.sort_values(['flight_unique_id', 'Request_Date'])
    unique_flights = temp['flight_unique_id'].unique()

    requests_counter = 0
    flight_id_index = 0
    current_flight = unique_flights[flight_id_index]
    number_of_requests_per_row = []

    for index, row in  temp.iterrows():
        if row['flight_unique_id'] != current_flight:       
            flight_id_index += 1
            current_flight = unique_flights[flight_id_index]
            requests_counter = 0
        number_of_requests_per_row.append(requests_counter)
        requests_counter += 1

    temp['previous_requests'] = number_of_requests_per_row
    return temp

In [94]:
def get_price_of_closest_flight(df: pd.DataFrame, new_name:str, fill_with_column: str, previous: bool =False) -> pd.DataFrame:
    """Liefert den Preis des jeweils dichtesten Fluges der selben Strecke zurück.

    Args:
        df (pd.DataFrame): Input Dataframe
        new_name (str): Name der neuen Spalte
        fill_with_column (str): Name der Spalte die den relevanten Wert enthält
        previous (bool, optional): Richtung der Berechnung. Defaults to False.

    Returns:
        pd.DataFrame: Output Dataframe
    """
    temp = df.copy()

    temp['Flight_Date_Time'] = temp.apply(lambda x: x['Flight_Date'] + timedelta(hours=x['Departure_hour']), axis=1)
    temp = temp.sort_values(['route_abb', 'Flight_Date_Time'], axis=0)

    routen = temp['route_abb'].unique()
    df_list = []

    temp[new_name] = 0.0

    for route in routen:
        subset = temp[temp['route_abb'] == route].reset_index(drop=True)

        for index, row in subset.iterrows():
            f1 = subset[subset['flight_unique_id'] != row['flight_unique_id']] 

            if previous:
                f2 = f1[f1['Flight_Date_Time'] <= row['Flight_Date_Time']]
                f3 = f2[f2['Request_Date'] >= row['Request_Date']]
            else:
                f2 = f1[f1['Flight_Date_Time'] >= row['Flight_Date_Time']]
                f3 = f2[f2['Request_Date'] >= row['Request_Date']]
                
            f4 = f3.sort_values(['Flight_Date_Time', 'Request_Date'])

            if index >= len(f4):
                if len(f4) == 0:
                    subset.at[index, new_name] = 0
                else:
                    subset.at[index, new_name] = f4[fill_with_column].iloc[0] 
            else:    
                subset.at[index, new_name] = f4[fill_with_column].iloc[0] 
        
        df_list.append(subset)
    return pd.concat(df_list).reset_index(drop=True).drop(['Flight_Date_Time'], axis=1)

In [95]:
def calculate_delta_till_next_flight(df: pd.DataFrame, forward: bool=True) -> pd.DataFrame:
    """Diese Funktion berechnet die Zeit bis zum nächsten Flug der selben FlightID.

    Args:
        df (pd.DataFrame): Input Dataframe
        forward (bool, optional): Bestimmt die Richtung der Berechnung. Defaults to True.

    Returns:
        pd.DataFrame: _description_
    """
    temp = df.copy()
    org = df.copy()

    temp['Departure_Time'] = temp.apply(lambda x: x['Flight_Date'] + timedelta(hours=x['Departure_hour']), axis=1)
    org['Flight_Date_Time'] = org.apply(lambda x: x['Flight_Date'] + timedelta(hours=x['Departure_hour']), axis=1)
    
    temp = temp.sort_values(by=['route_abb', 'Departure_Time'], axis=0, ascending=forward)
    routen = temp['route_abb'].unique()

    sorted_df = temp
    df_list = []

    for route in routen:
        temp = sorted_df[sorted_df['route_abb'] == route]
        temp = pd.DataFrame(temp[['Departure_Time', 'route_abb']])
        temp = temp.drop_duplicates()
        temp.rename(columns={ 'Departure_Time': 0}, inplace=True)

        if forward:
            temp['delta'] = temp[0] - temp[0].shift()
            column_name = 'last_departure'
        else:
            temp['delta'] = temp[0].shift() - temp[0]
            column_name = 'next_departure'

        temp['delta'] = temp['delta'].dt.seconds / 60 / 60
        temp['delta'] = temp['delta'].fillna(0)
        temp.rename(columns={ 0: 'Flight_Date_Time', 'delta' :column_name,}, inplace=True)
        
        df_list.append(temp)

    mapping = pd.concat(df_list)

    mapping = mapping.dropna()

    org = pd.merge(left=org, right=mapping, how='inner', on=['Flight_Date_Time', 'route_abb'])

    return org.drop(columns='Flight_Date_Time', axis=1).reset_index(drop=True)

In [96]:
def get_last_n_prices(df: pd.DataFrame, n:int =10) -> pd.DataFrame:
    """Liefert die Preishistorie für eine gegebene Anzahl an historischen Anfragen.

    Args:
        df (pd.DataFrame): Input Dataframe
        n (int, optional): Anzahl an historischen Preisen. Defaults to 10.

    Returns:
        (pd.DataFrame): Output Dataframe mit Preishistorie
    """
    last_n_requests = n
    Prices = []
    temp = df.copy()

    for row in temp.itertuples():
        row_prices = []
        for i in range(row.previous_requests):
            if len(row_prices) >= last_n_requests:
                row_prices.pop(0)    
            row_prices.append(getattr(row, 'Price_In_Eur_y').split(',')[i])
        Prices.append(','.join(row_prices))

    temp['Prices_cut'] = Prices
    split_to_columns = temp['Prices_cut'].str.split(',', expand=True)
    split_to_columns = split_to_columns.apply(pd.to_numeric)
    split_to_columns.columns = split_to_columns.columns.map(str)

    return pd.concat([temp.drop(['Prices_cut', 'Price_In_Eur_y'], 1), split_to_columns], axis=1).rename(columns={'Price_In_Eur_x' : 'Price_In_Eur'})

In [97]:
def prep_data(data: pd.DataFrame, last_n_requests: int, test: bool = False) -> pd.DataFrame:
    """Wendet die benötigten Transformationen auf die Rohdaten an und gibt ein fertiges DataFrame für das Training zurück.

    Args:
        data (pd.DataFrame): Input Dataframe
        last_n_requests (int): Anzahl Schritte die die Preishistorie zurückgeht
        test (bool): Test- oder Trainingsset
    Returns:
        pd.DataFrame: Output Dataframe
    """
    
    logging.info('Pipeline gestartet.')
    
    # Zum sortieren des Dataframes
    data.insert(0, 'row_id', range(0, len(data)))

    # Datentypen ändern
    data['Flight_Date'] = pd.to_datetime(data['Flight_Date'])
    data['Request_Date'] = pd.to_datetime(data['Request_Date']).dt.tz_localize(None)
    
    # Abstand zu letztem und nächsten Flug berechnen
    forward = calculate_delta_till_next_flight(data, forward=True).reset_index(drop=True)
    backward = calculate_delta_till_next_flight(forward, forward=False)
    data = backward

    # Preis des dichtesten letzten und nächsten Fluges berechnen
    data = get_price_of_closest_flight(data, 'price_of_next_flight', 'Price_In_Eur')
    data = get_price_of_closest_flight(data, 'price_of_previous_flight', 'Price_In_Eur', previous=True)

    # One Hot Encoding für Routen-Bezeichnungen
    data = pd.get_dummies(data,prefix=['route'], columns = ['route_abb'], drop_first=False)
    
    # Flag, wenn die Anfrage die letzte Anfrage vor dem Flug ist
    is_last_request = pd.DataFrame(data.groupby('flight_unique_id')['Request_Date'].max()).reset_index()
    is_last_request['is_last_request'] = 1

    data = data.merge(is_last_request, 
                      on=['flight_unique_id', 'Request_Date'], 
                      how='left')
    data['is_last_request'] = data['is_last_request'].fillna(0)
    data['is_last_request'] = data['is_last_request'].astype(int)
    
    # Anzahl der bisherigen Requests als Feature hinzufügen
    data = get_previous_requests(data)
    
    # Preishistorie berechnen
    data = merge_price_history(data)
    data = get_last_n_prices(data, last_n_requests)

    # Preisänderung zur letzten Abfrage in %
    data['PriceChange'] = data.apply(
        lambda row:( (row['Price_In_Eur'] / row[str(last_n_requests-1)] )-1 if (row['previous_requests'] >= last_n_requests or row['previous_requests'] == 0) else (row['Price_In_Eur'] / row[ str(row['previous_requests'] -1) ]) - 1), axis=1)

    # Datumsfelder in einzelne Bestandteile zerlegen
    data['flight_weekday'] = data['Flight_Date'].dt.weekday
    data['flight_day'] = data['Flight_Date'].dt.day
    data['flight_month'] = data['Flight_Date'].dt.month 
    data['flight_is_weekend'] = data['flight_weekday'] >= 5

    data['request_weekday'] = data['Request_Date'].dt.weekday
    data['request_day'] = data['Request_Date'].dt.day
    data['request_month'] = data['Request_Date'].dt.month
    data['request_is_weekend'] = data['request_weekday'] >= 5
    
    data['request_hour'] = data['Request_Date'].dt.hour
    
    # Cyclische Features in Sinus und Cosinus Repräsentation umwandeln
    # Quelle: https://www.mikulskibartosz.name/time-in-machine-learning/
    def encode(data, col, max_val):
        data[col + '_sin'] = np.sin(2 * np.pi * data[col]/max_val)
        data[col + '_cos'] = np.cos(2 * np.pi * data[col]/max_val)
        return data

    data = encode(data, 'request_weekday', 7)
    data = encode(data, 'request_month', 12)
    data = encode(data, 'request_day', 365)
    data = encode(data, 'request_hour', 24)

    data = encode(data, 'flight_weekday', 7)
    data = encode(data, 'flight_month', 12)
    data = encode(data, 'flight_day', 365)
    data = encode(data, 'Departure_hour', 24)
    
    # Tage bis zum Flug berechnen
    data['Request_Date_w/o_Time'] = pd.to_datetime(data['Request_Date']).dt.date
    data['days_remaining'] = (pd.to_datetime(data['Flight_Date']).dt.date - data['Request_Date_w/o_Time']).dt.days
    data.drop(['Request_Date_w/o_Time'],1, inplace=True)
    
    # Relevante Feiertage im Zeitraum der Daten, die in Berlin und oder Frankfurt gelten
    # sowie Public Holidays in Großbritannien. 
    feiertage = {
        '2019-06-09':'Pfingstsonntag',
        '2019-06-10':'Pfingstmontag',
        '2019-06-20':'Fronleichnam',
        '2019-06-20':'Schulferien Beginn',
        '2019-08-02':'Schulferien Ende',
        '2019-08-26':'Summer Bank Holidays',
        '2019-07-15':'School Summer Holidays Beginn',
        '2019-09-06':'School Summer Holidays End'}

    feiertage_df = pd.DataFrame(feiertage.items(), columns=['Datum_Feiertag', 'Feiertag_Bezeichnung'])
    feiertage_df['Datum_Feiertag'] = pd.to_datetime(feiertage_df['Datum_Feiertag'])
    
    from datetime import datetime

    day_diff_list = []
    for index, row in feiertage_df.iterrows():
        day_diff_list.append(abs((data['Flight_Date'] - row['Datum_Feiertag']).dt.days))
        
    feiertage_diff_df = pd.concat(day_diff_list, axis=1)
    feiertage_diff_df = feiertage_diff_df.min(axis=1)
    feiertage_diff_df = feiertage_diff_df.reset_index().drop('index', 1)
    feiertage_diff_df.columns = ['Days_Untill_Event']

    data = pd.concat([data, feiertage_diff_df], axis=1)

    try:
        data.drop(['index'], axis=1, inplace=True)
    except:
        pass

    # Features skalieren
    numerics = ['int16', 'int32', 'int64', 'float16', 'float32', 'float64']
    dont_scale = ['is_last_request', 'row_id']
    if not test: 
        dont_scale.append('buy')
        dont_scale.append('min_future_price_in_Eur')
    to_be_scaled = data.select_dtypes(include=numerics).drop(dont_scale, axis=1)
    data.drop(to_be_scaled.columns, axis=1, inplace=True)
    
    if not test:
        scaler = MinMaxScaler()
        scaled = pd.DataFrame(scaler.fit_transform(to_be_scaled), columns=to_be_scaled.columns)
        joblib.dump(scaler, '../Models/Scalers/minmax.gz')
    else:
        scaler = joblib.load('../Models/Scalers/minmax.gz')
        scaled = pd.DataFrame(scaler.transform(to_be_scaled), columns=to_be_scaled.columns)

    data = pd.concat([data.reset_index(), scaled.reset_index()], axis=1)
    
    # Dataframe wieder in ursprüngliche Reihenfolge bringen
    data = data.sort_values('row_id')

    # Nicht mehr benötigte Spalten entfernen
    drop_list = ['Request_Date', 
               'Flight_Date',  
            #    'index', 
               'Departure_hour', 
               'flight_weekday', 
               'flight_day', 
               'row_id',
               'flight_month', 
               'request_weekday', 
               'request_day', 
               'request_month', 
            #    'days_remaining',
               'request_hour', 
            #    'Days_Untill_Event',
            #    'previous_requests',
               'flight_unique_id']
    if not test: drop_list.append('min_future_price_in_Eur')
    data.drop(drop_list, inplace=True, axis=1)
    
    # Boolean in Int umwandeln
    data['request_is_weekend'] = data['request_is_weekend'].astype(int)
    data['flight_is_weekend'] = data['flight_is_weekend'].astype(int)
    
    try:
        data.drop(['index'], axis=1, inplace=True)
        data.drop(['index'], axis=1, inplace=True)
    except:
        pass

    logging.info('Pipeline beendet.')
    return data.reset_index(drop=True)

Pipeline starten

In [98]:
last_n_requests = 15

filename_train = os.path.join(dirname, '../Data/raw/train_set.csv')
raw_data_train = pd.read_csv(filename_train, index_col=0)

data_train = prep_data(raw_data_train, last_n_requests)

INFO -  2022-03-20 11:11:31,987: Pipeline gestartet.
  return pd.concat([temp.drop(['Prices_cut', 'Price_In_Eur_y'], 1), split_to_columns], axis=1).rename(columns={'Price_In_Eur_x' : 'Price_In_Eur'})
  data.drop(['Request_Date_w/o_Time'],1, inplace=True)
  feiertage_diff_df = feiertage_diff_df.reset_index().drop('index', 1)
INFO -  2022-03-20 11:31:46,537: Pipeline beendet.


In [99]:
filename_test = os.path.join(dirname, '../Data/raw/test_set.csv')
raw_data_test = pd.read_csv(filename_test, index_col=0)

data_test = prep_data(raw_data_test, last_n_requests, test=True)

INFO -  2022-03-20 11:31:46,712: Pipeline gestartet.
  return pd.concat([temp.drop(['Prices_cut', 'Price_In_Eur_y'], 1), split_to_columns], axis=1).rename(columns={'Price_In_Eur_x' : 'Price_In_Eur'})
  data.drop(['Request_Date_w/o_Time'],1, inplace=True)
  feiertage_diff_df = feiertage_diff_df.reset_index().drop('index', 1)
INFO -  2022-03-20 11:32:19,691: Pipeline beendet.


Bearbeitete Daten speichern

In [100]:
filename = os.path.join(dirname, f'../Data/prepped/train_set_n{last_n_requests}.csv')

if input(f'{filename} speichern? y/n') == "y":
    data_train.to_csv(filename)
    logging.info('Datei gespeichernt.')
else:
    logging.info('Nicht gespeichert.')

INFO -  2022-03-20 11:44:39,253: Datei gespeichernt.


In [101]:
filename = os.path.join(dirname, f'../Data/prepped/test_set_n{last_n_requests}.csv')

if input(f'{filename} speichern? y/n') == "y":
    data_test.to_csv(filename)
    logging.info('Datei gespeichernt.')
else:
    logging.info('Nicht gespeichert.')

INFO -  2022-03-20 11:44:40,589: Datei gespeichernt.
