In [1]:
import gc, os
import pickle
import cfgrib
import pygrib

import pandas as pd
import numpy as np
import xarray as xr
import multiprocessing as mp
import matplotlib.pyplot as plt

from glob import glob
from datetime import datetime, timedelta
from functools import partial, reduce

os.environ['OMP_NUM_THREADS'] = '1'

In [2]:
def process_grids(init):
    
    if init.day == 1:
        print('\rProgress: %s'%init.strftime('%Y%m%d'), end='')
    
    valid = init + timedelta(hours=delay+interval)
    
    fhr_step = 3
    fhr_start = delay + fhr_step
    fhr_end = delay + interval
    fhrs = np.arange(fhr_start, fhr_end+fhr_step, fhr_step)
    
    init_dir = archive + '%s/models/gfs0p25/'%init.strftime('%Y%m%d')
    
    init_flist = sorted(np.hstack(
        [glob(init_dir + '*%s*f%03d*.grib2'%(init.strftime('%Y%m%d%H'), fhr)) 
         for fhr in fhrs]))
    
    try:
        returns = [ingest_gfs(f, grid_index=idx1d) for f in init_flist]
        returns = np.array([r for r in returns if r is not None], dtype=object)
        sfc, iso = returns[:, 0], returns[:, 1]

        iso = xr.concat(iso, dim='valid_time').drop('time').rename({'valid_time':'time'}).sortby('time')
        sfc = xr.concat(sfc, dim='valid_time').drop('time').rename({'valid_time':'time'}).sortby('time')

        iso['init'] = sfc['init'] = init
        iso['valid'] = sfc['valid'] = valid

        iso = iso.set_coords(['init', 'valid'])
        sfc = sfc.set_coords(['init', 'valid'])
    
    except:
        return None
    
    else:
        return sfc, iso

In [None]:
def ingest_gfs(f, grid_index):
    
    lon_idx, lat_idx = grid_index[0][0], grid_index[1][0] 
    
    datasets = cfgrib.open_datasets(f)

    keep_keys = ['tp', 'q', 't', 'u', 'v', 'absv', 'w', 'gh', 'r', 'd', 
                  'u10', 'v10', 'u100', 'v100', 't2m', 'd2m', 
                  'cape', 'prmsl', 'sp', 'orog', 'hpbl']

    #keep_keys = ['tp', 't', 'u', 'v', 'gh', 'r', 'orog']

    sfc, iso = [], []

    for ds in datasets:
        
        ds = ds.isel(latitude=lat_idx, longitude=lon_idx).load()

        key_match = np.array(list(ds.data_vars))[np.isin(list(ds.data_vars), keep_keys)]

        if len(key_match) > 0:

            dims = ds.dims.keys()
            coords = ds[key_match].coords

            if ('heightAboveGround' in coords) & ('heightAboveGround' not in dims):
                sfc.append(ds[key_match].drop('heightAboveGround'))

            elif 'isobaricInhPa' in coords:
                iso.append(ds[key_match])

            elif (('surface' in coords)|('meanSea' in coords)):
                sfc.append(ds[key_match])

            elif 'prmsl' in list(ds.data_vars):
                sfc.append(ds['prmsl'])

            else:
                pass

        else:
            pass

    try:
        sfc = xr.merge(sfc).drop('t')
        iso = xr.merge(iso).rename({'isobaricInhPa':'level'})
        iso = iso.sel(level=iso.level[::-1])

        sfc['longitude'] = sfc['longitude'] - 360
        iso['longitude'] = iso['longitude'] - 360
    
    except:
        return None
    
    else:
        return [sfc.drop(['surface', 'meanSea', 'step']), 
                iso.drop('step')]

In [None]:
# site, interval, delay = 'CLNX', 24, 24
# site_lat, site_lon = 40.5763, -111.6383

site, interval, delay = 'BSNFJE', 24, 24
site_lat, site_lon = 40.5763, -111.6383

In [None]:
model = 'gfs0p25'

temp = '/scratch/general/lustre/u1070830/binary_temp/'; os.makedirs(temp, exist_ok=True)
archive = '/uufs/chpc.utah.edu/common/home/steenburgh-group10/mewessler/archive/'

date_fmt = '%Y%m%d'
datetime_fmt = '%Y%m%d%H'

In [None]:
inits = pd.date_range(
    datetime(2015, 1, 15, 0, 0),
    datetime(2020, 5, 31, 23, 59),
    freq='12H')

inits = [init for init in inits 
         if ((init.month <= 5) | (init.month >= 10))]

In [None]:
gfs_sample = xr.open_dataset('./gfs_latlon_grid.nc')
gfs_sample['longitude'] = gfs_sample['longitude'] - 360
gfs_lat, gfs_lon = gfs_sample['latitude'], gfs_sample['longitude']

idx1d = (np.abs(gfs_lon - site_lon) + np.abs(gfs_lat - site_lat))
idx1d = np.where(idx1d == np.min(idx1d))

print(gfs_sample.isel(latitude=idx1d[1], longitude=idx1d[0]))

In [11]:
with mp.get_context('fork').Pool(256) as p:
    returns = p.map(process_grids, inits, chunksize=1)
    p.close()
    p.join()
    
returns = np.array([r for r in returns if r is not None], dtype=object)

Progress: 20200501

ValueError: 'u' is not present in all datasets.

In [12]:
sfc = xr.concat(returns[:, 0], dim='time')
iso = xr.concat(returns[:, 1], dim='time')

ValueError: 'u' is not present in all datasets.

In [13]:
sfc_keys = list(returns[0, 0].data_vars)
sfc_returns = [r for r in returns[:, 0] if list(r.data_vars) == sfc_keys]

