# Load and Save hourly HRDPS files
Load and process the hourly HRDPS files here, then export them as netCDF or NumPy arrays.

In [1]:
from pathlib import Path
import time
import datetime as dt

import numpy as np
import xarray as xr

In [2]:
hrdps_path = Path("/results/forcing/atmospheric/GEM2.5/operational/")

Processing all of the years at once with a single open_mfdataset() call was slow and had a huge memory footprint. So, we process 1 year at a time, storing each year's dataset in a dict, then concatenate them near the end of the processing.

In [3]:
start_year, end_year = 2014, 2020
years = range(start_year, end_year+1)

The PerformanceWarning messages and a substantial increase in execution time and memory footprint were due to the addition of the 3 variables LHTFL_surface, PRATE_surface, and RH_2maboveground to the HRDPS datasets on 5-Dec-2018. Adding those variables to the ones to drop resolved those issues.

In [4]:
drop_vars = (
    "precip", "qair", "solar", "therm_rad", "percentcloud", 
    "LHTFL_surface", "PRATE_surface", "RH_2maboveground",
)
hrdps = {}
t_total_start = time.time()
for year in years:
    print(f"start gathering {year}")
    t_start = time.time()
    hrdps[year] = xr.open_mfdataset(
        sorted(hrdps_path.glob(f"ops_y{year}*.nc")), #new naming convention from sep 12 2014-present
        drop_variables=drop_vars,
    )
    print(f"finished gathering {year}: {time.time() - t_start}s")
print(f"total gathering time: {time.time() - t_total_start}s")

start gathering 2014
finished gathering 2014: 10.122012376785278s
start gathering 2015
finished gathering 2015: 30.291906595230103s
start gathering 2016
finished gathering 2016: 32.065611124038696s
start gathering 2017
finished gathering 2017: 31.87451410293579s
start gathering 2018
finished gathering 2018: 30.619048357009888s
start gathering 2019
finished gathering 2019: 36.012388706207275s
start gathering 2020
finished gathering 2020: 38.97812366485596s
total gathering time: 209.96704363822937s
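
The glob pattern plus sorted() in the cell above is what keeps each year's files in chronological order. Here is a minimal sketch of that idea using only the standard library. The file names are hypothetical stand-ins that I made up for illustration, assuming zero-padded year, month, and day fields in the names; they are not a listing of the real forcing directory.

```python
from fnmatch import fnmatch

# Hypothetical file names (assumption: zero-padded date fields in the names)
names = [
    "ops_y2015m01d01.nc",
    "ops_y2014m12d31.nc",
    "ops_y2014m09d12.nc",
    "other_y2014m09d11.nc",
]

year = 2014
# Same pattern as the glob() call above, applied to plain strings
matched = sorted(n for n in names if fnmatch(n, f"ops_y{year}*.nc"))
print(matched)
```

Because the date fields are zero-padded, lexicographic order is the same as chronological order, so the datasets are handed to open_mfdataset() in time order.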


The dask optimization tips section in the xarray docs recommends doing .sel() and .isel() operations before resample() and groupby() ones. So, we will do that.<br>

We store the dataset created at each step in a new variable so that we can inspect it. That doesn't take up much more memory because the datasets are collections of dask arrays, not the actual results.

In [5]:
hrdps_ssc = {
    year: hrdps[year].sel(x=slice(0, 480000))
    for year in years
}
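
A small sketch of how the label-based slice in .sel() behaves, using a toy dataset rather than the real HRDPS files. The 2.5 km grid spacing, the x values in metres, and the variable name tair are assumptions made for the illustration.

```python
import numpy as np
import xarray as xr

# Toy grid (assumption): x coordinate in metres, every 2.5 km from 0 to 600 km
x = np.arange(0, 600_001, 2500)
toy = xr.Dataset(
    {"tair": (("y", "x"), np.zeros((3, x.size)))},
    coords={"x": x, "y": np.arange(3) * 2500},
)

# .sel() slices by coordinate label, and both ends of the slice are included
subset = toy.sel(x=slice(0, 480_000))
print(subset.sizes["x"], int(subset.x[-1]))
```

Unlike positional slicing with .isel(), the point at x=480000 is kept in the result.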

Now we resample to get daily averages. Again, I am using a dict comprehension. This step does trigger some computation across all of the blocks of data that dask has divided things up into. So, it takes a bit of time, and I have wrapped it in timing code.<br>
NOT DOING THIS STEP IN THIS FILE AS WE WANT TO WORK WITH THE HOURLY DATA

In [9]:
# t_start = time.time()
# day_avgs = {
#     year: hrdps_ssc[year].resample(time_counter="D").mean(dim="time_counter")
#     for year in years
# }
# print(f"resampled to day averages: {time.time() - t_start}s")

resampled to day averages: 48.126585721969604s
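
For reference, here is what the skipped resampling step does, sketched on a toy hourly series instead of the HRDPS data. The variable name tair and the values are made up for the illustration.

```python
import numpy as np
import xarray as xr

# 48 hourly time stamps covering 2 full days
hours = np.arange(
    "2014-09-12T00", "2014-09-14T00", dtype="datetime64[h]"
).astype("datetime64[ns]")
toy = xr.Dataset(
    {"tair": ("time_counter", np.arange(48, dtype=float))},
    coords={"time_counter": hours},
)

# Group the hourly values by calendar day and average each group
daily = toy.resample(time_counter="D").mean()
print(daily.tair.values)
```

Each day's value is the mean of its 24 hourly values, so the 48 hourly points collapse to 2 daily ones.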


Next, we concatenate the years together. Recall that hrdps_ssc, like hrdps above, is a dict whose keys are the year numbers and whose values are xarray datasets. We get the collection of datasets by calling the values() method on the dict. Unfortunately, that method has the same name as the values property that accesses the NumPy array underlying an xarray.DataArray, which can be confusing.

In [6]:
# hrdps_allyears = xr.concat(day_avgs.values(), dim="time_counter")
hrdps_allyears = xr.concat(hrdps_ssc.values(), dim="time_counter")
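
A minimal sketch of the two different meanings of "values", using tiny stand-in datasets. The helper toy_year() and the variable name tair are hypothetical and exist only for this illustration.

```python
import numpy as np
import xarray as xr


def toy_year(year):
    """Build a tiny stand-in dataset with 2 time steps for one year."""
    times = np.array([f"{year}-01-01", f"{year}-07-01"], dtype="datetime64[ns]")
    return xr.Dataset(
        {"tair": ("time_counter", np.array([float(year), float(year)]))},
        coords={"time_counter": times},
    )


per_year = {year: toy_year(year) for year in range(2014, 2017)}

# dict.values(): the collection of datasets, in insertion (year) order
all_years = xr.concat(per_year.values(), dim="time_counter")

# DataArray.values: the NumPy array underlying one variable
numpy_values = all_years.tair.values
print(all_years.sizes["time_counter"], type(numpy_values).__name__)
```

Because dicts preserve insertion order, building the dict in year order means the concatenated time axis comes out in chronological order.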

Now do a little bit of cleanup

In [7]:
# Drop the time_counter coordinate from nav_lat and nav_lon to make them 2d variables
hrdps_allyears["nav_lat"] = hrdps_allyears.nav_lat.sel(time_counter=hrdps_allyears.time_counter[0])
hrdps_allyears["nav_lon"] = hrdps_allyears.nav_lon.sel(time_counter=hrdps_allyears.time_counter[0])
#note how easy this is to do within an xarray instead of within multiple numpy arrays
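
An alternative way to get the same 2d result is positional selection with drop=True, which also discards the leftover scalar time_counter coordinate. This is a sketch on a toy dataset, not a change to the cell above; the array values are made up.

```python
import numpy as np
import xarray as xr

lat2d = np.array([[48.0, 48.0], [49.0, 49.0]])
toy = xr.Dataset(
    # nav_lat repeated at every time step, as in the concatenated dataset
    {"nav_lat": (("time_counter", "y", "x"), np.stack([lat2d] * 3))},
    coords={"time_counter": np.arange(3)},
)

# Take the first time step and drop the now-scalar time_counter coordinate
toy["nav_lat"] = toy.nav_lat.isel(time_counter=0, drop=True)
print(toy.nav_lat.dims)
```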

In [8]:
# Restore the variables' attribute metadata from the first year's dataset
# (in the daily-average workflow it gets lost during resampling)
for var in hrdps_allyears.data_vars:
    hrdps_allyears[var].attrs = hrdps[start_year][var].attrs

This is where the rubber hits the road!<br>

All of the above processing has told dask to add tasks to its processing graph while deferring the actual computation until we need to access the results (lazy processing). Accessing the underlying NumPy arrays in the dataset, for example to plot them or look at a slice, triggers processing. But for large data collections it is best for us to control when the processing is triggered and how it is executed. We do that with the load() method on the dataset.<br>

For the kind of "concatenate and lightly process lots of netCDF datasets" workload in this notebook, telling dask to launch a collection of separate Python processes (workers) and distribute the tasks in its graph among them is usually the best approach. Here I chose to use 8 workers because I was running the notebook on salish while the nowcast-dev NEMO run was in progress. If top had shown me that salish was not busy with anything else, I would have used up to 32 workers.

In [9]:
num_workers = 8
t_start = time.time()
hrdps_loaded = hrdps_allyears.load(scheduler="processes", num_workers=num_workers)
print(f"dask processing in {num_workers} processes to load result: {time.time() - t_start}s")

dask processing in 8 processes to load result: 738.5992617607117s
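
To see the lazy-then-load behaviour on something small, here is a sketch with a toy dask-backed dataset. It assumes dask is installed, and it uses the single-threaded "synchronous" scheduler only to keep the toy example simple; the cell above uses the "processes" scheduler for the real workload.

```python
import numpy as np
import xarray as xr

# Toy dataset split into dask chunks along time_counter (requires dask)
toy = xr.Dataset(
    {"tair": (("time_counter", "x"), np.ones((6, 4)))}
).chunk({"time_counter": 2})

# This only adds tasks to the dask graph; nothing is computed yet
lazy = toy * 2
before = type(lazy.tair.data).__name__

# load() runs the graph and replaces the dask arrays with NumPy arrays
loaded = lazy.load(scheduler="synchronous")
after = type(loaded.tair.data).__name__
print(before, "->", after)
```

Note that load() works in place and returns the same dataset, so lazy also holds NumPy arrays afterwards.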


In [10]:
#add netCDF metadata
netcdf_title = 'HRDPShourly.nc'
netcdf_comment = 'HRDPS2.5 dataset from September 12, 2014-December 31, 2020 used in CanRCM4 downscaling attempt for the Salish Sea'
notebook = 'LoadFiles.ipynb'

ds_attrs = {
    'creator_email': 'rbeutel@eoas.ubc.ca',
    'institution_fullname': (
        'Earth, Ocean & Atmospheric Sciences,'
        ' University of British Columbia'
    ),
    'title': netcdf_title,
    'comment': netcdf_comment,
    'notebook': notebook,
    'summary': 'sea-level pressure, N/S wind, E/W wind, temperature',
    'history': (
        '[{}] File creation.'
        .format(dt.datetime.today().strftime('%Y-%m-%d'))
    ),
}

# Dataset-level metadata lives in the dataset's attrs dict.
# hrdps_loaded["attrs"] = ds_attrs is rejected as an invalid type because item
# assignment tries to create a data variable named "attrs" from a dict.
hrdps_loaded.attrs.update(ds_attrs)

In [11]:
hrdps_loaded

Dump the hourly HRDPS fields and geo-coordinates to a netCDF file so that they can be loaded with xarray.open_dataset() in the other processing steps without repeating the above processing.<br>

nc_file can be any path you want. The one below puts the file in the same directory as this notebook.<br>

NOTE: never commit this netCDF file to Git <br>
a) it can be regenerated by this code anyway<br>
b) it's way too big

In [12]:
nc_file = Path("hrdps_hourly_postSep2014.nc")
# ('time_counter') without a trailing comma is just a string, so pass a real tuple
hrdps_loaded.to_netcdf(nc_file, unlimited_dims=("time_counter",))