In [1]:
import os
import pandas as pd
import geopandas as gpd
import xarray as xr
from tqdm import tqdm
from glob import glob
import re
import csv

# Root directory shared by the inputs and outputs referenced in this cell
DATA_DIR = '/work/pi_kandread_umass_edu/Cloud_Freq/data'

# File where we will save processed data
parquet_file = f'{DATA_DIR}/grades_level2.parquet'

# Log of MODIS files that have already been added to the parquet record
processed_log = f'{DATA_DIR}/MODIS/processed_log_level2.csv'
processed_files = []
if os.path.exists(processed_log):
    with open(processed_log, mode='r', newline='') as file:
        # Each row holds a single file name. csv.reader yields [] for a blank
        # line, so empty rows are skipped instead of failing on row[0].
        processed_files = [row[0] for row in csv.reader(file) if row]

# glob returns paths in arbitrary order; sorting makes the processing order
# (and therefore the order rows are appended to the parquet file) reproducible.
modis_file_list = sorted(glob(f'{DATA_DIR}/MODIS/by_discharge_level2/MERIT_MODIS*.csv'))
era5_file_list = sorted(glob(f'{DATA_DIR}/ERA5/by_discharge_level2/ERA5*.csv'))

# Remove files that have already been processed from this list
# (set lookup keeps the filter linear in the number of files)
already_processed = set(processed_files)
modis_files = [x for x in modis_file_list if x not in already_processed]
print(len(modis_files))

print(len(era5_file_list))

modis_files

28
54


['/work/pi_kandread_umass_edu/Cloud_Freq/data/MODIS/by_discharge_level2/MERIT_MODIS_Basin56_1_stratified_discharge.csv',
 '/work/pi_kandread_umass_edu/Cloud_Freq/data/MODIS/by_discharge_level2/MERIT_MODIS_Basin57_1_stratified_discharge.csv',
 '/work/pi_kandread_umass_edu/Cloud_Freq/data/MODIS/by_discharge_level2/MERIT_MODIS_Basin61_1_stratified_discharge.csv',
 '/work/pi_kandread_umass_edu/Cloud_Freq/data/MODIS/by_discharge_level2/MERIT_MODIS_Basin61_2_stratified_discharge.csv',
 '/work/pi_kandread_umass_edu/Cloud_Freq/data/MODIS/by_discharge_level2/MERIT_MODIS_Basin62_1_stratified_discharge.csv',
 '/work/pi_kandread_umass_edu/Cloud_Freq/data/MODIS/by_discharge_level2/MERIT_MODIS_Basin62_2_stratified_discharge.csv',
 '/work/pi_kandread_umass_edu/Cloud_Freq/data/MODIS/by_discharge_level2/MERIT_MODIS_Basin63_1_stratified_discharge.csv',
 '/work/pi_kandread_umass_edu/Cloud_Freq/data/MODIS/by_discharge_level2/MERIT_MODIS_Basin64_1_stratified_discharge.csv',
 '/work/pi_kandread_umass_edu/Cl

In [2]:
time_slice = slice('2000-02-24', '2023-08-31') #Joint MODIS/GRADES availability
basin_id_pattern = r"Basin(\d+)_"


def _to_time_id_series(frame, n_prefix_tokens, value_column):
    """Return `value_column` of `frame` indexed by a ('time', 'id') MultiIndex.

    The date is rebuilt from the `system:index` column: each string is split on
    '_', the first `n_prefix_tokens` tokens are dropped, and the remaining tokens
    are concatenated and parsed as %Y%m%d. The `COMID` column supplies the 'id'
    level.
    """
    date_strings = (frame['system:index']
                    .str.split('_')
                    .str[n_prefix_tokens:]
                    .str.join(''))
    times = pd.to_datetime(date_strings, format='%Y%m%d')
    index = pd.MultiIndex.from_arrays([times, frame['COMID']], names=['time', 'id'])
    return frame.set_index(index)[value_column]


for modis_file in tqdm(modis_files):
    # The level-2 basin id is embedded in the file name; the level-1 id is its leading digit(s)
    l2_basin = int(re.search(basin_id_pattern, modis_file).group(1))
    l1_basin = l2_basin//10

    # Select the ERA5 files for this l2 basin. Match the id as a whole number in
    # the file name only: a plain substring test on the full path can also hit
    # other basins or unrelated digits elsewhere in the path.
    basin_token = re.compile(rf"(?<!\d){l2_basin}(?!\d)")
    era5_files = [x for x in era5_file_list if basin_token.search(os.path.basename(x))]

    if not era5_files:
        # Nothing to join against: leave the file unlogged so it is retried on a
        # later run, but say so instead of skipping silently.
        tqdm.write(f'No ERA5 files found for basin {l2_basin}; skipping {modis_file}')
        continue

    # Read in the ERA5 data (one leading token precedes the date in system:index)
    era5_raw = pd.concat([pd.read_csv(csv_file) for csv_file in era5_files],
                         ignore_index=True)
    era5 = _to_time_id_series(era5_raw, 1, 'temperature_2m')

    # Read in the MODIS data (two leading tokens precede the date in system:index)
    modis = _to_time_id_series(pd.read_csv(modis_file), 2, 'cloudMask')

    modis_id_list = modis.index.get_level_values('id').unique().tolist()

    # Read in GRADES data
    grades_file = f'/work/pi_kandread_umass_edu/Datasets/GRADES_hydroDL/output_pfaf_{l1_basin:02.0f}_1979_202308.nc'
    with xr.open_dataset(grades_file) as grades:
        # Materialise the selection while the file is still open; open_dataset
        # loads lazily, so converting after the context exits relies on the
        # backend being able to reopen the file.
        grades_df = grades.sel(time=time_slice, rivid=modis_id_list).to_dataframe()
    # Rename the index level by name and fix the level order explicitly rather
    # than assuming the dataset's dimensions come out as (time, rivid).
    grades_df = (grades_df
                 .rename(columns={'Qout': 'Q'})
                 .rename_axis(index={'rivid': 'id'})
                 .reorder_levels(['time', 'id']))

    # Left join keeps exactly the MODIS (time, id) rows
    merged = modis.to_frame().join([era5, grades_df], how='left')

    # Create the parquet file on first write, append on every later one
    merged.to_parquet(parquet_file, engine='fastparquet',
                      append=os.path.exists(parquet_file))

    # Add the processed filename to our log.
    # NOTE: the append above and this log write are not atomic; if the run dies
    # between them, this file's rows are in the parquet file but not in the log,
    # and a re-run would append them a second time.
    with open(processed_log, mode='a', newline='') as log:
        writer = csv.writer(log)
        writer.writerow([modis_file])

print('Done!')

100%|██████████| 28/28 [25:12<00:00, 54.01s/it]  

Done!