In [14]:
sfc = xr.concat(sfc_returns, dim='time')

In [15]:
iso = xr.concat(returns[:, 1], dim='time')

In [16]:
u, v = iso['u'], iso['v']
wdir = 90 - np.degrees(np.arctan2(-v, -u))
wdir = xr.where(wdir <= 0, wdir+360, wdir)
wdir = xr.where(((u == 0) & (v == 0)), 0, wdir)

iso['dir'] = wdir
iso['spd'] = np.sqrt(u**2 + v**2)

for hgt in [10, 100]:

    u, v = sfc['u%d'%hgt], sfc['v%d'%hgt]
    wdir = 90 - np.degrees(np.arctan2(-v, -u))
    wdir = xr.where(wdir <= 0, wdir+360, wdir)
    wdir = xr.where(((u == 0) & (v == 0)), 0, wdir)

    sfc['dir%dm'%hgt] = wdir
    sfc['spd%dm'%hgt] = np.sqrt(u**2 + v**2)

In [None]:
# iso.to_netcdf('./%s_%dh_delay%d_extract_gfs_ISO.nc'%(site, interval, delay))
iso = xr.open_dataset('./%s_%dh_delay%d_extract_gfs_ISO.nc'%(site, interval, delay))

In [None]:
df = []
match_rename = {'absv':'vo', 'gh':'z', 'hpbl':'blh', 'prmsl':'msl', 'tp':'swe_mm',
               'u10':'u10m', 'v10':'v10m', 'u100':'u100m', 'v100':'v100m'}

# Loop over each variable in the xarray
for ds in [iso]:#, sfc.drop('orog')]:

    for var_name in ds.data_vars:

        new_var_name = match_rename[var_name] if var_name in match_rename.keys() else var_name
        # print('Reducing (%s) to %s index level AGL'%(var_name, new_var_name))

        var = ds[var_name]
        
        if 'level' in var.coords:
            for level in var.level:
                if level >= 200:

                    print('\r%s %s'%(var_name, level.values), end='')

                    var_agl = var.sel(level=level).drop(['init', 'valid', 'level', 'latitude', 'longitude'])
                    df.append(var_agl.to_dataframe(name='%s_%d'%(new_var_name.upper(), level.values)))
qhtop
                    del var_agl
                    gc.collect()
        else:
            df.append(var.drop(['init', 'valid', 'latitude', 'longitude']
                              ).to_dataframe(name='%s'%new_var_name.upper()))
            
df = reduce(lambda left, right: pd.merge(left, right, on=['time']), df)
df = df.rename(columns={'SWE_MM':'swe_mm'})

df.to_pickle('./%s_%dh_delay%d_extract_gfs_ISO.pd'%(site, interval, delay))

spd 10000

In [None]:
orog = sfc.orog
gh = iso.gh

lowest_level = np.full(orog.shape, fill_value=np.nan)
lowest_level_index = np.full(orog.shape, fill_value=np.nan)

for i, level in enumerate(iso['level'][::-1]):
    
    lev_gh = gh.sel(level=level)
    
    lowest_level = xr.where(orog >= lev_gh, level.values, lowest_level)
    lowest_level_index = xr.where(orog >= lev_gh, i, lowest_level_index)

lowest_level_index = xr.where(np.isnan(lowest_level), 0, lowest_level_index)
lowest_level = xr.where(np.isnan(lowest_level), 1000, lowest_level)

print(lowest_level)

In [None]:
# Force fix
lowest_level = 800

levels_above = iso.level[iso.level <= lowest_level][::-1][:10].values
levels_above

In [None]:
df = []
match_rename = {'absv':'vo', 'gh':'z', 'hpbl':'blh', 'prmsl':'msl', 'tp':'swe_mm',
               'u10':'u10m', 'v10':'v10m', 'u100':'u100m', 'v100':'v100m'}

# Loop over each variable in the xarray
for ds in [iso, sfc.drop('orog')]:

    for var_name in ds.data_vars:

        new_var_name = match_rename[var_name] if var_name in match_rename.keys() else var_name
        # print('Reducing (%s) to %s index level AGL'%(var_name, new_var_name))

        var = ds[var_name]

        
        if 'level' in var.coords:
            
            for i, level in enumerate(levels_above):
                
                var_agl = var.sel(level=level)

                # We could go ahead and append to the pandas dataframe here 
                # at the completion of each level (_01agl, _02agl...)
                # We will have to use [(time), lat, lon] as a multiindex
                var_agl = xr.DataArray(var_agl[::-1], 
                     dims=['time'], 
                     coords={'time':ds['time'],
                             'latitude':ds['latitude'], 
                             'longitude':ds['longitude']})
                
                df.append(var_agl.to_dataframe(name='%s_%02dagl'%(new_var_name.upper(), i+1)))

                del var_agl
                gc.collect()

        else:

            var_agl = xr.DataArray(var.values, 
                dims=['time'], 
                coords={'time':ds['time'],
                    'latitude':ds['latitude'], 
                     'longitude':ds['longitude']})

            df.append(var_agl.to_dataframe(name='%s'%new_var_name.upper()))

In [None]:
df = reduce(lambda left, right: pd.merge(left, right, on=['time', 'latitude', 'longitude']), df)
df = df.rename(columns={'SWE_MM':'swe_mm'}).drop(columns=['latitude', 'longitude'])
swe = df['swe_mm']

In [None]:
df.to_pickle('./%s_%dh_delay%d_extract_gfs_LAG.pd'%(site, interval, delay))

In [None]:
df