# Combining all FD events and their Composites into NetCDF Files

In [2]:
from datetime import datetime
import geemap.colormaps as cm
import geemap
import ee
import wxee
import geopandas as gpd
import pandas as pd
import zlib
import numpy as np
import xarray as xr
from mpi4py import MPI
from dask.distributed import Client
# geemap.ee_initialize()

In [3]:
# workdir = '/content/drive/MyDrive/Flash_Drought/'
workdir = './'
latlon = pd.read_csv(workdir+'LONLAT.csv').drop(columns=['Unnamed: 0'])
latlon.head()

print(latlon.LON.max()-latlon.LON.min())
print(latlon.LAT.max())
print(latlon.LAT.min())

359.75
89.875
-59.875


In [4]:
nested_dict = {'dict1': {'key_A': 'value_A'}, 'dict2': {'key_B': 'value_B'}}

In [15]:
import timeit
import os, psutil
from xarray import Dataset, Variable

SV_Dict = {1: ['fstdate1', 'lstdate1', 'SV1'], 
           2: ['fstdate2', 'lstdate2', 'SV2'], 
           3: ['fstdate3', 'lstdate3', 'SV3'], 
           4: ['fstdate4', 'lstdate4', 'SV4'], 
           5: ['fstdate5', 'lstdate5', 'SV5'], 
           6: ['fstdate6', 'lstdate6', 'SV6']}


type_Dict = {'fstdate1' : str, 'lstdate1' : str, 'SV1' : np.float64,
             'fstdate2' : str, 'lstdate2' : str, 'SV2' : np.float64,
             'fstdate3' : str, 'lstdate3' : str, 'SV3' : np.float64,
             'fstdate4' : str, 'lstdate4' : str, 'SV4' : np.float64,
             'fstdate5' : str, 'lstdate5' : str, 'SV5' : np.float64, 
             'fstdate6' : str, 'lstdate6' : str, 'SV6' : np.float64}


var_attrs = {'LON': {'standard_name': 'longitude', 'long_name': 'longitude', 'units' : 'degrees_east', 'axis': 'X', 'coordinates' : 'lon lat' },\
            'LAT': {'standard_name': 'latitude', 'long_name' : 'latitude', 'units' : 'degrees_north', 'axis': 'Y', 'coordinates' : 'lon lat'},\
            'fstdate': {'standard_name': 'Date of onset of event', 'long_name' : 'Date of onset of event',\
                         'units' : 'day', 'coordinates' : 'lon lat'},\
            'lstdate': {'standard_name': 'Date of recovery', 'long_name' : 'Date of recovery (end of rapid intensifiaction) of event'\
                         , 'units' : 'day', 'coordinates' : 'lon lat'},\
             'SV': {'standard_name': 'Severity of event', 'long_name' : 'Severity of event', 'units' : '[]', 'coordinates' : 'lon lat'},\
             'VEGID': {'standard_name': 'Landcover class', 'long_name' : 'Landcover class number according to GLDAS dominant vegetation type in CLSMF2.5.',\
                        'units' : '[]', 'coordinates' : 'lon lat'},\
             'ARAIN' : {'standard_name': 'Total liquid precipitation', 'long_name': 'Total liquid precipitation (normalized). ', \
                        'units' : '[]', 'coordinates' : 'lon lat', 'source':  "ERA5 daily derived standard anomalies"},\
             'EVP' : {'standard_name': 'Actual evapotranspiration', 'long_name': 'Actual evapotranspiration (normalized). ',\
                      'units' : '[]', 'coordinates' : 'lon lat', 'source':  "ERA5 daily derived standard anomalies"},\
             'PEVPR' : {'standard_name': 'Potential evapotranspiration', 'long_name': 'Potential evapotranspiration (normalized). ',\
                        'units' : '[]', 'coordinates' : 'lon lat', 'source':  "ERA5 daily derived standard anomalies"},\
             'PRES' : {'standard_name': 'Surface pressure', 'long_name': 'Surface pressure (normalized). ',\
                       'units' : '[]', 'coordinates' : 'lon lat', 'source':  "ERA5 daily derived standard anomalies"},\
             'RZSM' : {'standard_name': 'Root-zone Soil moisture', 'long_name' : 'Root-zone Soil moisture (normalized). ',\
                       'units' : '[]', 'coordinates' : 'lon lat', 'source':  "GLDAS daily derived standard anomalies"},\
             'TMP' : {'standard_name': '2-m above ground temperature', 'long_name' : '2-m above ground temperature (normalized). ',\
                      'units' : '[]', 'coordinates' : 'lon lat', 'source':  "ERA5 daily derived standard anomalies"},\
             'VPD' : {'standard_name': 'Vapor pressure deficit', 'long_name': 'Vapor pressure deficit (normalized). ',\
                      'units' : '[]', 'coordinates' : 'lon lat', 'source':  "ERA5 daily derived standard anomalies"},\
             'WS' : {'standard_name': '10-m above ground wind speed', 'long_name': '10-m above ground wind speed (normalized). ',\
                     'units' : '[]', 'coordinates' : 'lon lat', 'source':  "ERA5 daily derived standard anomalies"}}                   



