Notebook to extract the dataset

In [1]:
import os
import sys
import pandas as pd

# Machine the notebook runs on: 'win', 'curnagl', or any other value for the
# fallback '/Users/...' location.
operating_system = 'curnagl'

# (working directory, project root) per machine. Every project root ends with '/'.
# NOTE(review): for 'win' the working directory is master-project-cleaned while the
# project root is master-project -- confirm that this difference is intended.
_machine_paths = {
    'win': (
        'C:/Users/fabau/OneDrive/Documents/GitHub/master-project-cleaned/',
        'C:/Users/fabau/OneDrive/Documents/GitHub/master-project/',
    ),
    'curnagl': (
        '/work/FAC/FGSE/IDYST/tbeucler/default/fabien/repos/cleaner_version/',
        '/work/FAC/FGSE/IDYST/tbeucler/default/fabien/repos/cleaner_version/',
    ),
}
_default_paths = (
    '/Users/fabienaugsburger/Documents/GitHub/master-project-cleaned/',
    '/Users/fabienaugsburger/Documents/GitHub/master-project-cleaned/',
)
_working_directory, path = _machine_paths.get(operating_system, _default_paths)
os.chdir(_working_directory)

# Make the project's processing helpers importable from this notebook.
custom_library_path = os.path.abspath(f'{path}/util/processing/')
sys.path.append(custom_library_path)

import extraction_squares

# `levels` is the loaded table; `variables` is only the path of the variable list,
# not its content.
levels = pd.read_csv(path + 'data/levels.csv')
variables = path + 'data/variable_list_SL.csv'

In [7]:
# Print the current working directory (the setup cell changes it with os.chdir),
# as a quick check that the expected project folder is active.
cwd = os.getcwd()
print(cwd)

/work/FAC/FGSE/IDYST/tbeucler/default/fabien/repos/cleaner_version


In [2]:
# Creation of the configuration file

# `path` already ends with '/', so no extra separator is inserted here; this
# matches how the other data paths of the notebook are built.
config_file = f'{path}data/config_35_var.txt'
start_year = 1990
end_year = 2021
variable_to_exclude = ['specific_rain_water_content', 'relative_humidity', 'specific_humidity', 'vertical_velocity']
years_to_exclude = [2003, 2012, 2018]
# All values of the 'levels' column of the levels table, as an array.
full_pressure = levels['levels'].values

# `variables` is the path of the variable-list CSV defined in the setup cell.
extraction_squares.generate_config(
    variables,
    config_file,
    start_year,
    end_year,
    variable_to_exclude,
    years_to_exclude,
    full_pressure,
)

In [4]:
from extraction_squares import filter_rows_exclude, filter_rows, read_column_values

