In [None]:
!pip install pyarrow

In [45]:
import pandas as pd
import numpy as np
import os

def load_csv_files(csv_directory="..\..\PROMICE-AWS-toolbox\out\L4"):
    
    # List all CSV files in the directory
    csv_files = [f for f in os.listdir(csv_directory) if f.endswith('.csv')]

    # Combine all CSV files into a single DataFrame
    dfs = []
    for f in csv_files:
        df = pd.read_csv(os.path.join(csv_directory, f), index_col=False)
        df.insert(0, 'stid', f[:-7])
        dfs.append(df)
    df = pd.concat(dfs)

    #read metadata from promice repository
    station = pd.read_csv('..\..\PROMICE-AWS-toolbox\metadata\AWS_station_locations.csv', index_col=False)
    station.to_csv('..\\data\\new_promice\\AWS_station_locations.csv', index=False)

    output_file = "..\\Data\\new_promice\\all_promice_data.parquet.gzip"
    df.to_parquet(output_file, compression='gzip', engine='pyarrow')

    #display(output station and columns summary)
    print('Stations loaded:')
    display(df['stid'].unique())
    print('columns in dataset:')
    print(list(df.columns))
    print("FINISHED LOADING CSV's")
    return df


def process_hourly_data(dataframe, directory="../data/new_promice/all_promice_data.parquet.gzip", add_meta_data=False):

    if not isinstance(dataframe, pd.DataFrame):
        df_hourly = pd.read_parquet(directory)
    else:
        df_hourly = dataframe.copy()

    # Add year column to dataframe
    df_hourly["Datetime"] = pd.to_datetime(df_hourly.time)

    #Rename Index Column to Datetime
    df_hourly = df_hourly.reset_index(inplace=False)

    if add_meta_data==True:
        add_nice_to_haves(df_hourly, 'hourly')

    #display(df_hourly.head(10))

    # Convert the DataFrame to a compressed Parquet file
    output_file = "../data/new_promice/all_promice_data_hourly.parquet.gzip"
    df_hourly.to_parquet(output_file, compression='gzip', engine='pyarrow')

    print("FINISHED PROCESSING HOURLY DATA")
    return df_hourly

def process_daily_data(dataframe=None, directory="", add_meta_data=False):
    
    if not isinstance(dataframe, pd.DataFrame):
        dataframe = pd.read_parquet(directory)
    else:
        df = dataframe.copy()

    # Define the date column that you want to group by (replace "date_column" with the name of your column)
    min_values_per_day = 20

    # Group the data by weather station and date
    grouped = df.groupby(['stid','date'])

    # Specify columns containing numerical values to be averaged
    columns_to_average = (df
                .select_dtypes(exclude=['object'])
                .drop(columns=['index', 'Datetime', 'DayOfYear', 'DayOfCentury'])
                .columns
    )
    # Calculate the number of non-NaN values for each variable within each group
    valid_date_observations = grouped[columns_to_average].apply(lambda x: x.notna().sum() <= min_values_per_day)

    # Calculate average per day per station
    df_filtered = grouped[columns_to_average].mean()

    #Remove means with less than 20 observations per day
    df_masked = df_filtered.mask(valid_date_observations, np.nan)

    df_daily = df_masked.reset_index().copy()

    df_daily["Datetime"] = pd.to_datetime(df_daily['date'])

    if add_meta_data==True:
        df_daily = add_nice_to_haves(df_daily, 'daily')

    # Convert the DataFrame to a compressed Parquet file
    output_file = "..\\Data\\new_promice\\all_promice_data_daily.parquet.gzip"
    df_daily.to_parquet(output_file, compression='gzip', engine='pyarrow')

    print("FINISHED PROCESSING DAILY DATA")
    # display(df_daily)
    return df_daily

def process_monthly_data(dataframe=None, directory="", add_meta_data=False):

    if not isinstance(dataframe, pd.DataFrame):
        df = pd.read_parquet(directory)
    else:
        df = dataframe.copy()

    df['month_year'] = df['Datetime'].dt.to_period('M')
    # Define the date column that you want to group by (replace "date_column" with the name of your column)
    min_values_per_month = 24

    # Create a new column with the month and year of the date column

    # Group the data by weather station and date
    grouped = df.groupby(['stid','month_year'])

    # Specify columns containing numerical values to be averaged
    columns_to_average = (df
                .select_dtypes(exclude=['object'])
                .drop(columns=['Datetime', 'DayOfYear', 'DayOfCentury'])
                .columns
    )

    # Calculate the number of non-NaN values for each variable within each group
    valid_date_observations = grouped[columns_to_average].apply(lambda x: x.notna().sum() <= min_values_per_month)

    # Calculate average per day per station
    df_filtered = grouped[columns_to_average].mean()

    #Remove means with less than 20 observations per day
    df_masked = df_filtered.mask(valid_date_observations, np.nan)

    df_monthly = df_masked.reset_index().copy()

    df_monthly["Datetime"] = pd.to_datetime(df_monthly['month_year'].astype(str) + '-01')

    if add_meta_data==True:
        df_monthly = add_nice_to_haves(df_monthly, 'monthly')

    #display(df_monthly)

    # Convert the DataFrame to a compressed Parquet file
    output_file = "..\\Data\\new_promice\\all_promice_data_monthly.parquet.gzip"
    df_monthly.to_parquet(output_file, compression='gzip', engine='pyarrow')

    print('FINISHED PROCESSING MONTHLY DATA')
    return df_monthly

