In [7]:
import os

# Limit OpenMP to a single thread. This is set *before* NumPy/xarray are
# imported because threaded numerical back-ends commonly read the variable
# when they are first loaded; setting it afterwards may only affect child
# processes that inherit the environment.
os.environ['OMP_NUM_THREADS'] = '1'

import gc
import sys
import numpy as np
from glob import glob
import xarray as xr

from subprocess import run, PIPE

from multiprocessing import Pool, cpu_count
from functools import partial

In [3]:
# Requested point and year bounds. Values are hard-coded for interactive use;
# the trailing comment marks where sys.argv[1:] would supply them instead.
# NOTE(review): lat/lon look like signed decimal degrees and start/end like
# first/last year - confirm against the script version.
lat, lon = map(float, (40.4561, -112.52351))
start, end = map(int, (1980, 2020))  # sys.argv[1:]

banner = 'Creating ERA5 Profile: {}, {}, {}, {}'.format(lat, lon, start, end)
print(banner)

Creating ERA5 Profile: 40.4561, -112.52351, 1980, 2020


In [4]:
# All three directories live under one ERA5 root; each keeps its trailing
# slash because get_year() builds paths by plain string concatenation.
era5_root = '/uufs/chpc.utah.edu/common/home/steenburgh-group10/mewessler/era5/'

isodir = era5_root + 'iso/'                # read by get_year() when levelset == 'iso'
sfcdir = era5_root + 'sfc/'                # read by get_year() for any other levelset
profdir = era5_root + 'profiles/disagg/'   # not referenced in the visible cells

In [5]:
# Variable short names. get_year() matches files with the pattern
# '*_<key>.*' and reads the data variable named key.upper().
# NOTE(review): presumably isokeys pair with levelset='iso' and sfckeys with
# levelset='sfc' - confirm against the caller.
isokeys = [
    'q', 't', 'u', 'v',
    'vo', 'w', 'z', 'r',
]
sfckeys = [
    '100u', '100v',
    '10u', '10v',
    '2d', '2t',
    'blh', 'cape', 'msl', 'sp',
]

In [23]:
sample = xr.open_dataset('./era5_iso_sample.nc')

# Sum of absolute latitude and longitude offsets from the requested point,
# broadcast over the grid. The sample longitudes are shifted by -360 before
# comparison - presumably the file uses a 0-360 convention while `lon` is
# signed (TODO confirm).
lat_offset = abs(sample['latitude']-lat)
lon_offset = abs(sample['longitude']-360-lon)
a = lat_offset + lon_offset

# argmin() is a flat index; convert it to (latitude, longitude) indices.
yi, xi = np.unravel_index(a.argmin(), a.shape)

# Replace the requested coordinates with those of the selected grid cell.
gridpoint = sample.isel(latitude=yi, longitude=xi)
lat = gridpoint['latitude']
lon = gridpoint['longitude'] - 360
print('ERA5 profile at gridpoint: %.2f, %.2f'%(lat, lon))

