## Demo notebook for accessing Daymet data on Azure

The Daymet dataset contains daily minimum temperature, maximum temperature, precipitation, shortwave radiation, vapor pressure, snow water equivalent, and day length at 1km resolution for North America. The dataset covers the period from January 1, 1980 to December 31, 2019.  

The Daymet dataset is maintained at [daac.ornl.gov/cgi-bin/dsviewer.pl?ds_id=1328](https://daac.ornl.gov/cgi-bin/dsviewer.pl?ds_id=1328) and mirrored on Azure Open Datasets at [azure.microsoft.com/services/open-datasets/catalog/daymet](https://azure.microsoft.com/services/open-datasets/catalog/daymet).

In [None]:
# Standard or standard-ish imports
import os
import tempfile
import shutil
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import xarray as xr
#import cartopy.crs as ccrs
import urllib.request

# Less standard, but still pip- or conda-installable
from azure.storage.blob import ContainerClient

container_name = 'daymetv3-raw'
daymet_azure_storage_url = 'https://daymet.blob.core.windows.net/'

daymet_container_client = ContainerClient(account_url=daymet_azure_storage_url, 
                                         container_name=container_name,
                                         credential=None)

# Temporary folder for data we need during execution of this notebook (we'll clean up
# at the end, we promise)
temp_dir = os.path.join('/home/cspadmin','daymet')
os.makedirs(temp_dir,exist_ok=True)
os.listdir(temp_dir)

### Support functions

In [None]:
def download_url(url, destination_filename=None, progress_updater=None, force_download=False):
    """
    Download a URL to a temporary file
    """
    
    # This is not intended to guarantee uniqueness, we just know it happens to guarantee
    # uniqueness for this application.
    if destination_filename is None:
        url_as_filename = url.replace('://', '_').replace('.', '_').replace('/', '_')
        destination_filename = \
            os.path.join(temp_dir,url_as_filename)
    if (not force_download) and (os.path.isfile(destination_filename)):
        print('Bypassing download of already-downloaded file {}'.format(os.path.basename(url)))
        return destination_filename
    print('Downloading file {}'.format(os.path.basename(url)),end='')
    urllib.request.urlretrieve(url, destination_filename, progress_updater)  
    assert(os.path.isfile(destination_filename))
    nBytes = os.path.getsize(destination_filename)
    print('...done, {} bytes.'.format(nBytes))
    return destination_filename

### Download a specific file from Azure blob storage
This code shows how to download a specific file from Azure blob storage into the current directory.  It uses the example file daymet_v3_tmax_1980_hawaii.nc4, but you can change this as described below.  

The following types of data are available: minimum temperature (tmin), maximum temperature (tmax), precipitation (prcp), shortwave radiation (srad), vapor pressure (vp), snow water equivalent (swe), and day length (dayl).  


In [None]:
def make_filename(my_variable, year, location):
    destination_file = my_variable + '_' + str(year) + '_' + location
    granule_name = 'daymet_v3_' + destination_file + '.nc4'
    url = 'https://daymet.blob.core.windows.net/daymetv3-raw/' + granule_name
    return download_url(url, destination_filename=os.path.join(temp_dir, destination_file + '.nc'))

### Explore the NetCDF metadata

In [None]:
class Region:
    def __init__(self, name, ds, df, var):
        self.name = name
        self.df = df
        self.ds = ds
        self.var = var
        self.calculate_reduction2()
        self.avg_sum()
        #self.calculate_std()
        #self.write_daymet()
        #self.get_points()
        #self.concat()
        #self.fix_df()
        
        
    def setup_original_df(self):
        self.xy = [data_crs.transform_point(x, y, src_crs=ccrs.PlateCarree()) for x, y in zip(self.df['LON'], self.df['LAT'])]
        self.df['x_exact'], self.df['y_exact'] = list(zip(*self.xy))
        self.df.reset_index(inplace=True)
        
    def setup_unique_df(self):
        self.unique = self.df[['newlat', 'newlon', 'latlon_id']].drop_duplicates()
        self.xy = [data_crs.transform_point(x, y, src_crs=ccrs.PlateCarree()) for x, y in zip(self.unique['newlon'], self.unique['newlat'])]
        self.xy_round = [(int(np.around(x/250, decimals=0)*250), int(np.around(y/250, decimals=0)*250)) for x, y in self.xy]
        self.unique['x_exact'], self.unique['y_exact'] = list(zip(*self.xy_round))
        
    def get_points(self):
        self.points = [Point(self, self.df['y_exact'].iloc[row], self.df['x_exact'].iloc[row]) for row in range(len(self.df))]
               
    def fix_df(self):
        self.df['newlat'] = [r.point.coords['lat'].values for r in self.points]
        self.df['newlon'] = [r.point.coords['lon'].values for r in self.points]
        self.df['latlon_id'] = make_identifier(self.df[['newlat', 'newlon']])
        self.df.drop(['x_exact', 'y_exact'], axis=1, inplace=True)
        self.df.to_csv(self.name + '_nearest.csv', index=False)
    
    def concat(self):
        self.set = xr.concat([r.point for r in self.points], dim="locs")
        
    def find_original_xy_indices(self):
        self.unique['x_ind'] = [np.where(self.ds.indexes['x'] == num)[0][0] for num in self.unique['x_exact']]
        self.unique['y_ind'] = [np.where(self.ds.indexes['y'] == num)[0][0] for num in self.unique['y_exact']]
        
    def calculate_reduction2(self):
        self.time_value_gen = (self.ds[self.var].isel(time=my_time).values for my_time in range(365))
        self.reduction = next(self.time_value_gen)
        for i in range(1,365):
            self.reduction += next(self.time_value_gen)
        self.reduction /= 365.
        
    def avg_sum(self):
        self.avg = [self.reduction[y,x] for x, y in zip(self.df['x_ind'], self.df['y_ind'])]
        
    def calculate_reduction(self):
        self.reduction = self.ds.groupby('time.year').sum(dim='time').mean(dim='year')
        self.avg_sum = np.empty(len(self.unique))
        for idx, itr in enumerate(range(int(np.floor(len(self.unique)/1000)))):
            self.avg_sum[itr*1000:(itr+1)*1000] = np.diagonal(self.reduction.sel(y=self.unique['y_exact'][itr*1000:(itr+1)*1000], x=self.unique['x_exact'][itr*1000:(itr+1)*1000])['prcp'].values)
            print(idx)
        self.unique['prcp_avg'] = self.avg_sum
        
    def calculate_std(self):
        self.std = self.ds.groupby('time.year').std(dim='time').mean(dim='year')
        self.avg_std = np.empty(len(self.unique))
        for idx, itr in enumerate(range(int(np.floor(len(self.unique)/1000)))):
            self.avg_std[itr*1000:(itr+1)*1000] = np.diagonal(self.std.sel(y=self.unique['y_exact'][itr*1000:(itr+1)*1000], x=self.unique['x_exact'][itr*1000:(itr+1)*1000])['prcp'].values)
            print(idx)
        self.unique['prcp_std'] = self.avg_std
        
    def write_daymet(self):
        self.df.to_csv(self.name + '_daymet.csv', index=False)


In [None]:
variable = "tmax"
#for region in ["hawaii", "puertorico", "na"]:
for region in ["hawaii"]:
    print(region)
#    df_region = pd.read_csv(region + '_daymet.csv')
    my_files = [make_filename(variable, 1980, region)]
#    ds_region = xr.open_dataset(my_basefile)
#    base_region = Region(region, ds_region, df_region, variable)
#    region_var = base_region.avg
    for i in range(1981,2011):
        print(i)
        my_files.append(make_filename(variable, i, region))
 #       my_region = xr.open_dataset(my_file)
 #       ds_region_new = xr.concat([ds_region, my_region], dim='time')
 #       del ds_region
 #       del my_region
 #       ds_region = ds_region_new
 #       del ds_region_new
    ds_region = xr.open_mfdataset(my_files[:-1])
    ds_region.drop_vars('lambert_conformal_conic').to_zarr('/home/datablob/daymet/hi-zarr-full')
#        region_var += np.array(my_region.avg)
#        os.remove(my_file)
#        del my_region
#    region_var /= 31.
#    base_region.df[variable + '_avg'] = region_var
#    del region_var
#    base_region.write_daymet()
#    os.remove(my_basefile)
#    del base_region

In [None]:
my_zarr = xr.open_dataset(my_files[-1])

In [None]:
my_zarr.drop_vars('lambert_conformal_conic').to_zarr('/home/datablob/daymet/hi-zarr-full', append_dim='time')

In [None]:
my_zarr3 = xr.open_zarr('/home/datablob/daymet/hi-zarr-full')

In [None]:
my_zarr3['tmax'].groupby('time.year').mean(dim='time').isel(year=9).isel(x=23, y=24).values

### Cleanup

In [None]:
df = pd.read_csv('fia_no_pltcn.csv')
df_hi = df.loc[df['LAT'] < 25].loc[df['LON'] < -100]
df_pr = df.loc[df['LAT'] < 20].loc[df['LON'] > -70]
df_na = df[~df.isin(df_hi)][~df.isin(df_pr)].dropna()

In [None]:
data_crs = ccrs.LambertConformal(central_longitude=-100, central_latitude=42.5, standard_parallels=(25, 60))

In [None]:
shutil.rmtree()

In [None]:
class Point:
    def __init__(self, parent, y, x):
        self.parent = parent
        self.y = y
        self.x = x
        self.index = {}
        self.original_point = self.parent.ds.sel(y=self.y, method='nearest').sel(x=self.x, method='nearest')
        self.find_original_xy_indices()
        self.need_recalculate()
        self.get_new_point()
        
    def find_original_xy_indices(self):
        self.index['x'] = np.where(self.parent.ds.indexes['x'] == self.original_point.coords['x'].values)[0][0]
        self.index['y'] = np.where(self.parent.ds.indexes['y'] == self.original_point.coords['y'].values)[0][0]
        
    def need_recalculate(self):
        if np.isnan(self.original_point['prcp'].isel(time=0)):
            self.find_nonnan()
        else:
            self.indices = self.index['x'], self.index['y']
            
    def find_nonnan(self):
        my_x = self.index['x']
        my_y = self.index['y']
        for j in [my_y, my_y-1, my_y+1, my_y-2, my_y+2, my_y-3, my_y+3]:
            for i in [my_x, my_x-1, my_x+1, my_x-2, my_x+2, my_x-3, my_x+3]:
                if i==my_x and j==my_y:
                    continue
                if i>=self.parent.ds.indexes['x'].size or i<=-1:
                    continue
                if j>=self.parent.ds.indexes['y'].size or j<=-1:
                    continue
                if not np.isnan(self.parent.ds['prcp'].isel(time=0).isel(x=i).isel(y=j)):
                    self.indices = i, j
        self.indices = -1, -1
        
    def get_new_point(self):
        self.point = self.parent.ds.isel(x=self.indices[0]).isel(y=self.indices[1])

        

In [None]:
def make_identifier(df):
    str_id = df.apply(lambda x: '_'.join(map(str, x)), axis=1)
    return pd.factorize(str_id)[0]

In [None]:
import sys
!conda install -y -c conda-forge h5netcdf xarray cartopy
!{sys.executable} -m pip install azure.storage.blob