###########################################################################################
### Helper functions

def add_station_info(df):
    station = pd.read_csv('../data/new_promice/AWS_station_locations.csv')
    station.rename(columns={'timestamp':'station_location_timestamp'})
    df.merge(station, how='left',on=['stid','stid'])
    return df

# helper functions for adding metadata like season, day, month, year, date, DayOfYear and DayOfCentury
def add_nice_to_haves(df, period):
    def add_season(df):
        seasons = {
            1: "Winter",
            2: "Winter",
            3: "Spring",
            4: "Spring",
            5: "Spring",
            6: "Summer",
            7: "Summer",
            8: "Summer",
            9: "Autumn",
            10: "Autumn",
            11: "Autumn",
            12: "Winter",
        }

        # Extract the month from the index and use the dictionary to map it to the corresponding season
        df["season"] = df['Datetime'].dt.month.map(seasons)
        return df

    def add_common(df):
        df["year"] = df['Datetime'].dt.strftime("%Y")
        df["month"] = df['Datetime'].dt.strftime("%B")
        df["date"] = df['Datetime'].dt.date
        df['DayOfYear'] = df['Datetime'].dt.dayofyear 
        df['DayOfCentury'] = df['Datetime'].dt.dayofyear+365*(df['Datetime'].dt.year-1)
        df = add_season(df)
        return df
        
    def add_day(df):
        # Add day column to dataframe
        df["day"] = df['Datetime'].dt.strftime("%d")
        return df

    def add_hour(df):
        # Add hour column to dataframe
        df["hour"] = df['Datetime'].dt.strftime("%h")
        return df

    if period == 'hourly':
        df = add_hour(
            add_day(
            add_common(df)))

    elif period == 'daily':
        df = add_day(
            add_common(df))
    elif period == 'monthly':
        df = add_common(df)
    
    return df 

#### Unused code
    # Process data
    # echo -ne 'Processing dairly data...\r'
    # python scripts/process_data_daily.py
    # echo -ne 'Processing hourly data...\r'
    # python scripts/process_data_hourly.py

    # Delete unprocessed data
    # rm -r data_daily data_hourly

    # %% process data hourly
    # Reading station metadata

    # Delete irrelevant columns from dataframe (i.e. null columns and flag columns)
    # null_columns = df_hourly.columns[df_hourly.isnull().all()]
    # flag_columns = df_hourly.filter(regex="flag$").columns
    # print(null_columns)
    # print(flag_columns)

    #df_hourly = df_hourly.drop(
    #    columns=[
    #           'gps_geounit', 
    #           'batt_v_ini', 
    #           'freq_vw',
    #    ]
    #)


In [46]:
# code to ensure path points to current directory to load local functions 
#import sys
#sys.path.insert(0, ".")
"""
These functions process the dataframe in question.
- Functions can be initiated with a dataframe from the previous step, or with a parquet file using directory='<path>'
- I have included both versions of the inititation functions for ease of use, choose on of the grayed out versions of each step
- Hourly, daily and monthly files can have added 'meta data' such as year, season, month, day, DayOfYear and DayOfCentury
"""
df = load_csv_files()

df_hourly = process_hourly_data(dataframe=df, add_metadata=True)
# df_hourly = process_hourly_data(directory="../data/new_promice/all_promice_data.parquet.gzip")

df_daily = process_daily_data(dataframe=df_hourly, add_meta_data=True)
# df_daily = process_daily_data(directory="../data/new_promice/all_promice_data_hourly.parquet.gzip", add_meta_data=True)

df_monthly = process_monthly_data(dataframe=df_daily, add_meta_data=True)
#df_monthly = process_monthly_data(directory='..\\Data\\new_promice\\all_promice_data_daily.parquet.gzip', add_meta_data=True)

### helper function to add station information such as # of booms, location and classification

# df_hourly = add_station_info(df_hourly)
# df_daily = add_station_info(df_daily)
# df_monthly = add_station_info(df_monthly)

