In [146]:
import os

import awswrangler as wr
import boto3
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
pd.set_option('display.max_columns', None)

# Honour the standard AWS_PROFILE environment variable so collaborators can run the
# notebook with their own credentials; default to the profile it was written with.
aws_profile = os.environ.get("AWS_PROFILE", "patricio_ferreira_fellow_dssgx_24")
boto3.setup_default_session(profile_name=aws_profile)

# S3 bucket and folder names used by this project.
bucket = "dssgx-munich-2024-bavarian-forest"
raw_data_folder = "raw-data"
preprocessed_data_folder = "preprocessed_data"

def load_csv_files_from_aws_s3(path: str, **kwargs) -> pd.DataFrame:
    """Read one or several CSV objects from AWS S3 via awswrangler.

    Args:
        path (str): S3 location of the CSV file(s), handed to ``wr.s3.read_csv``.
        **kwargs: Extra keyword arguments forwarded unchanged to ``wr.s3.read_csv``.

    Returns:
        pd.DataFrame: Whatever ``wr.s3.read_csv`` returns for these arguments.
    """
    return wr.s3.read_csv(path=path, **kwargs)
# Build the path from the bucket/folder configuration instead of repeating the literals.
df = load_csv_files_from_aws_s3(
    path=f"s3://{bucket}/{preprocessed_data_folder}/joined_sensor_weather_visitorcenter_2016-2024.csv"
)
df.head()


  df: pd.DataFrame = parser_func(f, **pandas_kwargs)


Unnamed: 0,Time,Bayerisch Eisenstein IN,Bayerisch Eisenstein OUT,Brechhäuslau IN,Brechhäuslau OUT,Deffernik IN,Deffernik OUT,Diensthüttenstraße IN,Diensthüttenstraße OUT,Felswandergebiet IN,Felswandergebiet OUT,Ferdinandsthal IN,Ferdinandsthal OUT,Fredenbrücke IN,Fredenbrücke OUT,Gfäll IN,Gfäll OUT,Gsenget IN,Gsenget OUT,Klingenbrunner Wald IN,Klingenbrunner Wald OUT,Klosterfilz IN,Klosterfilz OUT,Racheldiensthütte IN,Racheldiensthütte OUT,Sagwassersäge IN,Sagwassersäge OUT,Scheuereck IN,Scheuereck OUT,Schillerstraße IN,Schillerstraße OUT,Schwarzbachbrücke IN,Schwarzbachbrücke OUT,Falkenstein 2 OUT,Falkenstein 2 IN,Lusen 2 IN,Lusen 2 OUT,Lusen 3 IN,Lusen 3 OUT,Waldhausreibe IN,Waldhausreibe OUT,Waldspielgelände IN,Waldspielgelände OUT,Wistlberg IN,Wistlberg OUT,Bucina MERGED IN,Bucina MERGED OUT,Falkenstein 1 MERGED IN,Falkenstein 1 MERGED OUT,Lusen 1 MERGED IN,Lusen 1 MERGED OUT,Trinkwassertalsperre MERGED IN,Trinkwassertalsperre MERGED OUT,Bayerisch Eisenstein_active,Brechhäuslau_active,Bučina_active,Deffernik_active,Diensthüttenstraße_active,Felswandergebiet_active,Ferdinandsthal_active,Fredenbrücke_active,Gfäll_active,Gsenget_active,Klingenbrunner Wald_active,Klosterfilz_active,Racheldiensthütte_active,Sagwassersäge_active,Scheuereck_active,Schillerstraße_active,Schwarzbachbrücke_active,TFG Falkenstein 1_active,TFG Falkenstein 2_active,TFG Lusen 1_active,TFG Lusen 2_active,TFG Lusen 3_active,Trinkwassertalsperre_active,Waldhausreibe_active,Waldspielgelände_active,Wistlberg_active,working_sensors,traffic_abs,sum_IN_abs,sum_OUT_abs,diff_abs,occupancy_abs,traffic_norm,sum_IN_norm,sum_OUT_norm,diff_norm,occupancy_norm,Temperature (°C),Relative Humidity (%),Precipitation (mm),Wind Speed (km/h),Sunshine Duration 
(min),Tag,Monat,Jahr,Wochentag,Wochenende,Jahreszeit,Laubfärbung,Besuchszahlen_HEH,Besuchszahlen_HZW,Besuchszahlen_WGM,Parkpl_HEH_PKW,Parkpl_HEH_BUS,Parkpl_HZW_PKW,Parkpl_HZW_BUS,Schulferien_Bayern,Schulferien_CZ,Feiertag_Bayern,Feiertag_CZ,HEH_geoeffnet,HZW_geoeffnet,WGM_geoeffnet,Lusenschutzhaus_geoeffnet,Racheldiensthuette_geoeffnet,Waldschmidthaus_geoeffnet,Falkensteinschutzhaus_geoeffnet,Schwellhaeusl_geoeffnet,Temperatur,Niederschlagsmenge,Schneehoehe,GS mit,GS max,Total
0,2016-01-01 00:00:00,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,-1.4,96.0,0.0,0.0,0.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
1,2016-01-01 01:00:00,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,-1.4,97.0,0.0,2.9,0.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
2,2016-01-01 02:00:00,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,-1.3,95.0,0.0,2.5,0.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
3,2016-01-01 03:00:00,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,-1.4,98.0,0.0,2.5,0.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
4,2016-01-01 04:00:00,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,-1.3,100.0,0.0,3.6,0.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,


