# Convert Mayfly CSV files to parquet

In [1]:
import datetime
import os

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [2]:
# Monitor My Watershed site codes to process. Site pages look like, e.g.:
# https://monitormywatershed.org/sites/GMI_CTD8/
stations = [
    'GMI_CTD1',
    'GMI_CTD3',
    'GMI_CTD5',
    'GMI_CTD6',
    'GMI_CTD7',
    'GMI_CTD8',
]

In [3]:
# One CSV file per station and variable; the variable code embedded in each
# file name is what concat_mayfly matches against the CSV header columns.
files = [
    # GMI_CTD1
    'GMI_CTD1_Maxim_DS3231_Temp_7254.csv',
    'GMI_CTD1_Meter_Hydros21_Cond_7251.csv',
    'GMI_CTD1_Meter_Hydros21_Depth_7252.csv',
    'GMI_CTD1_Meter_Hydros21_Temp_7253.csv',
    'GMI_CTD1_Sensirion_SHT40_Humidity_7256.csv',
    # GMI_CTD3
    'GMI_CTD3_Maxim_DS3231_Temp_7416.csv',
    'GMI_CTD3_Meter_Hydros21_Cond_7413.csv',
    'GMI_CTD3_Meter_Hydros21_Depth_7414.csv',
    'GMI_CTD3_Meter_Hydros21_Temp_7415.csv',
    'GMI_CTD3_Sensirion_SHT40_Humidity_7418.csv',
    # GMI_CTD5
    'GMI_CTD5_Maxim_DS3231_Temp_7976.csv',
    'GMI_CTD5_Meter_Hydros21_Cond_7973.csv',
    'GMI_CTD5_Meter_Hydros21_Depth_7974.csv',
    'GMI_CTD5_Meter_Hydros21_Temp_7975.csv',
    'GMI_CTD5_Sensirion_SHT40_Humidity_7978.csv',
    # GMI_CTD6
    'GMI_CTD6_Maxim_DS3231_Temp_7987.csv',
    'GMI_CTD6_Meter_Hydros21_Cond_7984.csv',
    'GMI_CTD6_Meter_Hydros21_Depth_7985.csv',
    'GMI_CTD6_Meter_Hydros21_Temp_7986.csv',
    'GMI_CTD6_Sensirion_SHT40_Humidity_7993.csv',
    # GMI_CTD7
    'GMI_CTD7_Maxim_DS3231_Temp_7998.csv',
    'GMI_CTD7_Meter_Hydros21_Cond_7995.csv',
    'GMI_CTD7_Meter_Hydros21_Depth_7996.csv',
    'GMI_CTD7_Meter_Hydros21_Temp_7997.csv',
    'GMI_CTD7_Sensirion_SHT40_Humidity_8000.csv',
    # GMI_CTD8
    'GMI_CTD8_Maxim_DS3231_Temp_8005.csv',
    'GMI_CTD8_Meter_Hydros21_Cond_8002.csv',
    'GMI_CTD8_Meter_Hydros21_Depth_8003.csv',
    'GMI_CTD8_Meter_Hydros21_Temp_8004.csv',
    'GMI_CTD8_Sensirion_SHT40_Humidity_8007.csv',
]

In [4]:
# Directory that holds the Mayfly CSV files named in `files`.
srcdir = 'data/src/mayfly'
# Directory that receives one parquet file per station.
dstdir = 'data/dst'

In [5]:
def get_text_header(fn):
    """Scan the preamble of a Mayfly CSV file and locate its column header row.

    Lines are read from the top of the file until the first line that starts
    with ``DateTime``; that line is taken to be the CSV column header.

    Parameters
    ----------
    fn : str or path-like
        Path of the CSV file to inspect.

    Returns
    -------
    skiprows : int
        Number of lines preceding the header row (the value to pass as
        ``skiprows`` to ``pandas.read_csv`` so parsing starts at the header).
    text : str
        All preamble lines concatenated verbatim, newlines included.
    columns : list of str
        Column names obtained by splitting the header row on commas.

    Raises
    ------
    ValueError
        If no line in the file starts with ``DateTime``.
    """
    preamble = []

    with open(fn) as f:
        # Iterating the file treats every line the same way: the first line is
        # neither dropped from the preamble nor missed when it is itself the
        # header, and hitting end-of-file can no longer index an empty string.
        for skiprows, line in enumerate(f):
            if line.startswith('DateTime'):
                columns = line.strip().split(',')
                return skiprows, ''.join(preamble), columns
            preamble.append(line)

    raise ValueError(f'No header row starting with "DateTime" found in {fn}')