def filtering_storms(
    variables_csv,
    timestep,
    path,
    choosen_directory,
    levels,
    filter_type='EU',
    continuous_EU=False,
    continuous_non_EU=True,
    all_details=False
):
    """
    Filter the per-storm statistic CSVs with the steps listed in the EU track files.

    For every variable, every storm (1 to 96), every level and every statistic
    (max, mean, min, std), the source file
    ``{path}data/datasets_{timestep}/<variable>/storm_<n>/<stat>_<n>_<level>.csv``
    is paired with the ``step`` column of
    ``{path}pre_processing/tracks/ALL_TRACKS/tracks_{timestep}_EU/storm_<n>.csv``.
    The filtering helper receives an output path with the same layout under
    ``{path}{choosen_directory}datasets_{timestep}_{filter_type}``. Combinations for
    which one of the two input files is missing are skipped.

    NOTE(review): a later cell of this notebook defines another function with the
    same name; when the cells run top to bottom, that definition replaces this one.

    Args:
        variables_csv (str): Path to a CSV file with a ``variables`` column.
        timestep (str): Timestep label used in the dataset directory names.
        path (str): Base path, concatenated directly with the sub-paths (no separator is added).
        choosen_directory (str): Sub-directory of ``path`` receiving the output, concatenated as is.
        levels (pd.DataFrame): Table with a ``levels`` column.
        filter_type (str): 'EU' passes the steps to ``filter_rows``; 'non_EU' passes them to
            ``filter_rows_exclude``. With any other value no filtering helper is called.
        continuous_EU (bool): Only for 'EU'. If True, the step list is cut at the first step
            that is smaller than the first step. Default False.
        continuous_non_EU (bool): Only for 'non_EU'. If True, the last step is removed
            (a list of at most one step becomes empty). Default True.
        all_details (bool): Print one line per processed or skipped file. Default False.
    """
    level_values = levels['levels'].to_list()

    variable_names = pd.read_csv(variables_csv)['variables']
    storm_ids = [str(number) for number in range(1, 97)]
    stat_names = ("max", "mean", "min", "std")

    source_root = f"{path}data/datasets_{timestep}"
    tracks_root = f"{path}pre_processing/tracks/ALL_TRACKS/tracks_{timestep}_EU"
    target_root = f"{path}{choosen_directory}datasets_{timestep}_{filter_type}"
    os.makedirs(target_root, exist_ok=True)

    print(f"Taking data from {source_root}")
    print(f"With condition based on {tracks_root}")

    for variable in variable_names:
        os.makedirs(os.path.join(target_root, variable), exist_ok=True)
        for storm in storm_ids:
            storm_folder = f"storm_{storm}"
            os.makedirs(os.path.join(target_root, variable, storm_folder), exist_ok=True)
            for level in level_values:
                for stat in stat_names:
                    file_name = f"{stat}_{storm}_{level}.csv"
                    source_csv = os.path.join(source_root, variable, storm_folder, file_name)
                    track_csv = os.path.join(tracks_root, f"{storm_folder}.csv")
                    target_csv = os.path.join(target_root, variable, storm_folder, file_name)

                    os.makedirs(os.path.dirname(target_csv), exist_ok=True)

                    # Nothing to do unless both the data file and the track file exist.
                    if not os.path.exists(source_csv) or not os.path.exists(track_csv):
                        if all_details:
                            print(f"Skipped {variable}, {storm}, level {level}, stat {stat} due to missing files")
                        continue

                    steps = read_column_values(track_csv, 'step')

                    if filter_type == 'EU':
                        if continuous_EU:
                            # Cut the list at the first step lower than the initial one.
                            cut = len(steps)
                            for position, step in enumerate(steps):
                                if step < steps[0]:
                                    cut = position
                                    break
                            steps = steps[:cut]
                        filter_rows(source_csv, target_csv, '', steps)
                    elif filter_type == 'non_EU':
                        if continuous_non_EU:
                            # Drop the last step; with one step or none, nothing is left.
                            steps = steps[:-1] if len(steps) > 1 else []
                        filter_rows_exclude(source_csv, target_csv, '', steps)

                    if all_details:
                        print(f"Filtered rows for {variable}, {storm}, level {level}, stat {stat} written to {target_csv}")
        print(f"Finished filtering {variable}")

In [None]:
def _leading_contiguous_steps(steps):
    """Return the leading run of ``steps`` that ends just before the first jump larger than 1."""
    run = []
    for step in steps:
        if run and step - run[-1] > 1:  # gap: the first block is complete
            break
        run.append(step)
    return run


def _load_filter_steps(track_csv, filter_type, continuous_EU, continuous_non_EU):
    """Read the ``step`` column of one track file as integers and apply the continuity option."""
    steps = [int(value) for value in read_column_values(track_csv, 'step')]
    if filter_type == 'EU' and continuous_EU:
        # Cut the list at the first step that is lower than the initial step.
        # NOTE(review): for strictly increasing steps this never cuts anything, so it
        # does not isolate a "first continuous block" -- confirm the intended rule.
        cut = next((i for i, step in enumerate(steps) if step < steps[0]), len(steps))
        steps = steps[:cut]
    elif filter_type == 'non_EU' and continuous_non_EU:
        steps = _leading_contiguous_steps(steps)
    return steps