def get_year(year, key, levelset, months=(1, 2)):
    """Load a single-grid-point time series of one variable for one year.

    For every requested month, all files in ``<datadir>/<YYYYMM>/`` whose name
    matches ``*_<key>.*`` are opened as one dataset, reduced to the grid cell
    given by the module-level ``yi``/``xi`` indices, loaded into memory, and
    stripped of global attributes. The monthly pieces are then concatenated
    along ``time``.

    Parameters
    ----------
    year : int
        Year used to build the ``YYYYMM`` directory name.
    key : str
        Variable short name as it appears in the file names (e.g. ``'cape'``).
        The data variable itself is looked up as ``key.upper()``.
    levelset : str
        ``'iso'`` reads from ``isodir`` and also rechunks along ``level``;
        any other value reads from ``sfcdir``.
    months : iterable of int, optional
        Month numbers to load. Defaults to ``(1, 2)``, which matches the range
        that was previously hard-coded; pass ``range(1, 13)`` for a full year.

    Returns
    -------
    xarray.Dataset or None
        The concatenated dataset, or ``None`` if concatenation fails (for
        example when no month was loaded).

    Raises
    ------
    Exception
        Any error raised while rechunking/loading a month is re-raised after
        a ``Failed: YYYY MM`` line has been printed.
    """
    datadir = isodir if levelset == 'iso' else sfcdir

    year_data = []
    for month in months:

        print('Working on: %s %04d %02d'%(key, year, month))

        date_dir = datadir + '%04d%02d'%(year, month)
        flist = sorted(glob(date_dir + '/*_%s.*'%key))

        month_data = xr.open_mfdataset(
            flist, concat_dim='time', drop_variables=['utc_date'], parallel=True,
            decode_cf=True, decode_times=True, decode_coords=False,
        ).isel(latitude=yi, longitude=xi).drop(['latitude', 'longitude'])

        try:
            # Collapse each dimension into a single chunk before loading.
            # Assumes axis 0 is time and (for 'iso') axis 1 is level - this
            # mirrors the original indexing; TODO confirm for every variable.
            shape = month_data[key.upper()].shape
            chunks = {'time': shape[0]*1}
            if levelset == 'iso':
                chunks['level'] = shape[1]*1
            month_data = month_data.chunk(chunks).load()
        except Exception:
            # Report which month failed, then propagate the original error.
            print('Failed: %04d %02d'%(year, month))
            raise

        month_data.attrs = {}
        year_data.append(month_data)

    try:
        return xr.concat(year_data, dim='time')
    except Exception:
        # Best-effort: callers receive None when the months cannot be joined.
        return None

ERA5 profile at gridpoint: 40.50, -112.50


In [29]:
# Fix the variable and level set so the loader takes only the year.
mpfunc = partial(get_year, key='cape', levelset='sfc')

# Serial run over 1990-1994 (np.arange excludes the stop value).
result = list(map(mpfunc, np.arange(1990, 1995)))

# Disabled parallel variant, kept for reference. Note that `get_context` is
# not imported at the top of this notebook and would have to be added.
# with get_context('forkserver').Pool(len(np.arange(1990, 2001))) as p: #start, end+1)))
#     result = p.map(mpfunc, np.arange(1990, 2001), chunksize=1)
#     p.close()
#     p.join()

Working on: cape 1990 01
Working on: cape 1990 02
Working on: cape 1991 01
Working on: cape 1991 02
Working on: cape 1992 01
Working on: cape 1992 02
Working on: cape 1993 01
Working on: cape 1993 02
Working on: cape 1994 01
Working on: cape 1994 02


In [21]:
# Open one monthly surface CAPE file to inspect its dimensions and variables.
cape_sample_path = '/uufs/chpc.utah.edu/common/home/steenburgh-group10/mewessler/era5/sfc/200001/e5.oper.an.sfc.128_059_cape.ll025sc.2000010100_2000013123.WE.nc'
xr.open_mfdataset(cape_sample_path)

<xarray.Dataset>
Dimensions:    (latitude: 81, longitude: 113, time: 744)
Coordinates:
  * latitude   (latitude) float64 50.0 49.75 49.5 49.25 ... 30.5 30.25 30.0
  * longitude  (longitude) float64 232.0 232.2 232.5 232.8 ... 259.5 259.8 260.0
  * time       (time) datetime64[ns] 2000-01-01 ... 2000-01-31T23:00:00
Data variables:
    CAPE       (time, latitude, longitude) float32 dask.array<shape=(744, 81, 113), chunksize=(744, 81, 113)>
    utc_date   (time) int32 dask.array<shape=(744,), chunksize=(744,)>
Attributes:
    DATA_SOURCE:          ECMWF: https://cds.climate.copernicus.eu, Copernicu...
    NETCDF_CONVERSION:    CISL RDA: Conversion from ECMWF GRIB1 data to netCDF4.
    NETCDF_VERSION:       4.6.1
    CONVERSION_PLATFORM:  Linux casper05 3.10.0-693.21.1.el7.x86_64 #1 SMP We...
    CONVERSION_DATE:      Fri Jul 26 11:45:49 MDT 2019
    Conventions:          CF-1.6
    NETCDF_COMPRESSION:   NCO: Precision-preserving compression to netCDF4/HD...
    history:              Fri A