In [147]:
# Keep rows starting at the first non-null 'Feiertag_Bayern' value; all earlier rows
# have no holiday information.
first_non_null_index = df['Feiertag_Bayern'].first_valid_index()

# .copy() detaches the result from the original frame so later column assignments
# operate on an independent DataFrame (no SettingWithCopyWarning).
df = df.loc[first_non_null_index:].copy()

In [148]:
def add_nearest_holiday_distance(df):
    """
    Add columns with the distance (in days) to the nearest holiday for both
    'Feiertag_Bayern' and 'Feiertag_CZ'.

    The input DataFrame is not modified; a new DataFrame is returned.

    Args:
        df (pd.DataFrame): DataFrame with 'Time', 'Feiertag_Bayern', and 'Feiertag_CZ' columns.
            - 'Time': Datetime-like column with timestamps.
            - 'Feiertag_Bayern': Holiday flag for Bayern. Only values equal to True count
              as holidays; missing values (NaN/None) are treated as "not a holiday".
            - 'Feiertag_CZ': Holiday flag for CZ, handled the same way.

    Returns:
        pd.DataFrame: Copy of ``df`` with 'Time' converted to datetime, a 'Date' column
        (calendar date of 'Time'), and two new columns:
            - 'Distance_to_Nearest_Holiday_Bayern': Absolute distance in days to the
              nearest Bayern holiday (past or future) present in ``df``; NaN if none.
            - 'Distance_to_Nearest_Holiday_CZ': The same for CZ.
        The result carries a fresh RangeIndex because it is produced by ``DataFrame.merge``.
    """
    # Holiday-flag column -> output distance column.
    distance_columns = {
        'Feiertag_Bayern': 'Distance_to_Nearest_Holiday_Bayern',
        'Feiertag_CZ': 'Distance_to_Nearest_Holiday_CZ',
    }

    # Work on a copy so the caller's frame (possibly a slice of another frame) is untouched.
    df = df.copy()
    df['Time'] = pd.to_datetime(df['Time'])
    df['Date'] = df['Time'].dt.date

    # Drop results of a previous call so re-running a cell does not produce
    # '_x'/'_y'-suffixed duplicates in the merge below.
    df = df.drop(columns=list(distance_columns.values()), errors='ignore')

    def get_nearest_holiday_distance(date, holidays):
        """
        Calculate the distance in days to the nearest holiday.

        Args:
            date (pd.Timestamp): The date for which to calculate the distance.
            holidays (pd.DatetimeIndex): Holiday dates.

        Returns:
            Distance in days to the nearest holiday, or NaN if no holidays are provided.
        """
        if len(holidays) == 0:
            return np.nan
        return min(abs((date - holidays).days))

    # One row per calendar date; distances are computed per date, then broadcast to hours.
    dates_df = pd.DataFrame({'Date': df['Date'].unique()})
    date_stamps = pd.to_datetime(dates_df['Date'])

    for flag_col, distance_col in distance_columns.items():
        # `.eq(True)` yields False for NaN/None; plain boolean masking would raise on them.
        is_holiday = df[flag_col].eq(True)
        # Convert the holiday dates once per column instead of once per date.
        holidays = pd.to_datetime(df.loc[is_holiday, 'Date'].unique())
        dates_df[distance_col] = [
            get_nearest_holiday_distance(stamp, holidays) for stamp in date_stamps
        ]

    # Merge the per-date distances back onto every hourly row.
    return df.merge(dates_df, on='Date', how='left')

In [149]:
df = add_nearest_holiday_distance(df=df)  # attach per-date distances to the nearest Bayern / CZ holiday

In [150]:
def add_daily_max_values(df, columns):
    """
    Add, for each column in ``columns``, the maximum value of that column over the
    row's calendar day, repeated on every row (hour) of that day.

    The input DataFrame is not modified; a new DataFrame is returned.

    Args:
        df (pd.DataFrame): DataFrame with a 'Time' column (datetime-like timestamps)
            and every column listed in ``columns``.
        columns (list of str): Column names to compute the daily maximum values for.
            Any iterable of names is accepted.

    Returns:
        pd.DataFrame: Copy of ``df`` with 'Time' converted to datetime, a 'Date' column
        (calendar date of 'Time'), and one 'Daily_Max_<col>' column per entry of
        ``columns``. The result carries a fresh RangeIndex because it is produced by
        ``DataFrame.merge``.
    """
    columns = list(columns)
    max_columns = {col: f'Daily_Max_{col}' for col in columns}

    # Work on a copy so the caller's frame is not mutated.
    df = df.copy()
    df['Time'] = pd.to_datetime(df['Time'])
    df['Date'] = df['Time'].dt.date

    # Drop results of a previous call so re-running a cell does not produce
    # '_x'/'_y'-suffixed duplicates in the merge below.
    df = df.drop(columns=list(max_columns.values()), errors='ignore')

    # One row per date holding each column's maximum (groupby max skips NaN).
    daily_max_df = (
        df.groupby('Date')[columns]
        .max()
        .rename(columns=max_columns)
        .reset_index()
    )

    # Broadcast the per-day maxima back onto every hourly row of that day.
    return df.merge(daily_max_df, on='Date', how='left')