def filtering_storms(
    variables_csv,
    timestep,
    path,
    choosen_directory,
    levels,
    filter_type='EU',
    continuous_EU=False,
    continuous_non_EU=True,
    all_details=False
):
    """
    Generalized filtering function for both EU and non-EU storms with options for continuous or non-continuous filtering.

    For every variable, storm (1 to 96), level and statistic (max, mean, min, std), the file
    ``{path}data/datasets_{timestep}/<variable>/storm_<n>/<stat>_<n>_<level>.csv`` is passed to
    the filtering helper together with the steps read from
    ``{path}pre_processing/tracks/ALL_TRACKS/tracks_{timestep}_EU/storm_<n>.csv`` and an output
    path with the same layout under ``{path}{choosen_directory}datasets_{timestep}_{filter_type}``.
    Combinations for which one of the two input files is missing are skipped. Each track file
    is read at most once per call; its steps are reused for all variables, levels and statistics.

    Args:
        variables_csv (str): Path to the variables CSV file (needs a ``variables`` column).
        timestep (str): Timestep for the datasets.
        path (str): Base path for datasets and outputs, concatenated as is (no separator is added).
        choosen_directory (str): Directory to save the output datasets, concatenated as is.
        levels (pd.DataFrame): Dataframe containing levels information (``levels`` column).
        filter_type (str): 'EU' passes the steps to ``filter_rows``; 'non_EU' passes them to
            ``filter_rows_exclude``. With any other value no filtering helper is called.
        continuous_EU (bool): For EU filtering, if True the step list is cut at the first step
            lower than the first step, otherwise all steps are used. Default False.
        continuous_non_EU (bool): For non-EU filtering, if True only the leading run of steps
            up to the first gap larger than 1 is used (the whole list when there is no gap),
            otherwise all steps are used. Default True.
        all_details (bool): Print detailed logs. Default False.
    """
    levels = levels['levels'].to_list()

    # Read variables
    variables = pd.read_csv(variables_csv)['variables']
    storms = [f"{i}" for i in range(1, 97)]
    stats = ["max", "mean", "min", "std"]

    base_dir_csv1 = f"{path}data/datasets_{timestep}"
    base_dir_csv2 = f"{path}pre_processing/tracks/ALL_TRACKS/tracks_{timestep}_EU"

    output_base_dir = f"{path}{choosen_directory}datasets_{timestep}_{filter_type}"
    os.makedirs(output_base_dir, exist_ok=True)

    print(f"Taking data from {base_dir_csv1}")
    print(f"With condition based on {base_dir_csv2}")

    # The steps depend only on the storm (and on the fixed options), not on the
    # variable, level or statistic, so they are loaded lazily and kept per storm.
    steps_by_storm = {}

    for variable in variables:
        os.makedirs(os.path.join(output_base_dir, variable), exist_ok=True)
        for storm in storms:
            storm_output_dir = os.path.join(output_base_dir, variable, f"storm_{storm}")
            os.makedirs(storm_output_dir, exist_ok=True)
            csv_file2 = os.path.join(base_dir_csv2, f"storm_{storm}.csv")
            for level in levels:
                for stat in stats:
                    file_name = f"{stat}_{storm}_{level}.csv"
                    csv_file1 = os.path.join(base_dir_csv1, variable, f"storm_{storm}", file_name)
                    output_file = os.path.join(storm_output_dir, file_name)

                    if not (os.path.exists(csv_file1) and os.path.exists(csv_file2)):
                        if all_details:
                            print(f"Skipped {variable}, {storm}, level {level}, stat {stat} due to missing files")
                        continue

                    if storm not in steps_by_storm:
                        steps_by_storm[storm] = _load_filter_steps(
                            csv_file2, filter_type, continuous_EU, continuous_non_EU
                        )
                    # Hand out a copy so the cached list stays intact whatever the helper does.
                    filter_values = list(steps_by_storm[storm])

                    # Apply filtering logic
                    if filter_type == 'EU':
                        filter_rows(csv_file1, output_file, '', filter_values)
                    elif filter_type == 'non_EU':
                        filter_rows_exclude(csv_file1, output_file, '', filter_values)

                    if all_details:
                        print(f"Filtered rows for {variable}, {storm}, level {level}, stat {stat} written to {output_file}")
        print(f"Finished filtering {variable}")