###########################################################################################3
FD_Years = list(range(2020,2022,1))
FD_Events_Path = workdir+'FD_Events/'
GLDAS_Daily_RZSM_STD_Path = '/glade/work/mahdad1/GLDAS/YEARS/GLDAS_CLSM025_ANOM_STD_'
Composites_Path = workdir+'Composites/'


for year in FD_Years:
    start = timeit.default_timer()


    list_datasets = []
    print('Working on year '+str(year)+'!!!\n')
    # Loading the csv for the year
    date_cols = ['fstdate1' ,'lstdate1','fstdate2', 'lstdate2','fstdate3', 'lstdate3',\
             'fstdate4', 'lstdate4','fstdate5', 'lstdate5', 'fstdate6', 'lstdate6']
    df = pd.read_csv(FD_Events_Path+'SMVI_GLDAS_Summ_table_SMVI_'+str(year)+'_n864000.csv',\
                     dtype = type_Dict, parse_dates=date_cols).drop(columns=['Unnamed: 0']).dropna(thresh=3)
    # date_cols = [col for col in df.columns if 'fstdate' in col or 'lstdate' in col]
    df = df.join(latlon, how='inner', validate='1:1')
    # Convert date columns to DOY
    df[date_cols] = df[date_cols].apply(lambda x: x.dt.dayofyear)
    
    # df = df.join(latlon, how='inner', validate='1:1').reset_index()
    print('FD_events for year '+ str(year) +' is loaded into a Pandas df. \n')
    entire_row_numbers = df.shape[0]
    print(entire_row_numbers)
    cols_dict = {}


    # Creating the date_range for each year to create the DataArray
    start_date = datetime.strptime(str(year)+'-01-01', '%Y-%m-%d')
    end_date = datetime.strptime(str(year)+'-12-31', '%Y-%m-%d')

    # Data variables initialization
    # Grid matching GLDAS resolution
    dims=('lat', 'lon')

    lons = latlon['LON'].unique()
    lats = latlon['LAT'].unique()
    lons.sort()
    lats.sort()

    data_vars = {}
    for col in df.columns:
        for var_attr in var_attrs:
            if var_attr in col:  
                varattrs=var_attrs[var_attr]

                continue
        # data_vars = {col: (['lat', 'lon'], np.full((len(lats), len(lons)), np.nan))}
        data_vars[col] = Variable(dims=dims, data=np.full((len(lats), len(lons)), np.nan), attrs=varattrs)
    coords={'lat': ('lat', np.arange(-59.875, 90.125, 0.25),{'type': 'geodetic'}),\
            'lon': ('lon', np.arange(-179.875, 180.125, 0.25),{'prime_meridian': 'greenwich'})}
    attrs = {
        'Conventions': 'CF-1.6, ACDD-1.3',
        'title': 'Global SMVI events dataset',
        'institution': 'Johns Hopkins University',
        'source': '',
        'history': f'Created {datetime.now().isoformat()}',
        'references': 'Osman, M. et al. Flash drought onset over the contiguous United States:\
        sensitivity of inventories and trends to quantitative definitions. Hydrol. Earth Syst. \
        Sci. 25, 565-581 (2021); Osman, M. et al. Diagnostic Classification of Flash Drought Events\
        Reveals Distinct Classes of Forcings and Impacts. J. Hydrometeorol. 23, 275â€“289 (2022).',
        'comment': 'This file contains SMVI event data mapped to GLDAS grid.'}


    print('\tFiles are added and joined.\n\tDatarrays are created!')

    # Mapping events to grid points
    for index, row in df.iterrows():
        if index%50000 == 0:
            print('\t'+str(index))
        lon, lat = row.LON, row.LAT
        lat_idx = np.where(lats == lat)[0][0]
        lon_idx = np.where(lons == lon)[0][0]
        for col in data_vars.keys():
            # data_vars[col][1][lat_idx, lon_idx] = row[col]
            data_vars[col][lat_idx, lon_idx] = row[col]
    
    # Create xarray Dataset with CF and ACDD attributes
    ds = xr.Dataset(data_vars=data_vars, coords=coords, attrs=attrs)
    
    # Compression settings
    compression_opts = {
        'zlib': True,
        'complevel': 5,  # Compression level (1-9, where 9 is the highest compression)
        'chunksizes': (100, 100),  # Optional: set chunk sizes for better compression
    }
    
    # Apply compression settings to all variables
    encoding = {var: compression_opts for var in ds.data_vars}
    
    # Define output directory and save NetCDF
    output_dir = 'NETCDF_OUT/'
    os.makedirs(output_dir, exist_ok=True)
    netcdf_path = './NetCDF_Files/FD_Events_'+str(year)+'.nc'
    ds.to_netcdf(netcdf_path, format='NETCDF4', encoding=encoding)
    
    print('FD NetCDF file created successfully!')
    del(ds)

    
    
    Composite_Vars_Dict = {'ARAIN' : {}, 'EVP' : {}, 'PEVPR' : {}, 'PRES' : {}, 'RZSM' : {}, 'TMP' : {}, 'VPD' : {}, 'WS' : {}}
    Pentad_Dict = {'On01': ' 1-pentad before Onset date.', 'On02': ' 2-pentads before Onset date.', 'On03': ' 3-pentads before Onset date.', 
                   'On00': ' at Onset date.','Re00': ' at Recovery date.', 'Re01': ' 1-pentad after Recovery date.', 'On3Pn': ' 3-pentads back from Onset date.'}

    
    for var in Composite_Vars_Dict:   
        data_comp_vars = {}
        for event in SV_Dict:
            
            var_file_path = Composites_Path+'SMVI_GLDAS_E'+str(event)+'_'+var+'_'+str(year)+'.csv'
            # temp = pd.read_csv(var_file_path, usecols = Pentad_List).drop(columns=['Unnamed: 0'])
            temp = pd.read_csv(var_file_path, usecols = list(Pentad_Dict.keys()))
            cols_dict = {s: var  + str(event)+ '_' + s for s in temp.columns}
            temp.rename(columns=cols_dict, errors='raise', inplace=True)
            if event == 1:
                temp = df[['LAT','LON']].join(temp, how='inner', validate='1:1')
                df_var = temp
            else:
                df_var = pd.concat([df_var, temp], axis=1, join='inner')
            del(temp)

        for col in df_var.columns:
            for var_attr in var_attrs:
                if var_attr in col:  
                    varattrs=var_attrs[var_attr].copy()
                    event_number = col.split("_")[0].split(var_attr)[1]
                    varattrs['long_name'] = varattrs['long_name'] +'Event'+event_number+' '+var_attr+'. '
                    continue
            for pentad in Pentad_Dict.keys():
                if pentad in col:
                    if 'On3Pn' == pentad:
                        varattrs['long_name'] = varattrs['long_name']+'The 15-days average (3-pentads) anamoly'+Pentad_Dict[pentad]
                    else:
                        varattrs['long_name'] = varattrs['long_name']+'The 5-days average (1-pentads) anamoly'+Pentad_Dict[pentad]
                    continue
            data_comp_vars[col] = Variable(dims=dims, data=np.full((len(lats), len(lons)), np.nan), attrs=varattrs)
            del(varattrs)
        # Mapping events to grid points
        for index, row in df_var.iterrows():
            lon, lat = row.LON, row.LAT
            lat_idx = np.where(lats == lat)[0][0]
            lon_idx = np.where(lons == lon)[0][0]
            for col in data_comp_vars.keys():
                # data_comp_vars[col][1][lat_idx, lon_idx] = row[col]
                data_comp_vars[col][lat_idx, lon_idx] = row[col]
        
        # Create xarray Dataset with CF and ACDD attributes
        ds = xr.Dataset(data_vars=data_comp_vars, coords=coords, attrs=attrs)
        
        # Compression settings
        compression_opts = {
            'zlib': True,
            'complevel': 5,  # Compression level (1-9, where 9 is the highest compression)
            'chunksizes': (100, 100),  # Optional: set chunk sizes for better compression
        }
        
        # Apply compression settings to all variables
        encoding = {var: compression_opts for var in ds.data_vars}
        
        # Define output directory and save NetCDF
        output_dir = 'NETCDF_OUT/'
        os.makedirs(output_dir, exist_ok=True)
        netcdf_path = './NetCDF_Files/'+var+'_'+str(year)+'.nc'
        ds.to_netcdf(netcdf_path, format='NETCDF4', encoding=encoding)
        print('\t\t'+var+' NetCDF file created successfully')
        del(ds)
        del(df_var)


        
    end = timeit.default_timer()
    hours, rem = divmod(end-start, 3600)
    minutes, seconds = divmod(end-start, 60)
    print('\n\t\tAdding year '+str(year)+' takes ' +'{:0>2}:{:0>2}:{:05.2f}'.format(int(hours),int(minutes),seconds)) 
    print('\t\tMemory used so far:') 
    print('\t\t\t'+str(psutil.Process(os.getpid()).memory_info().rss / 1024 ** 2))
    print('\n\n\t'+str(year)+' is Finished!\n\n')   


