# Practice with Dask Creating a Great-Lakes Xarray via DASK
Using Xarray, Rasterio and Dask

1. 10 degree tile


# Notes

Compressed/tiled COGS would likely be faster for these NDVI
NDVI are the most read and the tall pole in input times

this is dwerived from the smart notebook from Rich Signell

## Ref

https://nbviewer.jupyter.org/gist/rsignell-usgs/4df2b22c2354d07ef8bc2d7fff9d655b



In [None]:
#! pip install --user hvplot
#! pip install --user dask
#!python -m pip install --user "dask[distributed]" --upgrade

In [None]:
import xarray as xr
import hvplot.xarray
import dask
from dask.distributed import Client, progress
#from dask_kubernetes import KubeCluster
import os
import fsspec

Use `fsspec` to explore an S3 "requester pays" bucket like a filesystem.  We don't actually pay anything here since the USGS Pangeo and the data being read here live in the same AWS region (us-west-2)

In [None]:
fs = fsspec.filesystem('s3', anon=False, requester_pays=True)

In [None]:
file_objects = fs.ls('dev-et-data/in/')
file_objects

In [None]:
file_objects = fs.ls('dev-et-data/in/USA_data')
file_objects

In [None]:
def get_NDVI_median_tifs(bucket_prefix_path):
    files = fs.ls(bucket_prefix_path)
    return [f for f in files if f.endswith('tif')]

Create a Dask cluster.  We can run KubeCluster on pangeo, but for rasterio to be able to read from "requester pays" buckets we need to set an environment variable and pass it to the workers also. 

In [None]:
os.environ["AWS_REQUEST_PAYER"] = "requester" 
##cluster = KubeCluster(n_workers=8, env={'AWS_REQUEST_PAYER': 'requester'})
#client = Client(cluster)

In [None]:
#cluster.close()
# local client
client = Client()

In [None]:
client

In [None]:
(512*512*4)*(10)/1e6

Create a delayed function to return an xarray dataarray from a tif filename

In [None]:
@dask.delayed
def tif_to_da(tif):
    return xr.open_rasterio('s3://'+tif, chunks={'band':1, 'x':512, 'y':512})

Create data array for first 30 yeardays of a given year (just reading metadata)

In [None]:
%%time
#tifs = get_tifs(2001)
b_path = 'dev-et-data/in/USA_data/NDVI_med2003_2017'
tifs = get_NDVI_median_tifs(b_path)
#tifs
lazy_da =[tif_to_da(tif) for tif in tifs[:180]]
dalist = dask.compute(*lazy_da)
da = xr.concat(dalist, dim='band')
da = da.rename({'band':'yearday'})

In [None]:
#tifs

In [None]:
da

Assign values to the coordinates

In [None]:
da = da.assign_coords(yearday=range(0,180))

In [None]:
da

In [None]:
ds = da.to_dataset(name='ndvi')

In [None]:
ds = ds.isel(x=slice(20480,20480+1024), y=slice(1024,2048))

In [None]:
ds

In [None]:
%%time
ds.ndvi[:,0,0].load().hvplot(grid=True)

In [None]:
ds = ds.chunk(chunks={'yearday':10, 'x':512, 'y':512})

In [None]:
ds.ndvi.encoding

In [None]:
ds.ndvi.encoding = {'chunks': (10,512,512)}

In [None]:
print(ds.ndvi[0,0,0].load().values)

In [None]:
ds.ndvi[10,:,:].load().hvplot(grid=True)

In [None]:
ary = ds.ndvi[10,:,:].load().values

ary.min()
ary.max()

In [None]:
ary.shape

In [None]:
import matplotlib.pyplot as pyplot
from rasterio.plot import show

def my_plot(array):
    cmaps = ['Accent', 'Accent_r', 'Blues', 'Blues_r', 'BrBG', 'BrBG_r', 'BuGn', 'BuGn_r', 'BuPu', 'BuPu_r', 'CMRmap', 'CMRmap_r', 'Dark2', 'Dark2_r', 'GnBu', 'GnBu_r', 'Greens', 'Greens_r', 'Greys', 'Greys_r', 'OrRd', 'OrRd_r', 'Oranges', 'Oranges_r', 'PRGn', 'PRGn_r', 'Paired', 'Paired_r', 'Pastel1', 'Pastel1_r', 'Pastel2', 'Pastel2_r', 'PiYG', 'PiYG_r', 'PuBu', 'PuBuGn', 'PuBuGn_r', 'PuBu_r', 'PuOr', 'PuOr_r', 'PuRd', 'PuRd_r', 'Purples', 'Purples_r', 'RdBu', 'RdBu_r', 'RdGy', 'RdGy_r', 'RdPu', 'RdPu_r', 'RdYlBu', 'RdYlBu_r', 'RdYlGn', 'RdYlGn_r', 'Reds', 'Reds_r', 'Set1', 'Set1_r', 'Set2', 'Set2_r', 'Set3', 'Set3_r', 'Spectral', 'Spectral_r', 'Wistia', 'Wistia_r', 'YlGn', 'YlGnBu', 'YlGnBu_r', 'YlGn_r']
    axs=()
    fig, axs = pyplot.subplots(1,8, figsize=(21,21))
    for i in range(0,8):
        show(array, ax=axs[i], cmap=cmaps[i], title=cmaps[i])
    pyplot.show()