array(['CEN1', 'CEN2', 'CP1', 'DY2', 'EGP', 'HUM', 'JAR', 'JAR_O',
       'KAN_B', 'KAN_L', 'KAN_M', 'KAN_U', 'KPC_Lv3', 'KPC_L', 'KPC_Uv3',
       'KPC_U', 'LYN_L', 'LYN_T', 'MIT', 'NAE', 'NAU', 'NEM', 'NSE',
       'NUK_K', 'NUK_L', 'NUK_N', 'NUK_Uv3', 'NUK_U', 'QAS_A', 'QAS_Lv3',
       'QAS_L', 'QAS_M', 'QAS_Uv3', 'QAS_U', 'Roof_GEUS', 'Roof_PROMICE',
       'SCO_L', 'SCO_U', 'SDL', 'SDM', 'SWC', 'SWC_O', 'TAS_A', 'TAS_L',
       'TAS_U', 'THU_L2', 'THU_L', 'THU_U2', 'THU_U', 'TUN', 'UPE_L',
       'UPE_U', 'UWN', 'WEG_B', 'ZAK_L', 'ZAK_Uv3', 'ZAK_U'], dtype=object)

['stid', 'time', 'p_u', 't_u', 'rh_u', 'rh_u_cor', 'qh_u', 'wspd_u', 'wdir_u', 'dsr', 'dsr_cor', 'usr', 'usr_cor', 'albedo', 'dlr', 'ulr', 'cc', 't_surf', 'dlhf_u', 'dshf_u', 'z_boom_u', 'z_stake', 'z_pt', 'z_pt_cor', 'precip_u', 'precip_u_cor', 't_i_1', 't_i_2', 't_i_3', 't_i_4', 't_i_5', 't_i_6', 't_i_7', 't_i_8', 'tilt_x', 'tilt_y', 'rot', 'gps_lat', 'gps_lon', 'gps_alt', 'gps_time', 'gps_geoid', 'gps_geounit', 'gps_hdop', 'gps_numsat', 'gps_q', 'batt_v', 'batt_v_ini', 'batt_v_ss', 'fan_dc_u', 'freq_vw', 't_log', 't_rad', 'msg_lat', 'msg_lon', 'z_surf_1', 'z_surf_2', 'z_surf_1_adj_flag', 'z_surf_2_adj_flag', 'z_surf_combined', 'depth_t_i_1', 'depth_t_i_2', 'depth_t_i_3', 'depth_t_i_4', 'depth_t_i_5', 'depth_t_i_6', 'depth_t_i_7', 'depth_t_i_8', 't_i_10m', 'p_l', 't_l', 'rh_l', 'rh_l_cor', 'qh_l', 'wspd_l', 'wdir_l', 'dlhf_l', 'dshf_l', 'z_boom_l', 'precip_l', 'precip_l_cor', 't_i_9', 't_i_10', 't_i_11', 'fan_dc_l', 'depth_t_i_9', 'depth_t_i_10', 'depth_t_i_11', 'z_pt_cor_adj_flag', 

In [40]:
null_columns = df_hourly.columns[df_hourly.isnull().all()]
flag_columns = df_hourly.filter(regex="flag$").columns
print("null columns:\n", null_columns)
print("flag_columns:\n", flag_columns)

null columns:
 Index(['gps_geounit', 'batt_v_ini', 'freq_vw'], dtype='object')
flag_columns:
 Index(['z_surf_1_adj_flag', 'z_surf_2_adj_flag', 'z_pt_cor_adj_flag'], dtype='object')


In [47]:
df_monthly.info

<bound method DataFrame.info of          stid month_year         p_u        t_u       rh_u   rh_u_cor  \
0        CEN1    2017-05         NaN        NaN        NaN        NaN   
1        CEN1    2017-06         NaN        NaN        NaN        NaN   
2        CEN1    2017-07         NaN        NaN        NaN        NaN   
3        CEN1    2017-08  799.124410  -7.917199  88.455853  94.813997   
4        CEN1    2017-09  793.052183 -16.824626  84.394839  98.425428   
...       ...        ...         ...        ...        ...        ...   
8195  ZAK_Uv3    2022-08  911.193593   1.671179  70.281977  70.517806   
8196  ZAK_Uv3    2022-09  910.964335  -1.641274  68.437326  70.609115   
8197  ZAK_Uv3    2022-10  905.758516  -8.781081  62.708000  68.377748   
8198  ZAK_Uv3    2022-11  906.854591  -7.709388  57.323370  61.688199   
8199  ZAK_Uv3    2022-12         NaN        NaN        NaN        NaN   

          qh_u    wspd_u      wdir_u         dsr  ...  z_pt_cor_adj  \
0          NaN      