# Extract SWE data from SnowModel output and export to zarr in time increments
4/22, 3/31/2020. https://github.com/emiliom

This notebook implements a single scheme to export to zarr on the local file system, MinIO (local S3), or AWS S3, depending on the user's choice (`FS_type` variable).

## Notes, background, TO-DOs

### Strategies, TO-DOs
- *Ongoing.* Add more attributes, to enrich the metadata
    - Global attributes, including ones matching with CF conventions.
    - Variable attributes, including CF standard name
- DONE. Read a portion of the file at a time. I can't read it all at once because it's larger than my available laptop memory (23 GB vs 15.4 GB). Instead of first writing to incremental netCDF files, it's more practical to read the binary data in time segments and write each segment to zarr, concatenating along the time dimension.
- DONE. Keeping the default `NaN` for `_FillValue`. xarray automatically added `'_FillValue': nan` to the `encoding` of the `swe` array. See [xarray #1598](https://github.com/pydata/xarray/issues/1598) and [this](https://cf-trac.llnl.gov/trac/ticket/52) for background discussions. CF/netCDF don't forbid NaN as `_FillValue`, but the [NCEI NetCDF Template discourages it](https://www.nodc.noaa.gov/data/formats/netcdf/v2.0/faq.html).
- DONE. Add dimension variables (time, x, y), with corresponding attributes.
- DONE. Create a scheme that accepts a desired number of time steps per time increment (tsubset) and generates from it a sequence giving the number of time steps in each increment. The last increment (last sequence element) may have fewer time steps.
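
The memory constraint mentioned above can be checked with simple arithmetic. This is a minimal sketch, assuming the grid dimensions and time-step count set later in this notebook (`nx`, `ny`, `timesteps`) and 4-byte float32 values; it is an illustration, not part of the export workflow.

```python
# Size of the full SWE array, using the dimensions set later in this notebook
nx, ny, timesteps = 1382, 2476, 365 * 5
bytes_per_value = 4  # float32

total_bytes = timesteps * ny * nx * bytes_per_value
size_gb = total_bytes / 1e9     # decimal gigabytes
size_gib = total_bytes / 2**30  # binary gibibytes

print(f"{size_gb:.2f} GB, {size_gib:.2f} GiB")  # 24.98 GB, 23.26 GiB
```

The "23 GB" figure above appears to be in binary units (GiB), while the 24.98 GB that xarray reports further down is in decimal units.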

### References for NetCDF CF/ACDD conventions
- https://www.nodc.noaa.gov/data/formats/netcdf/v2.0/
- https://www.nodc.noaa.gov/data/formats/netcdf/v2.0/grid.cdl

In [1]:
from collections import OrderedDict
from pathlib import Path
import math
from datetime import datetime as dt

import numpy as np
import pandas as pd
import xarray as xr
import zarr
import s3fs

import matplotlib.pyplot as plt
%matplotlib inline

from snowmodelzarrfs import connect_fs, get_zarrstore

In [2]:
pd.__version__, xr.__version__, zarr.__version__, s3fs.__version__

('1.0.3', '0.15.1', '2.4.0', '0.4.0')

In [3]:
# base_dpth = Path("/usr/mayorgadat/workmain/aarendt/CSO/projectwork/snow_model")

base_dpth = Path("../tests")

## Set XYT dimensions and binary blob file name

In [4]:
##USER INPUTS## - most of this can be read directly from the .ctl file or the .par file 
#-> need to read in text file

# model filename
inFile = 'swed.gdat'

# start date
st = "2014-10-01"
# end date
ed = "2019-09-29"
# number of timesteps in model run 
timesteps = 365 * 5

# from .ctl file
nx = 1382 # number of cells in the x dimension
ny = 2476 # number of cells in the y dimension
xll = 487200
yll = 4690100
clsz = 100 # cellsize

## Prepare derived coordinate (XYT) arrays and variables

In [5]:
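def time_increments(timesteps, target_increments):
    """Split timesteps into increments of target_increments steps, and
    return the increment sizes and their starting offsets"""
    nts_increments = [target_increments] * (timesteps // target_increments)
    # The last increment holds the remainder; skip it if there is none
    if timesteps % target_increments:
        nts_increments.append(timesteps % target_increments)
    nts_increments = np.array(nts_increments)
    nts_increment_offsets = nts_increments.cumsum() - nts_increments
    
    return nts_increments, nts_increment_offsets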
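
As a quick check of the increment scheme, the sketch below re-implements the splitting logic in plain Python under a hypothetical name (`split_timesteps`, not part of this notebook) and applies it to the 1825 time steps used here. It omits a trailing zero-length increment when the total is an exact multiple of the target.

```python
def split_timesteps(total, target):
    """Return (sizes, offsets) for consecutive increments of at most `target` steps."""
    sizes = [target] * (total // target)
    remainder = total % target
    if remainder:
        sizes.append(remainder)  # shorter final increment
    # Offset of each increment = number of time steps before it
    offsets = [sum(sizes[:i]) for i in range(len(sizes))]
    return sizes, offsets

sizes, offsets = split_timesteps(1825, 460)
print(sizes)    # [460, 460, 460, 445]
print(offsets)  # [0, 460, 920, 1380]

# Exact multiple: no zero-length increment at the end
print(split_timesteps(920, 460))  # ([460, 460], [0, 460])
```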

The geographical projection is UTM Zone 12N, `epsg:32612`

In [6]:
# Easting (X) and Northing (Y) arrays
easting_x = np.arange(xll, xll+nx*clsz, clsz)
northing_y = np.arange(yll, yll+ny*clsz, clsz)

In [7]:
time_days = pd.date_range(st, periods=timesteps)

In [8]:
len(easting_x), len(northing_y), timesteps

(1382, 2476, 1825)

In [9]:
time_days[0], time_days[-1]

(Timestamp('2014-10-01 00:00:00', freq='D'),
 Timestamp('2019-09-29 00:00:00', freq='D'))

## Workhorse functions

In [10]:
def read_as_xrds_tsubset(fpath, time, y, x, nts, nts_offset=0, minval=0):
    """Read a time subset (nts time steps, starting at nts_offset) from a
    GrADS binary model output file into a metadata-enriched xarray Dataset"""

    nx, ny = len(x), len(y)
    # count : int. Number of items to read. -1 means all items (i.e., the complete file).
    numpy_data = np.fromfile(
        fpath, 
        dtype=np.float32, 
        count=nts*ny*nx,
        offset=4*nts_offset*ny*nx  # offset is in bytes
    ).reshape((nts, ny, nx))
    # Switching to using valid_min attribute instead
    # numpy_data = numpy_data.clip(min=minval)

    time_subset = time[nts_offset:nts_offset+nts]

    # TODO: Generalize attribute assignments to use a dictionary (or OrderedDict)
    #  to be passed to this function

    # Convert data to xarray DataArray, then add variable attributes
    datavar = xr.DataArray(
        numpy_data,
        dims=('time', 'y', 'x'), 
        coords={'time': time_subset, 'y': y, 'x': x}
    )
    datavar.attrs['long_name'] = 'Snow Water Equivalent'
    datavar.attrs['standard_name'] = 'lwe_thickness_of_surface_snow_amount'
    datavar.attrs['units'] = 'meters'
    datavar.attrs['valid_min'] = float(minval)

    ## Create xarray Dataset, including dimension variables
    d = OrderedDict()
    d['time'] = ('time', time_subset)
    d['x'] = ('x', x)
    d['y'] = ('y', y)
    d['swe'] = datavar
    ds = xr.Dataset(d)

    # Add global attributes
    ds.attrs['description'] = "SnowModel model run, SWE variable only"
    ds.attrs['CRS'] = "UTM Zone 12N, EPSG:32612"

    # Add dimension/coordinate variable attributes
    ds.time.attrs['standard_name'] = "time"
    ds.time.attrs['axis'] = "T"

    ds.x.attrs['long_name'] = "Easting"
    ds.x.attrs['units'] = "meters"
    ds.x.attrs['axis'] = "X"

    ds.y.attrs['long_name'] = "Northing"
    ds.y.attrs['units'] = "meters"
    ds.y.attrs['axis'] = "Y"
    
    return ds
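
The partial read in the function above relies on `count` being a number of values and `offset` being a number of bytes. The sketch below shows this on a tiny synthetic file. It assumes a flat, headerless float32 file in native byte order with time as the slowest-varying dimension (matching the reshape above) and NumPy 1.17 or later, where `np.fromfile` accepts `offset`. The file name and grid size are made up for illustration.

```python
import os
import tempfile

import numpy as np

nts_total, ny, nx = 6, 3, 4  # tiny synthetic grid, for illustration only
full = np.arange(nts_total * ny * nx, dtype=np.float32).reshape((nts_total, ny, nx))

with tempfile.TemporaryDirectory() as tmpdir:
    fpath = os.path.join(tmpdir, "synthetic.gdat")
    full.tofile(fpath)  # flat, headerless float32 values in C order

    nts, nts_offset = 2, 3  # read 2 time steps, starting at time step 3
    itemsize = np.dtype(np.float32).itemsize  # 4 bytes
    subset = np.fromfile(
        fpath,
        dtype=np.float32,
        count=nts * ny * nx,                    # number of values to read
        offset=itemsize * nts_offset * ny * nx  # number of bytes to skip
    ).reshape((nts, ny, nx))

# The subset matches the corresponding time slab of the full array
print(np.array_equal(subset, full[nts_offset:nts_offset + nts]))  # True
```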

In [11]:
def ds_increment_to_zarr(fs_type, zarrstore, ds, chunk, is_first=False):
    """Chunk and export to Zarr"""
    
    chunked_ds = ds.chunk(chunk)
    
    # For S3 targets, defer the write so it can be run below with retries
    compute = not fs_type.endswith('s3')
    if is_first:
        store = chunked_ds.to_zarr(
            store=zarrstore,
            mode='w',
            consolidated=True,
            compute=compute
        )
    else:
        store = chunked_ds.to_zarr(
            store=zarrstore,
            mode='a',
            consolidated=True,
            append_dim="time",
            compute=compute
        )
    
    if fs_type.endswith('s3'):
        store.persist(retries=100)

## Read binary blob data incrementally and export to Zarr incrementally
Iterate over `nts_increments` and `nts_increment_offsets`, generate an xarray Dataset for each increment, and write it to zarr: the first increment creates the store and each later one is appended.

### Configuration for export to Zarr

In [12]:
aws_profile_name = 'cso'

# Options: localfs, localminio_s3, aws_s3
FS_type, bucket = "localfs", "snowmodel"

# Use -1 to process the entire dataset
timesteps_to_process = -1

# should nts_target_increment be the same as the chunk time size?

# Optimizing for whole-spatial-domain queries
# zarrds = "swe_run_a-geo.zarr"
# nts_target_increment = 4 
# chunk = {'time': 4}

# Optimizing for cell time-series queries
zarrds = "swe_run_a-ts-xx.zarr"
nts_target_increment = 460
chunk = {'time': 460, 'x': 150, 'y': 150}
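
To compare the two chunking options above, the sketch below computes the uncompressed size of one full chunk and the total number of chunks for each layout. It assumes the array shape used in this notebook and that `chunk = {'time': 4}` leaves `x` and `y` unchunked. `chunk_stats` is a hypothetical helper.

```python
import math

nt, ny, nx = 1825, 2476, 1382  # full array shape (time, y, x)
itemsize = 4                   # float32

def chunk_stats(ct, cy, cx):
    """Return (bytes in one full chunk, number of chunks) for a chunk shape."""
    n_chunks = math.ceil(nt / ct) * math.ceil(ny / cy) * math.ceil(nx / cx)
    return ct * cy * cx * itemsize, n_chunks

ts_bytes, ts_n = chunk_stats(460, 150, 150)  # cell time-series layout
geo_bytes, geo_n = chunk_stats(4, ny, nx)    # whole-spatial-domain layout

print(f"ts:  {ts_bytes / 1e6:.1f} MB per chunk, {ts_n} chunks")    # 41.4 MB, 680
print(f"geo: {geo_bytes / 1e6:.1f} MB per chunk, {geo_n} chunks")  # 54.7 MB, 457
```

With the time-series layout, the full series for one cell spans 4 chunks along time; with the whole-domain layout it spans all 457.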

### Execute incremental read and export to Zarr

In [13]:
nts_increments, nts_increment_offsets = time_increments(timesteps, nts_target_increment)

nts_increments[-3:], nts_increment_offsets

(array([460, 460, 445]), array([   0,  460,  920, 1380]))

In [14]:
len(time_days), len(nts_increments), len(nts_increment_offsets)

(1825, 4, 4)

Generic strategy to export to local zarr, MinIO zarr, or AWS S3 zarr

In [15]:
FS = connect_fs(FS_type, aws_profile_name=aws_profile_name)

In [16]:
zarrstore = get_zarrstore(FS, FS_type, bucket, zarrds, base_dpth=base_dpth)

In [17]:
# if FS_type == 'localfs':
#     # Looks like `open_zarr` doesn't accept pathlib Paths, so convert to str
#     zarrstore = str(base_dpth / bucket / zarrds)
# elif FS_type == 'localminio_s3':
#     FS = s3fs.S3FileSystem(
#         key='minioadmin',
#         secret='minioadmin',
#         client_kwargs={"endpoint_url": "http://172.17.0.2:9000"}
#     )
#     zarrstore = s3fs.S3Map(f"{bucket}/{zarrds}", s3=FS)
# elif FS_type == 'aws_s3':
#     # Use stored credentials file, ~/.aws/credentials
#     # Can also pass the credentials here explicitly via parameters
#     # aws_access_key_id and aws_secret_access_key, but that's not safe
#     aws_session = boto3.Session(
#         profile_name='cso',
#         aws_session_token=None,
#         region_name='us-west-2'
#     )
#     FS = s3fs.S3FileSystem(session=aws_session)
#     zarrstore = s3fs.S3Map(f"{bucket}/{zarrds}", s3=FS)

In [18]:
print(dt.strftime(dt.utcnow(), '%Y-%m-%dT%H:%M:%SZ'))

if timesteps_to_process < 0:
    nts_increments_to_process = len(nts_increments)
else:
    # round up to the next complete increment, capped at the number available
    nts_increments_to_process = min(
        len(nts_increments),
        math.ceil(timesteps_to_process / nts_target_increment)
    )

for nts_i in range(nts_increments_to_process):
    # Read increment from the binary blob
    ds = read_as_xrds_tsubset(base_dpth / inFile, time_days, northing_y, easting_x,
                              nts_increments[nts_i], 
                              nts_offset=nts_increment_offsets[nts_i], 
                              minval=0)
    
    print(
        "**** Data increment: index {idx_start} - {idx_end}, {date_start} - {date_end}".format(
            idx_start=nts_increment_offsets[nts_i],
            idx_end=nts_increment_offsets[nts_i]+nts_increments[nts_i]-1,
            date_start=time_days[nts_increment_offsets[nts_i]],
            date_end=time_days[nts_increment_offsets[nts_i]+nts_increments[nts_i]-1]
        )
    )
    
    # Export increment to zarr
    # print(dt.strftime(dt.utcnow(), '%Y-%m-%dT%H:%M:%SZ'))
    is_first = nts_i == 0
    %time ds_increment_to_zarr(FS_type, zarrstore, ds, chunk, is_first)
    
print(dt.strftime(dt.utcnow(), '%Y-%m-%dT%H:%M:%SZ'))

2020-04-23T02:30:12Z
**** Data increment: index 0 - 459, 2014-10-01 00:00:00 - 2016-01-03 00:00:00
CPU times: user 22.1 s, sys: 10.5 s, total: 32.5 s
Wall time: 16.5 s
**** Data increment: index 460 - 919, 2016-01-04 00:00:00 - 2017-04-07 00:00:00
CPU times: user 22.7 s, sys: 6.43 s, total: 29.1 s
Wall time: 16.7 s
**** Data increment: index 920 - 1379, 2017-04-08 00:00:00 - 2018-07-11 00:00:00
CPU times: user 22.8 s, sys: 6.2 s, total: 29 s
Wall time: 25.2 s
**** Data increment: index 1380 - 1824, 2018-07-12 00:00:00 - 2019-09-29 00:00:00
CPU times: user 22.2 s, sys: 5.89 s, total: 28.1 s
Wall time: 18.3 s
2020-04-23T02:32:39Z


### Processing times for writing the whole 25 GB dataset to zarr
- **To zarr on AWS**
    - 3/31: 94 minutes, using `nts_target_increment = 460` and `chunk = {'time': 460, 'x': 150, 'y': 150}`. Increments took 20 - 23 minutes each.
    - 3/31: 4 hr 32 min, using `nts_target_increment = 4` and `chunk = {'time': 4}`. Increments took 5 - 57 seconds each.
- **To zarr on local file system**
    - 3/30 7:30pm: 2 min, using `chunk = {'time': 400, 'x': 200, 'y': 200}`. Each increment took 7.7 - 15 seconds. The size of the chunk files is 5 - 45 MB.
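
To put the timings above on a common scale, the sketch below converts them to an approximate throughput. It assumes the uncompressed float32 array size as the data volume; the bytes actually written are smaller because the chunks are compressed, so these are rough comparative figures only.

```python
total_bytes = 1825 * 2476 * 1382 * 4  # full float32 array, about 24.98 GB

# Wall-clock times reported above, in seconds
runs = {
    "AWS S3, time=460 x=150 y=150": 94 * 60,
    "AWS S3, time=4": (4 * 60 + 32) * 60,
    "local FS, time=400 x=200 y=200": 2 * 60,
}

# Uncompressed megabytes processed per second of wall-clock time
throughput = {name: total_bytes / secs / 1e6 for name, secs in runs.items()}
for name, mb_per_s in throughput.items():
    print(f"{name}: {mb_per_s:.1f} MB/s")
```

This works out to roughly 4 MB/s and 1.5 MB/s for the two AWS runs, and about 200 MB/s for the local file system.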

## Read zarr dataset back, for verification

In [19]:
zds = xr.open_zarr(
    store=zarrstore, 
    consolidated=True
)

In [20]:
zds

| | Array | Chunk |
|---|---|---|
| Bytes | 24.98 GB | 41.40 MB |
| Shape | (1825, 2476, 1382) | (460, 150, 150) |
| Count | 681 Tasks | 680 Chunks |
| Type | float32 | numpy.ndarray |


In [21]:
zds.swe.encoding

{'chunks': (460, 150, 150),
 'compressor': Blosc(cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=0),
 'filters': None,
 '_FillValue': nan,
 'dtype': dtype('float32')}

In [22]:
zds.swe.attrs

{'long_name': 'Snow Water Equivalent',
 'standard_name': 'lwe_thickness_of_surface_snow_amount',
 'units': 'meters',
 'valid_min': 0.0}
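
Because this notebook records `valid_min` as an attribute rather than clipping the data on read, values below it remain in the store. As far as I know, xarray does not apply `valid_min` automatically when decoding, so a reader who wants it enforced has to mask explicitly. This is a minimal sketch on a small synthetic array standing in for `swe`.

```python
import numpy as np
import xarray as xr

# Small synthetic stand-in for the `swe` variable, for illustration only
swe = xr.DataArray(
    np.array([[-0.5, 0.0], [0.2, 1.3]], dtype="float32"),
    dims=("y", "x"),
    attrs={"units": "meters", "valid_min": 0.0},
)

# Keep values at or above valid_min; everything else becomes NaN
swe_valid = swe.where(swe >= swe.attrs["valid_min"])
n_masked = int(swe_valid.isnull().sum())

print(n_masked)  # 1
```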

### Changing the attributes using zarr directly, *after* creating the zarr dataset
Use this code to modify global and variable attributes of existing zarr datasets.

In [23]:
# zarr.open_consolidated doesn't allow changing the metadata
# zstore = zarr.open_consolidated(store=zarrstore, mode='r+')

# zstore = zarr.open(store=zarrstore, mode='r+')

In [24]:
# zstore.attrs['new_appended_attr'] = 'my appended attribute'
# zstore.swe.attrs['swe_append_attr'] = 'my swe appended attribute'

In [25]:
# Must pass the zarr store path, not the opened zstore
# zarr.consolidate_metadata(zarrstore)