Working on year 2020!!!

FD_events for year 2020 is loaded into a Pandas df. 

44559
	Files are added and joined.
	Datarrays are created!
	500000
FD NetCDF file created successfully!
		ARAIN NetCDF file created successfully
		EVP NetCDF file created successfully
		PEVPR NetCDF file created successfully
		PRES NetCDF file created successfully
		RZSM NetCDF file created successfully
		TMP NetCDF file created successfully
		VPD NetCDF file created successfully
		WS NetCDF file created successfully

		Adding year 2020 takes 00:07:29.94
		Memory used so far:
			1186.24609375


	2020 is Finished!


Working on year 2021!!!

FD_events for year 2021 is loaded into a Pandas df. 

40210
	Files are added and joined.
	Datarrays are created!
	500000
FD NetCDF file created successfully!
		ARAIN NetCDF file created successfully
		EVP NetCDF file created successfully
		PEVPR NetCDF file created successfully
		PRES NetCDF file created successfully
		RZSM NetCDF file created successfully
		TMP NetCDF fil

In [12]:
rows_to_check =100
year = 1992
FD_DA_Path = 'NetCDF_Files/FD_Events_'+str(year)+'.nc'
FD_DF_Path = './FD_Events/SMVI_GLDAS_Summ_table_SMVI_'+str(year)+'_n864000.csv'
Composites_Path = workdir+'Composites/'

