In [None]:
from pymcs.io import L2Reader
from pathlib import Path
from tqdm import tqdm_notebook as tqdm
import pandas as pd

In [None]:
def get_list_of_hourfolders(daystring):
    """Return all paths under the level-2 tree whose name starts with `daystring`.

    The search happens inside the sub-directory named after the first four
    characters of `daystring` (presumably YYMM -- confirm against the archive
    layout).

    Parameters
    ----------
    daystring : str
        Name prefix to match, e.g. ``'080301'``.

    Returns
    -------
    list of pathlib.Path
        Matching paths in glob order (not sorted); empty if nothing matches.
    """
    search_dir = Path('/cabeus/data/mcs/level2', daystring[:4])
    pattern = f'{daystring}*'
    return list(search_dir.glob(pattern))

# Interactive check: list the folders found for day string '080301'.
get_list_of_hourfolders('080301')

def get_hour_subfiles(subfolder='080301000000'):
    """Return the sorted ``*.out`` files of one hour folder's product directory.

    The hour folder is looked up as ``<level2 root>/<subfolder[:4]>/<subfolder>``.
    Inside it, the first directory entry matching ``post2d_v*`` (in glob
    order) is used as the product folder.

    Parameters
    ----------
    subfolder : str
        Name of the hour folder.

    Returns
    -------
    list of pathlib.Path or None
        Sorted ``*.out`` paths, or None when no ``post2d_v*`` entry exists.
    """
    product = 'post2d_v*'
    hour_dir = Path('/cabeus/data/mcs/level2') / subfolder[:4] / subfolder
    # Take the first match, if any; None signals "no product folder".
    product_dir = next(iter(hour_dir.glob(product)), None)
    if product_dir is None:
        return None
    return sorted(product_dir.glob('*.out'))

# Interactive check: first five files of the default hour folder.
# Raises TypeError if get_hour_subfiles() returns None (no product folder found).
get_hour_subfiles()[:5]

def convert_4hfiles_to_df(subfolder, write=False):
    """Collect the headers of all files in one hour folder into a DataFrame.

    Each file returned by `get_hour_subfiles` is opened with `L2Reader`; its
    ``header`` is converted to a one-row frame and all rows are concatenated.

    Parameters
    ----------
    subfolder : str or pathlib.Path
        Hour folder name (e.g. ``'080301000000'``). If a path is given, only
        its final component is used.
    write : bool, optional
        If True, also store the result as ``<name>.parquet`` next to the hour
        folder, i.e. in ``<level2 root>/<name[:4]>/``.

    Returns
    -------
    pandas.DataFrame
        One row per file; an empty frame when the product folder is missing
        or contains no files (nothing is written in that case).
    """
    # Callers pass plain folder names (str); normalising through Path lets
    # Path objects work too and avoids calling `.parent` on a str.
    name = Path(subfolder).name
    filelist = get_hour_subfiles(name)
    # Covers both None (no product folder) and [] (no *.out files);
    # pd.concat would raise ValueError on an empty list.
    if not filelist:
        return pd.DataFrame()
    df = pd.concat([L2Reader(f).header.to_frame().T for f in filelist])
    if write:
        target_dir = Path('/cabeus/data/mcs/level2') / name[:4]
        df.to_parquet(target_dir / f'{name}.parquet')
    return df

def cols_to_numeric(df):
    """Convert, in place, every column of `df` that parses as numeric.

    Columns that cannot be converted are left untouched.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to modify in place.

    Returns
    -------
    None
    """
    for col in df.columns:
        # Explicit try/except replaces the deprecated `errors='ignore'`
        # option of pd.to_numeric while keeping the same outcome.
        try:
            converted = pd.to_numeric(df[col])
        except (ValueError, TypeError):
            continue
        df[col] = converted

def convert_dayfiles_to_df(daystring, write=False):
    """Build one header DataFrame from all hour folders matching `daystring`.

    Parameters
    ----------
    daystring : str
        Folder-name prefix passed to `get_list_of_hourfolders`.
    write : bool, optional
        If True, store the result as ``<daystring>.parquet`` in the directory
        that contains the hour folders.

    Returns
    -------
    pandas.DataFrame
        Concatenated headers with numeric columns converted; an empty frame
        when no folder matches (nothing is written in that case).
    """
    hourfolders = get_list_of_hourfolders(daystring)
    # Without this guard pd.concat([]) raises ValueError and the output
    # path below would have no folder to derive its parent from.
    if not hourfolders:
        return pd.DataFrame()
    frames = [convert_4hfiles_to_df(folder.name) for folder in tqdm(hourfolders)]
    df = pd.concat(frames)
    cols_to_numeric(df)
    if write:
        # All hour folders come from the same glob, so they share one parent.
        df.to_parquet(hourfolders[0].parent / f'{daystring}.parquet')
    return df

def convert_month_to_df(month, write=True):
    """Convert all hour folders of one month directory into a header table.

    The result is cached as ``<month>_header.parquet`` inside the month
    directory; if that file already exists it is read and returned instead
    of re-parsing the raw files.

    Parameters
    ----------
    month : str
        Name of the month directory below the level-2 root.
    write : bool, optional
        If True (default), store the freshly built table in the cache file.

    Returns
    -------
    pandas.DataFrame or int
        The cached DataFrame when the cache file exists; otherwise the number
        of rows of the newly built table (0 when the month has no folders).
        NOTE(review): the two return types differ, as in the original code --
        confirm which one callers expect before unifying them.
    """
    base = Path('/cabeus/data/mcs/level2') / month
    # Check the same file name that is written below; the original looked
    # for '<month>.parquet' and therefore never found its own output.
    savename = base / f'{month}_header.parquet'
    if savename.exists():
        return pd.read_parquet(savename)
    folders = [item for item in base.glob('*') if item.is_dir()]
    # An empty (or non-directory) month must not abort a whole batch run.
    if not folders:
        return 0
    frames = [convert_4hfiles_to_df(folder.name) for folder in tqdm(folders)]
    df = pd.concat(frames)
    cols_to_numeric(df)
    if write:
        df.to_parquet(savename)
    return len(df)

from dask.distributed import Client, progress
# Create a dask.distributed client with no arguments (default settings).
client = Client()
# Bare expression so the notebook cell displays the client.
client

In [None]:
# Names of every entry directly below the level-2 data root
# (presumably one directory per month -- files are not filtered out here).
base = Path('/cabeus/data/mcs/level2')
months = [entry.name for entry in base.glob('*')]

In [None]:
# Preview the first five entries.
months[:5]

In [None]:
# Accumulator for the dask delayed objects created in a later cell.
lazy_results = []

In [None]:
import dask

In [None]:
# Wrap the converter once, then queue one delayed call per month.
delayed_convert = dask.delayed(convert_month_to_df)
for month in months:
    lazy_results.append(delayed_convert(month))

In [None]:
# Inspect the first delayed object (raises IndexError if `months` was empty).
lazy_results[0]

In [None]:
# Execute all queued month conversions; the cell shows the tuple of results.
dask.compute(*lazy_results)

## combine month parquets

In [84]:
!pwd

/home/klay6683/src/pymcs/notebooks
