In [1]:
import os
import pygrib
import cfgrib
import pandas as pd
import numpy as np
import xarray as xr
import matplotlib.pyplot as plt
import multiprocessing as mp

from glob import glob
from datetime import datetime, timedelta

# Ask OpenMP-backed libraries to use a single thread.
# NOTE(review): this is set after numpy/xarray are imported above; libraries that
# read OMP_NUM_THREADS at import time may not pick it up — confirm placement.
os.environ['OMP_NUM_THREADS'] = '1'

In [47]:
# Root of the NBM archive; files are globbed from per-date subdirectories
# (YYYYMMDD) underneath it.
nbm_dir = '/scratch/general/lustre/u1070830/nbm/'
# presumably the root of the matching URMA archive — not used in the cells shown; confirm
urma_dir = '/scratch/general/lustre/u1070830/urma/'

# Shape of the all-NaN placeholder grid that unpack_fhr returns when a file
# contains no matching message.
# presumably the (rows, cols) of the NBM grid in these files — TODO confirm
nbm_shape = (1051, 1132)

In [51]:
def unpack_fhr(nbm_file, returned=False):
    """Pull the 24-h exceedance-probability field for the 0.254 threshold from one NBM file.

    The initialization time is parsed from the last two path components
    (``.../YYYYMMDD/blend.tHHz....grib2``); an init hour that is not a multiple
    of 6 is shifted back by one hour. The valid time is read from the first
    GRIB message in the file.

    Parameters
    ----------
    nbm_file : str
        Path to the GRIB2 file. The parent directory name must be the init
        date and the second dot-separated token of the file name the cycle
        (e.g. ``t18z``).
    returned : bool, optional
        Legacy flag kept for compatibility. When truthy, the all-NaN fallback
        is suppressed and ``None`` is returned if no message matches.

    Returns
    -------
    tuple or None
        ``(init, valid, lead, values)`` where ``init`` and ``valid`` are
        ``datetime`` objects, ``lead`` is the whole number of hours between
        them, and ``values`` is the matching message's data array, or an array
        of NaN with shape ``nbm_shape`` if no message matched. ``None`` if the
        file holds no GRIB messages.
    """
    path_tail = nbm_file.split('/')[-2:]
    print(path_tail)

    with pygrib.open(nbm_file) as grb:

        msgs = grb.read()
        if len(msgs) == 0:
            print('%s: No grib messages'%path_tail)
            return None

        # 'YYYYMMDD' + 'HH' taken from the directory name and the 'tHHz' token.
        init = datetime.strptime(
            path_tail[0] + path_tail[1].split('.')[1][1:-1],
            '%Y%m%d%H')

        # Cycles stamped one hour past a 6-hourly time (t01z, t07z, ...) are
        # snapped back onto the 00/06/12/18 grid.
        if init.hour % 6 != 0:
            init -= timedelta(hours=1)

        # validityTime is an HHMM integer; it has to be zero-padded to four
        # digits, otherwise e.g. 100 (01:00) is parsed as 10:00.
        valid = datetime.strptime(
            str(msgs[0].validityDate) + '%04d'%msgs[0].validityTime,
            '%Y%m%d%H%M')

        step = valid - init
        lead = int(step.days*24 + step.seconds/3600)

        for msg in msgs:

            if 'Probability of event above upper limit' not in str(msg):
                continue

            # stepRange is 'start-end' in hours; skip anything that is not a range.
            bounds = str(msg['stepRange']).split('-')
            if len(bounds) != 2:
                continue
            interval = int(bounds[1]) - int(bounds[0])

            # presumably 0.01 in expressed in mm — confirm. Compared with a
            # tolerance because the limit is a decoded floating-point value.
            if np.isclose(msg.upperLimit, 0.254) and interval == 24:
                return (init, valid, lead, msg.values)

        # No matching message: hand back a NaN grid so callers still get a
        # full-sized entry for this file.
        if not returned:
            return (init, valid, lead, np.full(nbm_shape, fill_value=np.nan))

    return None
                    
# Spot-check on the last aggregated file. Note: nbm_flist_agg is built in the
# file-list cells further down, so those must have been run before this line.
test = unpack_fhr(nbm_flist_agg[-1]); test

['20201014', 'blend.t18z.qmd.f180.WR.grib2']


(datetime.datetime(2020, 10, 14, 18, 0),
 datetime.datetime(2020, 10, 22, 6, 0),
 180,
 array([[nan, nan, nan, ..., nan, nan, nan],
        [nan, nan, nan, ..., nan, nan, nan],
        [nan, nan, nan, ..., nan, nan, nan],
        ...,
        [nan, nan, nan, ..., nan, nan, nan],
        [nan, nan, nan, ..., nan, nan, nan],
        [nan, nan, nan, ..., nan, nan, nan]]))

