In [8]:
import shutil
import shlex
import subprocess
import sys, os, gc
import numpy as np
import xarray as xr
import pandas as pd
import multiprocessing as mp

from glob import glob
from io import StringIO
from datetime import datetime, timedelta

core_limit = 80

In [9]:
nbm_dir = '/scratch/general/lustre/u1070830/nbm/'

In [10]:
nbm_extract = np.array(sorted(glob(nbm_dir + 'extract/*.nc')))
nbm_agg = np.array(sorted(glob(nbm_dir + 'agg/*.nc')))

In [12]:
# Most recent run in archive
try:
    with xr.open_dataset(sorted([f for f in nbm_agg if 'f024' in f])[-1]) as sample:

        if sample.init.size > 1:
            most_recent_agg = pd.to_datetime(sample.init[-1].values)
        else:
            most_recent_agg = pd.to_datetime(sample.init.values)
except:
    # Start fresh/NBM 4.0 start date
    most_recent_agg = datetime(2020, 9, 30, 12, 0)

most_recent_agg

datetime.datetime(2020, 9, 30, 12, 0)

In [13]:
# Most recent run available
nbm_upload_lag = 6
most_recent_time = datetime.utcnow()

roundUp = True if most_recent_time.minute < 30 else False

most_recent_time = most_recent_time.replace(minute=0, second=0, microsecond=0)

if roundUp:
    most_recent_time += timedelta(hours=1)
    
# Round down to nearest 0, 6, 12, 18, then grab the run prior
most_recent_time -= timedelta(hours=(most_recent_time.hour%6)+6)

print('Fill from:', most_recent_agg)
print('Fill to:', most_recent_time)

Fill from: 2020-09-30 12:00:00
Fill to: 2021-06-02 12:00:00


In [14]:
# if last_nbm_download > last_nbm_agg:
if np.datetime64(most_recent_time) > most_recent_agg:
    
    print('Newer NBM Available')    
    # fill_runs = pd.date_range(last_nbm_agg, last_nbm_download, freq='6H')
    fill_runs = pd.date_range(most_recent_agg, most_recent_time, freq='6H')[1:]
    
fill_runs

Newer NBM Available


DatetimeIndex(['2020-09-30 18:00:00', '2020-10-01 00:00:00',
               '2020-10-01 06:00:00', '2020-10-01 12:00:00',
               '2020-10-01 18:00:00', '2020-10-02 00:00:00',
               '2020-10-02 06:00:00', '2020-10-02 12:00:00',
               '2020-10-02 18:00:00', '2020-10-03 00:00:00',
               ...
               '2021-05-31 06:00:00', '2021-05-31 12:00:00',
               '2021-05-31 18:00:00', '2021-06-01 00:00:00',
               '2021-06-01 06:00:00', '2021-06-01 12:00:00',
               '2021-06-01 18:00:00', '2021-06-02 00:00:00',
               '2021-06-02 06:00:00', '2021-06-02 12:00:00'],
              dtype='datetime64[ns]', length=980, freq='6H')

In [15]:
# Call the NBM download here
python = '/uufs/chpc.utah.edu/common/home/u1070830/anaconda3/envs/xlab/bin/python '
dl_script = '/uufs/chpc.utah.edu/common/home/u1070830/code/model-tools/nbm/get_nbm_gribs_aws.py ' 

dl_start, dl_end = pd.to_datetime(fill_runs[0]), pd.to_datetime(fill_runs[-1])
dl_start, dl_end = [datetime.strftime(t, '%Y%m%d%H%M') for t in [dl_start, dl_end]]

cmd = python + dl_script + '%s %s'%(dl_start, dl_end)
print(cmd)
# subprocess.call(shlex.split(cmd))

/uufs/chpc.utah.edu/common/home/u1070830/anaconda3/envs/xlab/bin/python /uufs/chpc.utah.edu/common/home/u1070830/code/model-tools/nbm/get_nbm_gribs_aws.py 202009301800 202106021200


