In [1]:
# default_exp convert_to_zarr

# Notebook which converts per region netCDF files to Zarr files to make them more efficient when indexing

### uses pangeo_small environment

In [1]:
#test_ignore
%reload_ext autoreload
%autoreload 2
%matplotlib inline

In [2]:
#export
import xarray as xr
import zarr
from joblib import Parallel, delayed
import pandas as pd
import os
from numcodecs import Blosc

## Set the parameters here
### Ensure all State and Regions you want to transform are specified here. 

In [3]:
#test_ignore
data_root = '/media/scottcha/Data2/OAPMLData/'
output_root = '/media/scottcha/E1/Data/OAPMLData/Temp2'
interpolation = 1

#currently only have Washington regions and one season specified for the tutorial
#uncomment regions and seasons if doing a larger transform
regions = {#'Utah': ['Abajos', 'Logan', 'Moab', 'Ogden', 'Provo', 
           #'Salt Lake', 'Skyline', 'Uintas'],  
           #'Colorado': ['Grand Mesa Zone', 'Sangre de Cristo Range', 'Steamboat Zone', 'Front Range Zone',
           #'Vail Summit Zone', 'Sawatch Zone', 'Aspen Zone', 
           #'North San Juan Mountains', 'South San Juan Mountains', 'Gunnison Zone'],
           #'Washington': ['Mt Hood', 'Olympics', 'Snoqualmie Pass', 'Stevens Pass',
           #'WA Cascades East, Central', 'WA Cascades East, North', 'WA Cascades East, South',
           #'WA Cascades West, Central', 'WA Cascades West, Mt Baker', 'WA Cascades West, South'
           #]           
           'Colorado': ["Aspen Zone",]#"Northwest Coastal", "Northwest Inland", "Sea To Sky", 
                       #"South Coast Inland", "South Coast", "North Rockies", 
                       #"Cariboos", "North Columbia", "South Columbia", "Purcells", 
                       #"Kootenay Boundary", "South Rockies", "Lizard Range and Flathead", 
                       #"Vancouver Island", "Kananaskis Country, Alberta Parks", "Chic Chocs, Avalanche Quebec",
                       #"Little Yoho", "Banff, Yoho and Kootenay National Parks", "Glacier National Park",
                       #"Waterton Lakes National Park", "Jasper National Park"]
           }
seasons = ['19-20']#, '20-21']

In [19]:
ds = xr.open_dataset('/media/scottcha/Data2/OAPMLData/3.GFSFiltered1xInterpolation1d/19-20/Region_Aspen Zone_20200103.nc', chunks={'latitude':1, 'longitude':1})

In [5]:
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=12)
client = Client(cluster)
client

0,1
Client  Scheduler: tcp://127.0.0.1:37659  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 12  Cores: 24  Memory: 135.10 GB


In [27]:
ds4 = xr.open_mfdataset('/media/scottcha/Data2/OAPMLData/3.GFSFiltered1xInterpolation3h/19-20/Region_Little Yoho_2020010[1-2].nc', combine='by_coords', chunks={'latitude':1, 'longitude':1, 'time':1}, parallel=True)

In [23]:
ds4.chunks