def add_moving_z_scores(df, columns, window_size):
    """
    Add trailing-window z-scores of the daily maximum of each column in ``columns``.

    For each date the z-score is ``(daily_max - rolling_mean) / rolling_std``. The rolling
    statistics cover the last ``window_size`` dates in chronological order, including the
    current one; std is the pandas default sample std (ddof=1). Dates that do not yet have
    ``window_size`` dates in their window get NaN.

    The input DataFrame is not modified; a new DataFrame is returned.

    Args:
        df (pd.DataFrame): DataFrame with a 'Time' column (datetime-like timestamps) and
            one 'Daily_Max_<col>' column per entry of ``columns`` whose value is constant
            within each calendar day.
        columns (list of str): Base column names; 'Daily_Max_<col>' is read for each.
        window_size (int): Number of dates in the rolling window. This equals a number of
            days only if no calendar dates are missing from the data.

    Returns:
        pd.DataFrame: Copy of ``df`` with 'Time' converted to datetime, a 'Date' column
        (calendar date of 'Time'), and one 'ZScore_Daily_Max_<col>' column per entry of
        ``columns``. The result carries a fresh RangeIndex because it is produced by
        ``DataFrame.merge``.
    """
    columns = list(columns)
    max_cols = [f'Daily_Max_{col}' for col in columns]
    z_cols = [f'ZScore_{max_col}' for max_col in max_cols]

    # Work on a copy so the caller's frame is not mutated.
    df = df.copy()
    df['Time'] = pd.to_datetime(df['Time'])
    # Derive 'Date' here instead of relying on a column created by another helper.
    df['Date'] = df['Time'].dt.date

    # Drop results of a previous call so re-running a cell does not produce
    # '_x'/'_y'-suffixed duplicates in the merge below.
    df = df.drop(columns=z_cols, errors='ignore')

    # One row per date in chronological order: rolling() is positional, so unsorted
    # rows would otherwise put unrelated days into the same window.
    daily_df = (
        df[['Date'] + max_cols]
        .drop_duplicates(subset='Date')
        .sort_values('Date')
        .reset_index(drop=True)
    )

    for max_col, z_col in zip(max_cols, z_cols):
        window = daily_df[max_col].rolling(window=window_size, min_periods=window_size)
        daily_df[z_col] = (daily_df[max_col] - window.mean()) / window.std()

    # Broadcast the per-date z-scores back onto every hourly row.
    return df.merge(daily_df[['Date'] + z_cols], on='Date', how='left')



In [151]:
# Weather variables whose daily maxima (and, later, z-scores) are derived.
columns = [
    'Temperature (°C)', 'Relative Humidity (%)', 'Precipitation (mm)',
    'Wind Speed (km/h)', 'Sunshine Duration (min)',
]

df = add_daily_max_values(df=df, columns=columns)



In [152]:
# Rolling window length for the z-scores, counted in dates (≈ days when no dates are missing).
window_size = 120
df = add_moving_z_scores(df=df, columns=columns, window_size=window_size)

In [155]:
# The 'Daily_Max_*' columns were only inputs to the z-scores; remove them from the output.
daily_max_columns = [f'Daily_Max_{col}' for col in columns]
df = df.drop(columns=daily_max_columns)

In [156]:
output_file_name = "holidays_deltaweather_features_df.csv"
# Reuse the bucket/folder configured at the top of the notebook instead of repeating literals.
output_bucket = bucket
output_data_folder = preprocessed_data_folder

def write_csv_file_to_aws_s3(df: pd.DataFrame, path: str, **kwargs) -> None:
    """Write a DataFrame as CSV to AWS S3 via awswrangler.

    Args:
        df (pd.DataFrame): The DataFrame to write.
        path (str): Target S3 path (e.g. ``s3://bucket/folder/file.csv``).
        **kwargs: Additional arguments forwarded to ``wr.s3.to_csv``
            (for example ``index=False`` to omit the DataFrame index).

    Returns:
        None. The value returned by ``wr.s3.to_csv`` is discarded.
    """
    wr.s3.to_csv(df, path=path, **kwargs)

# index=False: the index is only a positional RangeIndex (rebuilt by the merges above).
# Writing it adds an unnamed column that reappears as 'Unnamed: 0' when the CSV is read
# back; the input file loaded at the top already carries such a column.
write_csv_file_to_aws_s3(
    df=df,
    path=f"s3://{output_bucket}/{output_data_folder}/{output_file_name}",
    index=False,
)