In [22]:
# Trial run of filtering_storms: non-EU filtering of the 1h datasets, written under 'test/'.
filtering_storms(
    variables_csv=variables,
    timestep='1h',
    path=path,
    choosen_directory='test/',
    levels=levels,
    filter_type='non_EU',
    continuous_EU=False,
    continuous_non_EU=True,
    all_details=False,
)

Taking data from /work/FAC/FGSE/IDYST/tbeucler/default/fabien/repos/cleaner_version/data/datasets_1h
With condition based on /work/FAC/FGSE/IDYST/tbeucler/default/fabien/repos/cleaner_version/pre_processing/tracks/ALL_TRACKS/tracks_1h_EU
Finished filtering 10m_u_component_of_wind
Finished filtering 10m_v_component_of_wind
Finished filtering 2m_dewpoint_temperature
Finished filtering 2m_temperature
Finished filtering convective_available_potential_energy
Finished filtering convective_precipitation
Finished filtering convective_rain_rate
Finished filtering convective_snowfall
Finished filtering geopotential
Finished filtering high_cloud_cover
Finished filtering instantaneous_10m_wind_gust
Finished filtering k_index
Finished filtering large_scale_precipitation
Finished filtering large_scale_snowfall
Finished filtering mean_large_scale_precipitation_rate
Finished filtering mean_top_net_long_wave_radiation_flux
Finished filtering mean_top_net_short_wave_radiation_flux
Finished filtering mea

In [2]:
# Build the EU dataset for the 1-hour timestep (output directory: 'data/').

extraction_squares.filtering_EU_storms(
    f'{path}data/variable_list_SL.csv',
    '1h',
    path,
    'data/',
    levels,
)

Taking data from /work/FAC/FGSE/IDYST/tbeucler/default/fabien/repos/cleaner_version/data/datasets_1h
With condition based on /work/FAC/FGSE/IDYST/tbeucler/default/fabien/repos/cleaner_version/pre_processing/tracks/ALL_TRACKS/tracks_1h_EU
Finished filtering 10m_u_component_of_wind
Finished filtering 10m_v_component_of_wind
Finished filtering 2m_dewpoint_temperature
Finished filtering 2m_temperature
Finished filtering convective_available_potential_energy
Finished filtering convective_precipitation
Finished filtering convective_rain_rate
Finished filtering convective_snowfall
Finished filtering geopotential
Finished filtering high_cloud_cover
Finished filtering instantaneous_10m_wind_gust
Finished filtering k_index
Finished filtering large_scale_precipitation
Finished filtering large_scale_snowfall
Finished filtering mean_large_scale_precipitation_rate
Finished filtering mean_top_net_long_wave_radiation_flux
Finished filtering mean_top_net_short_wave_radiation_flux
Finished filtering mea

In [3]:
# Build the non-EU dataset for the 1-hour timestep (output directory: 'data/').

extraction_squares.filtering_non_EU_storms(
    f'{path}data/variable_list_SL.csv',
    '1h',
    path,
    'data/',
    levels,
)

Taking data from /work/FAC/FGSE/IDYST/tbeucler/default/fabien/repos/cleaner_version/data/datasets_1h
With condition based on /work/FAC/FGSE/IDYST/tbeucler/default/fabien/repos/cleaner_version/pre_processing/tracks/ALL_TRACKS/tracks_1h_EU
Finished filtering 10m_u_component_of_wind
Finished filtering 10m_v_component_of_wind
Finished filtering 2m_dewpoint_temperature
Finished filtering 2m_temperature
Finished filtering convective_available_potential_energy
Finished filtering convective_precipitation
Finished filtering convective_rain_rate
Finished filtering convective_snowfall
Finished filtering geopotential
Finished filtering high_cloud_cover
Finished filtering instantaneous_10m_wind_gust
Finished filtering k_index
Finished filtering large_scale_precipitation
Finished filtering large_scale_snowfall
Finished filtering mean_large_scale_precipitation_rate
Finished filtering mean_top_net_long_wave_radiation_flux
Finished filtering mean_top_net_short_wave_radiation_flux
Finished filtering mea