In [6]:
def read_mayfly(fn, skiprows = 0):
    """Load one Mayfly CSV file and add timezone-aware timestamp columns.

    Parameters
    ----------
    fn : str or path-like
        Path of the CSV file.
    skiprows : int, optional
        Number of leading lines to skip before the header row.

    Returns
    -------
    pandas.DataFrame
        The parsed table, with values of -9999.0 read as missing. The original
        ``DateTime`` / ``DateTimeUTC`` columns are kept under the names
        ``_DateTimeLocal_`` / ``_DateTimeUTC_``, and two columns are appended:
        ``DateTimeUTC`` (parsed as UTC) and ``DateTimeLocal`` (the local
        timestamp carrying the fixed offset from the row's ``TimeOffset``,
        interpreted as hours).
    """
    def _attach_offset(row):
        # Build a fixed-offset timezone from this row's TimeOffset (hours) and
        # stamp it onto the still-naive local timestamp.
        offset = datetime.timedelta(hours=row['TimeOffset'])
        return row['DateTimeLocal'].tz_localize(datetime.timezone(offset))

    frame = pd.read_csv(fn, skiprows=skiprows, parse_dates=True, na_values=-9999.0)

    # Move the raw timestamp columns aside so the parsed versions can take
    # over the plain names.
    frame = frame.rename(
        columns={'DateTime': '_DateTimeLocal_', 'DateTimeUTC': '_DateTimeUTC_'}
    )

    # The file's UTC column is taken at face value as UTC.
    frame['DateTimeUTC'] = pd.to_datetime(frame['_DateTimeUTC_'], utc=True)

    # Parse the local timestamps as naive, then localise them row by row.
    frame['DateTimeLocal'] = pd.to_datetime(frame['_DateTimeLocal_'])
    frame['DateTimeLocal'] = frame.apply(_attach_offset, axis=1)

    return frame

In [7]:
def concat_mayfly(files, srcdir='.'):
    """Combine single-variable Mayfly CSV files into one frame on a 5-minute grid.

    For every file, the data column is the header column whose name occurs in
    the file name. That column is extracted as a series indexed by the file's
    UTC timestamps; all series are then joined side by side.

    Parameters
    ----------
    files : iterable of str
        CSV file names, relative to ``srcdir``.
    srcdir : str, optional
        Directory containing the files.

    Returns
    -------
    pandas.DataFrame
        One column per input file, indexed by a ``DateTime`` index conformed
        to a regular 5-minute frequency.

    Raises
    ------
    ValueError
        If ``files`` yields nothing, or if none of a file's header columns
        appears in its file name.
    """
    df_list = []

    for file in files:
        path = f'{srcdir}/{file}'
        skiprows, _text, columns = get_text_header(path)

        # Fail loudly when nothing matches: the previous loop-and-break fell
        # through to the last header column and exported the wrong series.
        column = next((name for name in (columns or []) if name in file), None)
        if column is None:
            raise ValueError(
                f'No header column of {path} appears in its file name; columns: {columns}'
            )

        print(f'    Reading series: {column} from {path}')
        df = read_mayfly(path, skiprows = skiprows)
        df = df.set_index(df['DateTimeUTC'])
        df.index = df.index.rename('DateTime')
        df_list.append(df[column])

    if not df_list:
        raise ValueError('No Mayfly files given to concatenate')

    combined = pd.concat(df_list, axis=1)

    # asfreq conforms the index to a regular 5-minute grid, inserting NaN rows
    # for missing steps. NOTE(review): samples whose timestamps are not exactly
    # on that grid are not carried over; resample('5min').first() would be the
    # alternative if off-grid samples must be kept -- confirm with the data.
    return combined.asfreq(freq='5min')

In [8]:
# Write one parquet file per station, built from that station's CSV files.
# A fresh checkout may not contain the output directory yet.
os.makedirs(dstdir, exist_ok=True)

for station in stations:

    # Match on the "<station>_" prefix rather than a bare substring, so that
    # GMI_CTD1 could never also pick up files of a hypothetical GMI_CTD10.
    station_files = [file for file in files if file.startswith(f'{station}_')]
    print(f'Processing {station}')

    station_df = concat_mayfly(station_files, srcdir=srcdir)
    station_df.to_parquet(f'{dstdir}/mayfly_{station}.parquet', index=True)