FD_DA = xr.open_dataset(FD_DA_Path)
FD_DF = pd.read_csv(FD_DF_Path).drop(columns=['Unnamed: 0']).dropna(thresh=3)
FD_DF = FD_DF.join(latlon, how='inner', validate="1:1")
FD_DF_Head = FD_DF.head(rows_to_check)
FD_DF_Head

SV_Dict = {1: 'SV1', 
           2: 'SV2', 
           3: 'SV3', 
           4: 'SV4', 
           5: 'SV5', 
           6: 'SV6'}

Composite_Vars_DF_Dict = {'ARAIN' : {}, 'EVP' : {}, 'PEVPR' : {}, 'PRES' : {}, 'RZSM' : {}, 'TMP' : {}, 'VPD' : {}, 'WS' : {}}
Composite_Vars_DA_Dict = {'ARAIN' : [], 'EVP' : [], 'PEVPR' : [], 'PRES' : [], 'RZSM' : [], 'TMP' : [], 'VPD' : [], 'WS' : []}
# Pentad_Dict = {'On01' : [], 'On02' : [], 'On03' : [], 'On00' : [], 'Re00' : [], 'Re01' : [], 'On3Pn' : []}
Pentad_List = ['On01', 'On02', 'On03', 'On00', 'Re00', 'Re01', 'On3Pn']

for var in Composite_Vars_DA_Dict:
    VAR_DA_Path = 'NetCDF_Files/'+var+'_'+str(year)+'.nc'
    Composite_Vars_DA_Dict[var] = xr.open_dataset(VAR_DA_Path)
    for event in range(1,7):
        var_df_file_path = Composites_Path+'SMVI_GLDAS_E'+str(event)+'_'+var+'_'+str(year)+'.csv'
        temp = pd.read_csv(var_df_file_path).drop(columns=['Unnamed: 0'])
        Composite_Vars_DF_Dict[var][var+str(event)] = FD_DF_Head[['LAT','LON']].join(temp, how='inner', validate="1:1")
        
