Fetch and organize SNODAS

In [None]:
import os
import glob
import shutil
import numpy as np
import regionmask 
import xarray as xr
import rioxarray
import zarr
from datetime import datetime, timedelta
from dask.distributed import Client, LocalCluster
from pathlib import Path
import geopandas as gpd
from validation import SNODAS, MountainHub, Elevation, utils as ut

In [None]:
start_date = datetime(2003,10,1)
end_date = datetime(2004,10,1)
ndays = end_date - start_date
ndays.days

In [None]:
lm_den = s_clm.output[‘scalarSWE’]/s_clm.output[‘scalarSnowDepth’]
jrdn_den = s_jrdn.output[‘scalarSWE’]/s_jrdn.output[‘scalarSnowDepth’]
thin_den = s_2layer.output[‘scalarSWE’]/s_2layer.output[‘scalarSnowDepth’]
thick_den = s_thick.output[‘scalarSWE’]/s_thick.output[‘scalarSnowDepth’]clm_den.plot(label=‘CLM layering’)
jrdn_den.plot(label=‘Jordan1991 layering’)
thin_den.plot(label=‘Thin-2 layer’)
thick_den.plot(label=‘Thick-2 layer’)
plt.legend()
plt.ylabel(‘Density (kg m^-3)’)
plt.xlim(xmin = datetime.datetime(2011,3,3),xmax = datetime.datetime(2011,7,12))
plt.savefig(“densityclose.png”, dpi = 100)

In [None]:
# Fetch data from SNODAS
for date in (start_date + timedelta(n) for n in range(ndays.days)):
    output_path = date.strftime('/home/spestana/data/SNODAS/SNODAS_%Y%m%d.nc')
    if not os.path.exists(output_path):
        print(output_path)
        try:
            snodas_ds = SNODAS.snodas_ds(date)
            ut.save_netcdf(snodas_ds, output_path)
        except Exception as err:
            print(f"Error: {err}")

now that it is downloaded, we can work with the data

In [None]:
def dask_start_cluster(
    workers,
    threads=1,
    ip_address=None,
    port=":8786",
    open_browser=False,
    verbose=True,
):
    """
    Starts a dask cluster. Can provide a custom IP or URL to view the progress dashboard.
    This may be necessary if working on a remote machine.
    """
    cluster = LocalCluster(
        n_workers=workers,
        threads_per_worker=threads,
        #silence_logs=logging.ERROR,
        dashboard_address=port,
    )

    client = Client(cluster)

    if ip_address:
        if ip_address[-1] == "/":
            ip_address = ip_address[:-1]  # remove trailing '/' in case it exists
        port = str(cluster.dashboard_link.split(":")[-1])
        url = ":".join([ip_address, port])
        if verbose:
            print("\n" + "Dask dashboard at:", url)
    else:
        if verbose:
            print("\n" + "Dask dashboard at:", cluster.dashboard_link)
        url = cluster.dashboard_link

    if port not in url:
        if verbose:
            print("Port", port, "already occupied")

    if verbose:
        print("Workers:", workers)
        print("Threads per worker:", threads, "\n")

    if open_browser:
        webbrowser.open(url, new=0, autoraise=True)

    return client

In [None]:
input_folder = '/home/spestana/data/SNODAS/'
zarr_output_path = '/home/spestana/data/SNODAS/SNODAS_test.zarr'

In [None]:
client = dask_start_cluster(
    workers=6,
    threads=2,
    ip_address='http://dshydro.ce.washington.edu',
    port=":8787",
    open_browser=False,
    verbose=True,
)

In [None]:
## Create a Dask cluster so we can watch the dask dashboard
## If this cell is not run, how many computer cores will be used?
#workers = 6
#ip_addres = 'http://dshydro.ce.washington.edu'
#port=':8787'
#threads = 2
#cluster = LocalCluster(n_workers=workers, threads_per_worker=threads, dashboard_address=port)
#client = Client(cluster)

In [None]:
# Because the files may not be in a chronological order, we sort them so that the timeseries data we are creating will be in a chronological order.

def get_start_date_from_SNODAS_filename(s):
    datetime_str = s.split('_')[-1].split('.')[0] # format is YYYYMMDD
    datetime_object = datetime.strptime(datetime_str, '%Y%m%d')
    return datetime_object

nc_files = sorted(
    glob.glob(os.path.join(input_folder, '*.nc')),
    key=get_start_date_from_SNODAS_filename
)

In [None]:
datetimes = [get_start_date_from_SNODAS_filename(s) for s in nc_files]
files = nc_files

for i,file in enumerate(files):
    print(f"Processing {i+1} of {len(files)}...")
    try:
        ds = xr.open_dataset(file, decode_coords="all")
        ds = ds.assign_coords({"time": datetimes[i]})
        ds = ds.expand_dims("time")
        ds = ds.reset_coords(drop=True)
        da = ds['Band1']
        new_file_name = file.replace(
            "/SNODAS/",
            "/SNODAS_withtime/",
        )
        da = da.rio.write_crs('EPSG:4326')
        da.to_netcdf(new_file_name)
    except Exception as err:
        print(f"Failed on {file}")
        print(f"Error: {err}")



In [None]:
nc_files = sorted(
    glob.glob(os.path.join('/home/spestana/data/SNODAS_withtime/', '*.nc')),
    key=get_start_date_from_SNODAS_filename
)

# Open all the raster files as a single dataset (combining them together)
# Why did we choose chunks = 500? 100MB?
# https://docs.xarray.dev/en/stable/user-guide/dask.html#optimization-tips
ds = xr.open_mfdataset(nc_files, chunks={'time': 500})



In [None]:
ds

In [None]:
# Rename Band1 as a more indicative name: swe
ds = ds.rename({'Band1': 'swe'})



In [None]:
ds.swe

In [None]:
# Dask's rechunk documentation: https://docs.dask.org/en/stable/generated/dask.array.rechunk.html

# 0:-1 specifies that we want the dataset to be chunked along the 0th dimension -- the time dimension, which means that each chunk will have all 40 thousand values in time dimension
# 1:'auto', 2:'auto' and balance=True specifies that dask can freely rechunk along the latitude and longitude dimensions to attain blocks that have a uniform size
ds['swe'].data.rechunk(
    {0:-1, 1:'auto', 2:'auto'}, 
    block_size_limit=1e8, 
    balance=True
)

# Assign the dimensions of a chunk to variables to use for encoding afterwards
t,y,x = ds['swe'].data.chunks[0][0], ds['swe'].data.chunks[1][0], ds['swe'].data.chunks[2][0]



In [None]:
ds

In [None]:
# given a shapefile, get the bounding box coords and subset dataset to that area
sf_path = Path('/home/spestana/git/Skagit/raw_data/gis/SkagitRiver_BasinBoundary.shp').expanduser()
sf = gpd.read_file(str(sf_path))
minx, miny, maxx, maxy = sf.geometry[0].bounds
ds = ds.sel(lat=slice(miny,maxy), lon=slice(minx,maxx))

In [None]:


# Create an output zarr file and write these chunks to disk
# if already exists, remove it here
shutil.rmtree(zarr_output_path, ignore_errors=False)



In [None]:
zarr_output_path

In [None]:
ds['swe'].encoding = {'chunks': (t, y, x)}



In [None]:
ds.to_zarr(zarr_output_path)

In [None]:
# Display 
source_group = zarr.open(zarr_output_path)
source_array = source_group['swe']
print(source_group.tree())
print(source_array.info)
del source_group
del source_array