In [23]:
def extract_fhr_data(f, interval=24, keep_percentiles=[]): 
    import pygrib

    data = []
    
    with pygrib.open(f) as grb:
        
        fhr = int(f.split('.')[3].replace('f', ''))
        
        for msg in grb.read():
            
            step = timedelta(hours=msg.endStep - msg.startStep)
            lead = msg.endStep
            
            if (fhr == lead) & (step == timedelta(hours=interval)):
                
                lats, lons = msg.latlons()

                if 'probability' in str(msg).lower():
                    
                    init = datetime.strptime(str(msg.dataDate) + '%04d'%msg.dataTime, '%Y%m%d%H%M')                    
                    valid = datetime.strptime(str(msg.validityDate) + '%04d'%msg.validityTime, '%Y%m%d%H%M')

                    threshold = msg.upperLimit
                    threshold_in = round(threshold/25.4, 2)

                    if threshold_in <= 4.0:

                        # print(init, valid, threshold, lead, threshold_in)
                        
                        idata = xr.DataArray([msg.data()[0].astype(np.float32)], name='probx',
                                                     dims=('valid', 'y', 'x'), 
                                                     coords={'valid':[valid],
                                                             'lat':(('y', 'x'), lats), 
                                                             'lon':(('y', 'x'), lons)})
                        idata['init'] = init                        
                        idata['interval'] = interval
                        idata['step'] = step
                        idata['fhr'] = lead
                        
                        idata['threshold'] = threshold
                        idata['threshold_in'] = threshold_in
                        
                        # Manually fix encoding issues...
                        idata['init'].encoding['units'] = 'hours since 2000-01-01 00:00:00'
                        idata['valid'].encoding['units'] = 'hours since 2000-01-01 00:00:00'
                        
                        data.append(idata)

                elif 'percentileValue' in msg.keys():
                    
                    if msg.percentileValue in keep_percentiles:
                        
                        # Append this data later with similar code as above, for now pass
                        # print(msg.percentileValue, msg)
                        
                        pass
    
    try:
        data = xr.concat(data, dim='threshold')    
    
    except:
        raise
        pass
    
    else:
        out_file = 'blend.%s.t%02dz.qmd.f%03d.WR.nc'%(init.strftime('%Y%m%d'), init.hour, lead)

        data.to_netcdf(nbm_dir + 'extract/' + out_file, unlimited_dims='valid')
        print(out_file, 'saved')

        del data
        gc.collect()

    return None

print(list_new_extracts[500])
extract_fhr_data(list_new_extracts[500])

/scratch/general/lustre/u1070830/nbm/20210208/blend.t18z.qmd.f096.WR.grib2


ValueError: must supply at least one object to concatenate

In [18]:
nbm_raw = np.array(sorted([f for f in sorted(glob(nbm_dir + '*/*.grib2'))]))
nbm_raw 

for forecast_hour in np.arange(96, 96+1, 24):

    list_new_extracts = []
    for run in fill_runs:        
            try:
                list_new_extracts.append([f for f in nbm_raw if (
                    (run.strftime('%Y%m%d') in f) & 
                    ('t%02dz.qmd.f%03d'%(run.hour, forecast_hour) in f))][0])
            except:
                pass

    list_new_extracts = sorted(list_new_extracts)
    break
    
#     workers = min(core_limit, len(list_new_extracts))
#     print('Extracting %d NBM data files using %d processes'%(len(list_new_extracts), workers))

#     with mp.get_context('fork').Pool(core_limit) as p:
#         p.map(extract_fhr_data, list_new_extracts, chunksize=1)
#         p.close()
#         p.join()
        
#     gc.collect()

list_new_extracts