Frozen(SortedKeysDict({'time': (1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 'latitude': (1,), 'longitude': (1,)}))

In [29]:
ds4 = ds4.to_array(name='vars').chunk({'time':8, 'latitude':1, 'longitude':1, 'variable':-1}).to_dataset()

In [31]:
ds4.chunks

Frozen(SortedKeysDict({'variable': (1641,), 'time': (8, 8), 'latitude': (1,), 'longitude': (1,)}))

In [32]:
ds4.to_zarr('/media/scottcha/E1/Data/OAPMLData/Temp2/Little_yoho2.zarr', consolidated=True)

<xarray.backends.zarr.ZarrStore at 0x7f16479af710>

In [20]:
ds1 = ds.to_array(name='vars').chunk({'time':7, 'latitude':4, 'longitude':4, 'variable':-1}).to_dataset()

In [21]:
ds1

In [12]:
ds2 = ds.to_array(name='vars').chunk({'time':1, 'latitude':-1, 'longitude':-1}).to_dataset()
ds2

In [62]:
ds3 = ds.chunk({'time':1, 'latitude':1, 'longitude':1})

In [9]:
compressor = Blosc(cname='zstd', clevel=3, shuffle=Blosc.BITSHUFFLE)
enc = {x: {"compressor": compressor} for x in ds1}

In [22]:
#ds1.to_zarr('/media/scottcha/E1/Data/OAPMLData/Temp2/Aspen.zarr', consolidated=True, encoding=enc)
ds1.to_zarr('/media/scottcha/E1/Data/OAPMLData/Temp2/Aspen.zarr', consolidated=True, append_dim='time')

<xarray.backends.zarr.ZarrStore at 0x7fa91810bfb0>

In [11]:
ds1

In [53]:
ds.sortby('variable')#.to_zarr(zarr_path, consolidated=True, encoding=enc)

KeyError: 'variable'

In [14]:
#export
class ConvertToZarr:
    """
    Class which encapsulates the logic to convert a set of filtered netCDF files to Zarr
    """
    
    def __init__(self, seasons, regions, data_root, interpolate=1, resample_length='1d'):
        """
        Initialize the class
        
        Keyword Arguments
        seasons: list of season values to process
        regions: dictonary of Key: State and Value: List of Regions to process for that state
        data_root: the root path of the data folders which contains the 3.GFSFiltered1xInterpolation
        interpolate: the amount of interpolation applied in in the previous ParseGFS notebook (used for finding the correct input/output paths)
        """
        self.processed_path = data_root + '/3.GFSFiltered'+ str(interpolate) + 'xInterpolation' + resample_length + '/'
        self.zarr_base_path = output_root + '/4.GFSFiltered'+ str(interpolate) + 'xInterpolationZarr' + resample_length + '/'
        
        self.seasons = seasons
        self.regions = regions
        self.data_root = data_root
        self.compressor = Blosc(cname='zstd', clevel=3, shuffle=Blosc.BITSHUFFLE)
        
        if not os.path.exists(self.zarr_base_path):
            os.makedirs(self.zarr_base_path)
    
    def compute_region(self, region_name, season, state):
        """
        Calculates the zarr conversion for a specific region, season and state and indexes it for efficient lookup 
        
        Keyword Arguments
        region_name: name of the region to process
        season: season to process
        state: state to process (region must be a part of the state)
        """
        first = True
        base_path = self.processed_path + season + '/' + '/Region_' + region_name 
        zarr_path = self.zarr_base_path + season + '/' + state + '/Region_' + region_name + '.zarr'
        
        #TODO: refactor these to be shared code as logic also exists in ParseGFS
        p = 181
        if season in ['15-16', '19-20']:
            p = 182 #leap years
        
        if season == '15-16':
            snow_start_date = '2015-11-01'
        elif season == '16-17':
            snow_start_date = '2016-11-01'
        elif season == '17-18':
            snow_start_date = '2017-11-01'
        elif season == '18-19':
            snow_start_date = '2018-11-01'
        elif season == '19-20':
            snow_start_date = '2019-11-01'
        elif season == '20-21':
            snow_start_date = '2020-11-01'
        else:
            raise Exception('No known season ' + season)

        date_values_pd = pd.date_range(snow_start_date, periods=p, freq="D")
        try:
            with xr.open_zarr(zarr_path) as z:
                if z.time.values[-1] == date_values_pd[-1]:
                    print(' already exists: ' + region_name + ' ' + season + ' ' + state)
                    z.close()
                    return
                else:
                    #already exists but incomplete
                    date_values_pd = [pd.Timestamp(v) for v in date_values_pd.values.astype('datetime64[ns]') if v not in z.time.values]
                    print(' some exist but have to complete ' + str(len(date_values_pd)))
                    first = False
        except ValueError as err:
            #ignore as it doesn't exist yet
            pass
        
        #sometimes vars get added, filter to only the list of vars in the first dataset for that region
        #TODO: handle the case where the first dataset has more vars than subsequent ones
        final_vars = None
        last_ds = xr.open_dataset(base_path + '_' + date_values_pd[-1].strftime('%Y%m%d') + '.nc')
        
        last_ds = last_ds.to_array(name='vars').chunk({'time':-1, 'latitude':1, 'longitude':1, 'variable':-1}).to_dataset()
        last_vars = list(last_ds.variable.values)
        
        for d in date_values_pd:

            path =  base_path + '_' + d.strftime('%Y%m%d') + '.nc'
            print('On ' + str(path.split('/')[-1]))

            try:
                ds = xr.open_dataset(path, chunks={'latitude':1, 'longitude':1})
            except OSError as err:
                print(' missing file: ' + path)
                continue

            ds = ds.to_array(name='vars').chunk({'time':-1, 'latitude':1, 'longitude':1, 'variable':-1}).to_dataset()


            if first:
                #find intersection of the first and last variables to try to ensure
                #that we are using only the intersection
                final_vars = [v for v in list(ds.variable.values) if v in last_vars]
                ds = ds.sel(variable=ds.variable.isin(final_vars))
                enc = {x: {"compressor": self.compressor} for x in ds}
                ds.sortby('variable').to_zarr(zarr_path, consolidated=True, encoding=enc)
                first=False
            else:
                assert(final_vars is not None)
                ds = ds.sel(variable=ds.variable.isin(final_vars))                
                ds.sortby('variable').to_zarr(zarr_path, consolidated=True, append_dim='time')


    def process_tuple(self, t): 
        """
        Entry method to call compute_region with a tuple
        Basically a helper for executing with joblib parallel
        
        Keyword Arguments
        t: the tuple containing the region, season and state
        """
        self.compute_region(t[0], t[1], t[2])
    
    def make_list(self):
        """
        Helper method to make the list of values to process
        """
        to_process = []
        for s in self.seasons:
            for state in self.regions.keys():           
                for r in self.regions[state]:
                    to_process.append((r,s,state))
        return to_process
    
    def convert_local(self, jobs=15):
        l = self.make_list()
    
        #one state & season takes about 6 hours with 15 cores on my machine
        Parallel(n_jobs=jobs, backend="multiprocessing")(map(delayed(self.process_tuple), l))

In [15]:
#tmppath =  '/media/scottcha/E1/Data/OAPMLData/3.GFSFiltered1xInterpolation/19-20/Region_Mt Hood_20191120.nc'
#ds = xr.open_dataset(tmppath, chunks={'latitude':1, 'longitude':1})
#ds = ds.to_array(name='vars').chunk({'time':1, 'latitude':1, 'longitude':1, 'variable':-1}).to_dataset()

In [16]:
#final_vars = list(ds.variable.values)

In [17]:
#tmp_ds = ds.sel(variable=ds.variable.isin(final_vars))

In [18]:
#ds

In [19]:
#tmp_ds

In [20]:
#test_ignore
ctz = ConvertToZarr(seasons, regions, data_root, resample_length='1d')

In [23]:
%%time
#Wall time: 6min 13s

ctz.convert_local(jobs = 20)

On Region_Aspen Zone_20191101.nc
On Region_Aspen Zone_20191102.nc
On Region_Aspen Zone_20191103.nc
On Region_Aspen Zone_20191104.nc
On Region_Aspen Zone_20191105.nc
On Region_Aspen Zone_20191106.nc
On Region_Aspen Zone_20191107.nc
On Region_Aspen Zone_20191108.nc
On Region_Aspen Zone_20191109.nc
On Region_Aspen Zone_20191110.nc
On Region_Aspen Zone_20191111.nc
On Region_Aspen Zone_20191112.nc
On Region_Aspen Zone_20191113.nc
On Region_Aspen Zone_20191114.nc
On Region_Aspen Zone_20191115.nc
On Region_Aspen Zone_20191116.nc
On Region_Aspen Zone_20191117.nc
On Region_Aspen Zone_20191118.nc
On Region_Aspen Zone_20191119.nc
On Region_Aspen Zone_20191120.nc
On Region_Aspen Zone_20191121.nc
On Region_Aspen Zone_20191122.nc
On Region_Aspen Zone_20191123.nc
On Region_Aspen Zone_20191124.nc
On Region_Aspen Zone_20191125.nc
On Region_Aspen Zone_20191126.nc
On Region_Aspen Zone_20191127.nc
On Region_Aspen Zone_20191128.nc
On Region_Aspen Zone_20191129.nc
On Region_Aspen Zone_20191130.nc
On Region_

Process ForkPoolWorker-66:
Process ForkPoolWorker-69:
Process ForkPoolWorker-78:
Process ForkPoolWorker-76:
Process ForkPoolWorker-74:
Process ForkPoolWorker-80:
Process ForkPoolWorker-68:
Process ForkPoolWorker-62:
Process ForkPoolWorker-75:
Process ForkPoolWorker-71:
Process ForkPoolWorker-70:
Process ForkPoolWorker-63:
Process ForkPoolWorker-65:
Process ForkPoolWorker-77:
Process ForkPoolWorker-73:
Process ForkPoolWorker-79:
Traceback (most recent call last):
Traceback (most recent call last):
Process ForkPoolWorker-67:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/home/scottcha/miniconda3/envs/pangeo_small3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/scottcha/miniconda3/envs/pangeo_small3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    se

In [13]:
dstmp = xr.Dataset()

In [18]:
?dstmp.to_zarr()

Object `dstmp.to_zarr()` not found.