In [None]:
# def get_init(init):
    
#     try:
#         nbm_flist = sorted(glob(nbm_dir + init.strftime('%Y%m%d') + '/*t%02dz*'%init.hour))
#         nbm_flist[0]
#     except:
#         nbm_flist = sorted(glob(nbm_dir + init.strftime('%Y%m%d') + '/*t%02dz*'%(init+timedelta(hours=1)).hour))
    
#     if len(nbm_flist) > 0:
        
#         with mp.get_context('fork').Pool(len(nbm_flist)) as p:
#             returns = p.map(unpack_fhr, nbm_flist, chunksize=1)
#             p.close()
#             p.join()

#     returns = np.array([r for r in returns if r is not None], dtype=object)

#     time = returns[:, 0].astype(np.datetime64)
#     data = xr.DataArray(
#         np.array([r for r in returns[:, 1]], dtype=np.int8), 
#         dims={'x', 'y', 'valid'},
#         coords={'valid':time})
        
#     return data

In [4]:
# Nominal 6-hourly initialization times from 2020-09-15 00:00 through the last
# 6-hourly step on 2020-10-14. The spacing is passed as a timedelta instead of
# the '6H' string alias, which recent pandas releases deprecate.
inits = pd.date_range(datetime(2020, 9, 15, 0), datetime(2020, 10, 14, 23), freq=timedelta(hours=6))
inits

DatetimeIndex(['2020-09-15 00:00:00', '2020-09-15 06:00:00',
               '2020-09-15 12:00:00', '2020-09-15 18:00:00',
               '2020-09-16 00:00:00', '2020-09-16 06:00:00',
               '2020-09-16 12:00:00', '2020-09-16 18:00:00',
               '2020-09-17 00:00:00', '2020-09-17 06:00:00',
               ...
               '2020-10-12 12:00:00', '2020-10-12 18:00:00',
               '2020-10-13 00:00:00', '2020-10-13 06:00:00',
               '2020-10-13 12:00:00', '2020-10-13 18:00:00',
               '2020-10-14 00:00:00', '2020-10-14 06:00:00',
               '2020-10-14 12:00:00', '2020-10-14 18:00:00'],
              dtype='datetime64[ns]', length=120, freq='6H')

In [5]:
# Collect, per initialization time, the sorted list of NBM files for that cycle.
nbm_flist_agg = []

for init in inits:

    date_dir = nbm_dir + init.strftime('%Y%m%d')

    # First look for files stamped with the nominal cycle hour ...
    nbm_flist = sorted(glob(date_dir + '/*t%02dz*'%init.hour))

    # ... and if none exist, fall back to the cycle stamped one hour later
    # (e.g. t01z for the 00 init), still inside the same date directory.
    if not nbm_flist:
        nbm_flist = sorted(glob(date_dir + '/*t%02dz*'%(init+timedelta(hours=1)).hour))

    # Inits with no files at all are simply left out of the aggregate.
    if nbm_flist:
        nbm_flist_agg.append(nbm_flist)

In [6]:
# Flatten the per-init path lists into a single 1-D array of file paths.
nbm_flist_agg = np.concatenate(
    [np.atleast_1d(flist) for flist in nbm_flist_agg], axis=0)
nbm_flist_agg

array(['/scratch/general/lustre/u1070830/nbm/20200915/blend.t01z.qmd.f005.WR.grib2',
       '/scratch/general/lustre/u1070830/nbm/20200915/blend.t01z.qmd.f011.WR.grib2',
       '/scratch/general/lustre/u1070830/nbm/20200915/blend.t01z.qmd.f017.WR.grib2',
       ...,
       '/scratch/general/lustre/u1070830/nbm/20201014/blend.t18z.qmd.f168.WR.grib2',
       '/scratch/general/lustre/u1070830/nbm/20201014/blend.t18z.qmd.f174.WR.grib2',
       '/scratch/general/lustre/u1070830/nbm/20201014/blend.t18z.qmd.f180.WR.grib2'],
      dtype='<U84')

In [None]:
# Number of worker processes used to unpack files in parallel.
workers = 12

# Unpack every file in the aggregated list. The original call mapped over
# `nbm_flist`, which after the file-list loop only holds the files of the last
# initialization time rather than the full flattened set.
with mp.get_context('fork').Pool(workers) as p:
    returns = p.map(unpack_fhr, nbm_flist_agg, chunksize=1)
    p.close()
    p.join()