['/scratch/general/lustre/u1070830/nbm/20201001/blend.t00z.qmd.f096.WR.grib2',
 '/scratch/general/lustre/u1070830/nbm/20201001/blend.t06z.qmd.f096.WR.grib2',
 '/scratch/general/lustre/u1070830/nbm/20201001/blend.t12z.qmd.f096.WR.grib2',
 '/scratch/general/lustre/u1070830/nbm/20201001/blend.t18z.qmd.f096.WR.grib2',
 '/scratch/general/lustre/u1070830/nbm/20201002/blend.t00z.qmd.f096.WR.grib2',
 '/scratch/general/lustre/u1070830/nbm/20201002/blend.t06z.qmd.f096.WR.grib2',
 '/scratch/general/lustre/u1070830/nbm/20201002/blend.t12z.qmd.f096.WR.grib2',
 '/scratch/general/lustre/u1070830/nbm/20201002/blend.t18z.qmd.f096.WR.grib2',
 '/scratch/general/lustre/u1070830/nbm/20201003/blend.t00z.qmd.f096.WR.grib2',
 '/scratch/general/lustre/u1070830/nbm/20201003/blend.t06z.qmd.f096.WR.grib2',
 '/scratch/general/lustre/u1070830/nbm/20201003/blend.t12z.qmd.f096.WR.grib2',
 '/scratch/general/lustre/u1070830/nbm/20201003/blend.t18z.qmd.f096.WR.grib2',
 '/scratch/general/lustre/u1070830/nbm/20201004/blen

In [None]:
list_new_extracts_nc = glob(nbm_dir + 'extract/*.nc')
months = np.unique([f.split('.')[1][:6] for f in list_new_extracts_nc])

for month in months:
    
    for forecast_hour in np.arange(48, 168+1, 24):
        
        print('Working: %s F%03d'%(month, forecast_hour))
        
        month_files = [f for f in list_new_extracts_nc if month in f]
        # print(month_files)
        
        month_data_new = xr.open_mfdataset(month_files, concat_dim='valid')

        # Find existing aggregate if it exists:
        append_to_file = nbm_dir + 'agg/blend.%s.qmd.f%03d.WR.nc'%(month, forecast_hour)
        
        if os.path.isfile(append_to_file):
            
            append_to_ds = xr.open_dataset(append_to_file)
            
            existing_valid_times = append_to_ds.valid.values
            new_valid_times = month_data_new.valid.values
            
            sel_new_valid_times = [t for t in new_valid_times if t not in existing_valid_times]
            
            if len(sel_new_valid_times) > 0:
            
                save_out_new = xr.concat(
                        [append_to_ds, month_data_new.sel(valid=sel_new_valid_times)], 
                    dim='valid')
                
                # Manually fix the encoding issues...
                save_out_new['init'].encoding['units'] = 'hours since 2000-01-01 00:00:00'
                save_out_new['valid'].encoding['units'] = 'hours since 2000-01-01 00:00:00'
            
                save_out_new.to_netcdf(append_to_file + '.new')
                del save_out_new
                
            append_to_ds.close()
            
            del append_to_ds, month_data_new
            gc.collect()
            
        else:        
            month_data_new.to_netcdf(append_to_file)
            
            del month_data_new
            gc.collect()

In [None]:
# # Verify that the new file didn't corrupt the existing data!
# new_agg_temp = sorted([f for f in glob(nbm_dir + 'extract_new/*.nc') if '.new.' in f])
# new_agg_check = sorted([f for f in glob(nbm_dir + 'extract_new/*.nc') if '.new.' not in f])
# old_agg_check = sorted(glob(nbm_dir + 'extract/*.nc'))

# for temp_file, new_file, old_file in zip(new_agg_temp, new_agg_check, old_agg_check):
    
#     try:
#         new = xr.open_dataset(new_file)
#         old = xr.open_dataset(old_file)
    
#     except:
#         pass
    
#     else:
#         if new.valid[-1] > old.valid[-1]:
#             print('New aggregate %s updated, check...'%os.path.basename(new_file))

#             try:
#                 os.remove(temp_file)
#                 print(temp_file, '-->', 'deleted')
#             except:
#                 pass
            
#             try:
#                 shutil.move(old_file, old_file.replace('.nc', '.old.nc'))
#                 print(old_file, '-->', old_file.replace('.nc', '.old.nc'))
#             except:
#                 pass

#             try:
#                 shutil.move(new_file, new_file.replace('extract_new', 'extract'))
#                 print(new_file, '-->', new_file.replace('extract_new', 'extract'))
#             except:
#                 pass

#         else:
#             print('New aggregate %s failed, follow up...'%os.path.basename(new_file))

#         print()