Processing GMI_CTD1
    Reading series: Maxim_DS3231_Temp from data/src/mayfly/GMI_CTD1_Maxim_DS3231_Temp_7254.csv
    Reading series: Meter_Hydros21_Cond from data/src/mayfly/GMI_CTD1_Meter_Hydros21_Cond_7251.csv
    Reading series: Meter_Hydros21_Depth from data/src/mayfly/GMI_CTD1_Meter_Hydros21_Depth_7252.csv
    Reading series: Meter_Hydros21_Temp from data/src/mayfly/GMI_CTD1_Meter_Hydros21_Temp_7253.csv
    Reading series: Sensirion_SHT40_Humidity from data/src/mayfly/GMI_CTD1_Sensirion_SHT40_Humidity_7256.csv
Processing GMI_CTD3
    Reading series: Maxim_DS3231_Temp from data/src/mayfly/GMI_CTD3_Maxim_DS3231_Temp_7416.csv
    Reading series: Meter_Hydros21_Cond from data/src/mayfly/GMI_CTD3_Meter_Hydros21_Cond_7413.csv
    Reading series: Meter_Hydros21_Depth from data/src/mayfly/GMI_CTD3_Meter_Hydros21_Depth_7414.csv
    Reading series: Meter_Hydros21_Temp from data/src/mayfly/GMI_CTD3_Meter_Hydros21_Temp_7415.csv
    Reading series: Sensirion_SHT40_Humidity from data/src/mayf

In [9]:
# station_df is whatever the final loop iteration left behind, i.e. the frame
# for the last entry of `stations`.
station_df.head()

Unnamed: 0_level_0,Maxim_DS3231_Temp,Meter_Hydros21_Cond,Meter_Hydros21_Depth,Meter_Hydros21_Temp,Sensirion_SHT40_Humidity
DateTime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2023-07-04 19:55:00+00:00,24.0,,,,49.95
2023-07-04 20:00:00+00:00,24.0,,,,49.96
2023-07-04 20:05:00+00:00,23.75,,,,51.38
2023-07-04 20:10:00+00:00,,,,,
2023-07-04 20:15:00+00:00,,,,,


In [10]:
# Inspect the index of station_df (dtype/timezone, frequency, length).
station_df.index

DatetimeIndex(['2023-07-04 19:55:00+00:00', '2023-07-04 20:00:00+00:00',
               '2023-07-04 20:05:00+00:00', '2023-07-04 20:10:00+00:00',
               '2023-07-04 20:15:00+00:00', '2023-07-04 20:20:00+00:00',
               '2023-07-04 20:25:00+00:00', '2023-07-04 20:30:00+00:00',
               '2023-07-04 20:35:00+00:00', '2023-07-04 20:40:00+00:00',
               ...
               '2024-07-26 08:55:00+00:00', '2024-07-26 09:00:00+00:00',
               '2024-07-26 09:05:00+00:00', '2024-07-26 09:10:00+00:00',
               '2024-07-26 09:15:00+00:00', '2024-07-26 09:20:00+00:00',
               '2024-07-26 09:25:00+00:00', '2024-07-26 09:30:00+00:00',
               '2024-07-26 09:35:00+00:00', '2024-07-26 09:40:00+00:00'],
              dtype='datetime64[ns, UTC]', name='DateTime', length=111622, freq='5min')

In [11]:
# NOTE(review): same expression as the preceding cell; presumably a leftover
# duplicate that can be removed.
station_df.index

DatetimeIndex(['2023-07-04 19:55:00+00:00', '2023-07-04 20:00:00+00:00',
               '2023-07-04 20:05:00+00:00', '2023-07-04 20:10:00+00:00',
               '2023-07-04 20:15:00+00:00', '2023-07-04 20:20:00+00:00',
               '2023-07-04 20:25:00+00:00', '2023-07-04 20:30:00+00:00',
               '2023-07-04 20:35:00+00:00', '2023-07-04 20:40:00+00:00',
               ...
               '2024-07-26 08:55:00+00:00', '2024-07-26 09:00:00+00:00',
               '2024-07-26 09:05:00+00:00', '2024-07-26 09:10:00+00:00',
               '2024-07-26 09:15:00+00:00', '2024-07-26 09:20:00+00:00',
               '2024-07-26 09:25:00+00:00', '2024-07-26 09:30:00+00:00',
               '2024-07-26 09:35:00+00:00', '2024-07-26 09:40:00+00:00'],
              dtype='datetime64[ns, UTC]', name='DateTime', length=111622, freq='5min')