# Pangeo demo: dinoSAR data

This is a simple demo for AGU2018, using InSAR processing output from dinoSAR (https://github.com/scottyhq/dinosar).

To run each code cell, use 'shift+enter'

**Warning!** you can modify this notebook, upload files, and save files listed on the left (right-click and you will see a download option). BUT!... it is an ephemeral demo. Work will be lost if you leave this idle for a bit. Everything shuts down automatically.

In [None]:
# Import python packages
import rasterio
from rasterio.mask import mask
import xarray as xr
import numpy as np
import hvplot.xarray
import hvplot.pandas
import holoviews as hv
import gcsfs
import intake
import pandas as pd
import geopandas as gpd
import os.path
import s3fs
from dask_kubernetes import KubeCluster
from dask.distributed import Client
import matplotlib.pyplot as plt
%matplotlib inline

## Launch a Kubernetes Cluster

We can use a kubernetes cluster to increase our computational resources. Note that our final geocoded dataset for this example is pretty small. So we don't actually need a large cluster, but we'll use it anyways to illustrate how it works!

10 workers are selected by default (each w/ customizable CPUs and RAM). When parallizeable computations are requested, you'll see the cluster activity on the right hand dashboards

In [None]:
cluster = KubeCluster(n_workers=4)
cluster

In [None]:
client = Client(cluster)

## List files on AWS S3

In [None]:
bucket = 'stac-uniongap/D115'

# This creates a virtual local file listing
fs = s3fs.S3FileSystem(anon=True)
pairs = fs.ls(bucket)

print('Number of images:', len(pairs))
print('First images:', pairs[:2])

In [None]:
# Each of these images has an associates public URL:
# We'll use pandas to make a sorted dataframe of all the images
def parse_name(s3Path, key='date1'):
    ''' grab project, bucket, date1, date2, format from file name, return dictionary'''
    pattern = '{project}/{orbit}/int-{date1:%Y%m%d}-{date2:%Y%m%d}/{cog}'
    parsed = intake.source.utils.reverse_format(pattern, s3Path)
    val = parsed[key]
    return val

def make_dataframe(images):
    ''' organize pandas dataframe by parsing filename'''
    df = pd.DataFrame(dict(s3=images))
    df = df.sort_values('s3').reset_index(drop=True)
    df['url'] = 'http://s3.amazonaws.com/' + df.s3.str[:] 
    df['date1'] = df.s3.apply(parse_name, args=('date1',))
    df['date2'] = df.s3.apply(parse_name, args=('date2',))
    df['dt'] = df.date1 - df.date2
    return df

In [None]:
image='unwrapped-phase-cog.tif'
images = [os.path.join(x,image) for x in pairs[1:]]

df = make_dataframe(images)
print('Total images:', len(df))
print('First date:', df.date2.iloc[0])
print('Last date:', df.date1.iloc[-1])
df.head()

## Read Cloud-optimized geotiffs (COGs)

In [None]:
# Rasterio uses the gdal vsicurl system to access files
# on a cloud server
env = rasterio.Env(GDAL_DISABLE_READDIR_ON_OPEN='EMPTY_DIR',
                  CPL_VSIL_CURL_USE_HEAD=False,
                  CPL_VSIL_CURL_ALLOWED_EXTENSIONS='TIF',
                 )

In [None]:
# Read the first file in the set of images into xarray DataArray (w/ dask)
# note this is very fast b/c only metadata is downloaded to local memory
# chunks are based on cloud-optimized geotiff internal tiling
xchunk = 512
ychunk = 512
with env:
    da = xr.open_rasterio(df.url[0], parse_coordinates=True, chunks={'band': 1, 'x': xchunk, 'y': ychunk})

## Create an xarray DataSet

In [None]:
# Since all these images are pre-aligned ('analysis ready')
# we get best performance loading w/o metadata & coordinate checking
def create_dataset(df, chunks={'band': 1, 'x': 512, 'y': 512}):
    # Note: this takes a minute b/c coordinate alignment is checked
    from ipywidgets import IntProgress
    from IPython.display import display
    probar = IntProgress(value=0, min=0, max=len(df), step=1, 
                         description='Loading:')
    display(probar)
    #print(rasterio.env.getenv())
    datasets = []
    # Create dataset to fill based on first image
    da = xr.open_rasterio(df.url[0], 
                          parse_coordinates=True, 
                          chunks=chunks) 
    probar.value += 1
    datasets.append(da.to_dataset(name='unw'))
    
    # Loop over remaining images to fill array
    for i,row in df[1:].iterrows():
        probar.value += 1
        url = row.url
        
        try:
            da = xr.open_rasterio(url, parse_coordinates=False, chunks=chunks)
        except:
            #print(f'Error loading {url}')
            pass
        datasets.append(da.to_dataset(name='unw'))
    
    ds = xr.concat(datasets, dim='band')
    ds.coords['band'] = np.arange(len(df))
    return ds

In [None]:
with env:
    DS = create_dataset(df)

In [None]:
DS

## Interactive visualization with holoviews

**NOTE:** you may need to resize this pane to see all the buttons (drag grey separator bar to the right)

* Once in an xarray DataSet, hvplot can easily display images interactively:
* Note column of buttons on upper right side of figure.
* In addition to buttons, there is a time slider for band selection
    * click slider button and use arrow keys for fine control
* Box zoom button updates displayed resolution on the fly
* Moving cursor over image gives coordinates and unwrapped phase value

In [None]:
img = DS.hvplot('x', 'y', groupby='band', dynamic=True, rasterize=True, 
                      width=700, height=500, cmap='magma')

limits = hv.streams.RangeXY(source=img)

img

# Save current view / subset

We can save a local copy of the current image with a function.

* select band=1 in interactive image browser above
    * zoom into volcano deformation zone in south (bright area)
        * run 2 cells below to save the local image subset
            * a geotiff will appear in the file browser on the left
                * right click the file and select 'download to get it on your laptop'

In [None]:
def get_src(img):
    ''' get current image displayed '''
    image_no = img.callback.args
    image_url = df.url.iloc[image_no]

    return image_url


def get_window(img,src):
    ''' get current rasterio window from holoviews plot '''
    limits = img.streams[1]
    if limits.x_range == None:
        bounds = src.bounds
    else:
        bounds = (limits.x_range[0], limits.y_range[0], limits.x_range[1], limits.y_range[1])
    uly,ulx = src.index(bounds[0], bounds[3])
    lry,lrx = src.index(bounds[2], bounds[1])

    width = lrx - ulx
    height = lry - uly

    return rasterio.windows.Window(ulx, uly, width, height)

def save_current_view(img, name='local-image.tif'):
    from ipywidgets import IntProgress
    from IPython.display import display
    probar = IntProgress(value=0, min=0, max=4, step=1, 
                         description='Saving:')
    display(probar)     
    
    with env:
        image_url = get_src(img)
        print(f'Saving {image_url}...')
        with rasterio.open(image_url) as src:
            probar.value +=1
            profile = src.profile.copy()
            window = get_window(img, src)
            print(window)
            win_transform = src.window_transform(window)
            probar.value +=1
            data = src.read(1, window=window)
        
        profile.update({
                'dtype': 'float32',
                'height': data.shape[0],
                'width': data.shape[1],
                'blockxsize': 128,
                'blockysize': 128,
                'transform': win_transform})  
        probar.value += 1
        localname = 'subset-' + os.path.basename(src.name)
        with rasterio.open(localname, 'w', **profile) as dst:
            dst.write_band(1, data) 
    probar.value +=1
    
    return localname

In [None]:
localname = save_current_view(img)

In [None]:
# Load and plot the saved subset to verify it's the same
# Since this is only a single file, we won't load with dask
print(localname)
with env:
    with rasterio.open(localname) as src:
        print(src.profile)
        da = xr.open_rasterio(src.name)
da.hvplot('x', 'y', groupby='band', dynamic=True, rasterize=True, 
          width=700, height=500, cmap='magma')

# Parallel computations

With xarray DataSets, we can do parallel computations on the KubeCluster, using dask behind the scenes. Here is a simple example getting the mean phase value for each interferogram

In [None]:
def get_xarray_selection(img, band=False):
    ''' get selection dictionary from hvplot'''
    selection = {}
    selection['x'] = slice(*limits.x_range)
    selection['y'] = slice(*limits.y_range[::-1])
    if band:
        selection['band'] = [img.callback.args[0],]
    return selection

In [None]:
# Reset chunks after selection for better performance
ds = DS.sel(get_xarray_selection(img))
ds = ds.chunk(dict(band=249,x=721,y=720)) #dimensions (249, 720, 721)
ds

In [None]:
# Basic Stack
# NOTE: haven't normalized to common reference point, this is just for illustration purposes
stack = ds.mean(dim='band')
stack

In [None]:
# keep in distributed cluster memory
ds_stack = stack.persist() 

In [None]:
ds_stack.unw.plot.imshow(center=False, cmap='magma')

In [None]:
# Get all values of pixel at a specfic lon,lat
# compute pulls from distributed memory to local RAM
xcen = -120.46
ycen = 46.5275
ts = ds.sel(x=xcen, y=ycen, method='nearest').compute()
ts

In [None]:
s = ts.unw.to_series()

In [None]:
# Plot this
# Holoviews is also great for interative 2D plots

#line = s.hvplot(width=700, height=300, legend=False)
points = s.hvplot.scatter(width=700, height=300, legend=False)
label = f'Unwrapped LOS Phase [rad]: easting={xcen:g} , northing={ycen:g}'

#(line * points).relabel(label)
points.relabel(label)

In [None]:
# Data from plot can easily be saved to a CSV
#points.data.to_csv()
#or
s.to_csv('time-series.csv')

In [None]:
# Instead of band index, use time index to illustrate group-by. Use date1
index = pd.DatetimeIndex(df.date1)
DS.band.values=index
DS = DS.rename(dict(band='time'))

In [None]:
pRef = (-120.46225,  46.53021)
xref,yref = pRef
#refMean = DS.sel(dict(x=xref,y=yref), method='nearest').compute()

pad = 0.001
reference = dict(x=slice(xref-pad, xref+pad),
                 y=slice(yref+pad, yref-pad))
refMean = DS.sel(**reference).mean(dim=['x','y']).compute()

refMean

In [None]:
# Subregion and convert to mm LOS displacement
aoi = dict(x=slice(-120.48, -120.46), y=slice(46.54, 46.52))
DSnorm = (DS - refMean).sel(aoi) * 5546576/12.5663706
DSnorm

In [None]:
annual = DSnorm.groupby('time.year').mean(dim='time')
fg = annual.unw.plot(x='x', y='y', col='year', col_wrap=2, center=False,cmap='plasma')#,vmin=-40,vmax=0)
plt.plot(xref, yref, 'wo')
_ = plt.text(xref, yref, 'Ref')
#plt.title('Annual Stacks [mm LOS]')