count = 0
iter_count = 0
Wrong_Index_Items = {}

for index, row in FD_DF_Head.iterrows():
    
    lon, lat = row.LON, row.LAT
    DA_VEG_Val = FD_DA.sel(lat=lat, lon=lon)[SV_event].values
    DF_VEG_Val = row[SV_event]
    if np.isnan(DA_VEG_Val) and np.isnan(DF_VEG_Val):
        continue
    elif DA_SV_Val != DF_SV_Val:
        count += 1
        Wrong_Index_Items[index] = ['VEGID', {'DF_SV_Val' : DF_SV_Val}, {'DA_SV_Val' : DA_SV_Val}]
    
    for event in SV_Dict:
        SV_event = SV_Dict[event]
        
        DA_SV_Val = FD_1990_DA.sel(lat=lat, lon=lon)[SV_event].values
        DF_SV_Val = row[SV_event]
        if np.isnan(DF_SV_Val) and np.isnan(DA_SV_Val):
            continue
        elif DA_SV_Val != DF_SV_Val:
            count += 1
            Wrong_Index_Items[index] = ['VEGID', {'DF_'+SV_event+'_Val' : DF_SV_Val}, {'DA_'+SV_event+'_Val' : DA_SV_Val}]
if count == 0:
    print('There was no mismatch in FD_Events!')

FileNotFoundError: [Errno 2] No such file or directory: '/glade/u/home/mahdad1/Jupyter_Projects/Flash_Drought/NetCDF_Files/FD_Events_1992.nc'

In [80]:
iter_count = 0
count = 0
Wrong_Index_Composites = {}
for index, row in FD_DF_Head.iterrows():
    for var in Composite_Vars_DF_Dict:
        # print('Working on '+var)
        for event in range(1,7):
         
            df_var = Composite_Vars_DF_Dict[var][var+str(event)].loc[index] 
            lon, lat = row.LON, row.LAT
            for pentad in Pentad_List:
                var_event_pentad = var+str(event)+'_'+pentad
                da_var = Composite_Vars_DA_Dict[var]
                var_df_val = df_var[pentad]
                var_da_val = da_var.sel(lat=lat, lon=lon)[var_event_pentad].values
        
            if np.isnan(var_da_val) and np.isnan(var_df_val):
                continue
            elif var_df_val != var_da_val:
                count += 1
                Wrong_Index_Composites[index] = [var_event_pentad, {'DF_'+var_event_pentad+'_Val' : var_df_val}, {'DA_'+var_event_pentad+'_Val' : var_da_val}]
        # if count == 0:
        #     print('There was no mismatch in '+var)
        # else:
        #     print(var+' had '+str(count)+' mismatches!')
    iter_count += 1
    if iter_count %5 == 0:
        print(str(iter_count)+' number of rows out of '+str(float(FD_DF_Head.shape[0]))+' are finished!')
if count == 0:
    print('There was no mismatch in Composites!')

5 number of rows out of 100.0 are finished!
10 number of rows out of 100.0 are finished!
15 number of rows out of 100.0 are finished!
20 number of rows out of 100.0 are finished!
25 number of rows out of 100.0 are finished!
30 number of rows out of 100.0 are finished!
35 number of rows out of 100.0 are finished!
40 number of rows out of 100.0 are finished!
45 number of rows out of 100.0 are finished!
50 number of rows out of 100.0 are finished!
55 number of rows out of 100.0 are finished!
60 number of rows out of 100.0 are finished!
65 number of rows out of 100.0 are finished!
70 number of rows out of 100.0 are finished!
75 number of rows out of 100.0 are finished!
80 number of rows out of 100.0 are finished!
85 number of rows out of 100.0 are finished!
90 number of rows out of 100.0 are finished!
95 number of rows out of 100.0 are finished!
100 number of rows out of 100.0 are finished!
There was no mismatch in Composites!


In [68]:
count = 0
for key in Wrong_Index_Composites.keys():
    if 'On3Pn' in Wrong_Index_Composites[key]:
        continue
    else:
        count +=1
print(count)

99