In [None]:
my_plot(ary)

### Need to study zarr - Tony
### Bottom Line: for this test and this toolset, it's 20 times faster reading the rechunked zarr compared to original COGS!

## Now Window the great Lakes using the same approaches


In [None]:
filepath = 'dev-et-data/in/USA_data/NDVI_med2003_2017'
file_objects = fs.ls(filepath)
#file_objects
tifs = [f for f in file_objects if f.endswith('tif')]

tifs[0]

In [None]:
from pangeoLib.aws_authenticate import aws_authenticate
aws_authenticate()

In [None]:
import rasterio as rio
from rasterio.windows import from_bounds

filepath = '/vsis3/' + tifs[0]

left = -90
right = -80
top = 50
bottom = 40
with rio.open(filepath) as src:
        my_window = from_bounds(left, bottom, right, top, src.transform)
        print(my_window)

In [None]:
col_off = int(round(my_window.col_off))
width = int(round(my_window.width))

In [None]:
col_off

In [None]:
col_slice=(col_off, col_off + width)

In [None]:
col_slice

In [None]:
row_off = int(round(my_window.row_off))
height = int(round(my_window.height))

In [None]:
row_slice = (row_off, row_off + height)

In [None]:
row_slice

In [None]:
ds = da.to_dataset(name='ndvi')

In [None]:
ds

In [None]:
ds = ds.isel(x=slice(col_off, col_off + width), y=slice(row_off, row_off + height))

In [None]:
ds

In [None]:
ds = ds.chunk(chunks={'yearday':10, 'x':512, 'y':512})

In [None]:
ds.ndvi.encoding = {'chunks': (10,512,512)}

In [None]:
#ds.ndvi[10,:,:].load().hvplot(grid=True)

In [None]:
#ary = ds.ndvi[10,:,:].load().values
ary = ds.ndvi[10,:,:].values

In [None]:
ary.shape

In [None]:
ary.min()

In [None]:
import numpy as np

In [None]:
ary[(ary < 0)] = np.nan

In [None]:
my_plot(ary)

In [None]:
import rasterio as rio
from rasterio.windows import from_bounds

filepath = '/vsis3/' + tifs[0]

left = -88
right = -86
top = 44.9
bottom = 42
with rio.open(filepath) as src:
        my_window = from_bounds(left, bottom, right, top, src.transform)
        print(my_window)

In [None]:
col_off = int(round(my_window.col_off))
width = int(round(my_window.width))

row_off = int(round(my_window.row_off))
height = int(round(my_window.height))

ds = da.to_dataset(name='ndvi')
ds = ds.isel(x=slice(col_off, col_off + width), y=slice(row_off, row_off + height))


In [None]:
ds = ds.chunk(chunks={'yearday':10, 'x':512, 'y':512})
ds.ndvi.encoding = {'chunks': (10,512,512)}
ary = ds.ndvi[10,:,:].load().values
ary.shape

In [None]:
ds = da.to_dataset(name='ndvi') # reload the big scene

In [None]:
#ary.hvplot() #nope

In [None]:
#my_obj = ds.ndvi[10,:,:].load()

my_obj = ds.ndvi[10,:,:]
type(my_obj)

In [None]:
#my_obj.hvplot(grid=True, invert=True)
my_obj.hvplot(grid=True, )

In [None]:
my_obj

In [None]:
#help (my_obj.hvplot)

In [None]:
ds_masked = ds.where(ds['ndvi']>0.0) 

In [None]:
#my_objm = ds_masked.ndvi[10,:,:].load()
my_objm = ds_masked.ndvi[10,:,:]


type(my_objm)

In [None]:
my_objm.hvplot(grid=True, invert=True, cmap='hot')

In [None]:
my_objm

In [None]:
my_objm.hvplot(grid=True, invert=True, width=700, height=700, cmap='hot')

In [None]:
my_objm

In [None]:
!date