In [None]:
import os
root = os.path.abspath(os.path.join(os.getcwd(),'..'))

# Demo:
- Create a spatial-semantic graph for a geography and features of interest
- Reduce an indicator-of-interest for features (e.g. NDVI, CHIRPS precip) and ingest to graph
- Query the graph as in an API route

## 1. Create a spatial-semantic graph for a geography and features of interest

In [None]:
import rasterio
from rasterio import features
from rasterio.windows import Window
from rasterio.windows import transform as windows_transform
import numpy as np
import matplotlib.pyplot as plt
from scipy.ndimage import gaussian_filter, label
from itertools import product
import geopandas as gpd
from geojson import Feature
from area import area
from shapely import geometry

### 1a. Collect our geographic features of interest

In [None]:
admin_1 = gpd.read_file(os.path.join(root,'data','admin_1.geojson')).set_index('iso_3166_2')

In [None]:
# get a landcover raster for our geography of interest 
# Gaza, MOZ land cover reduced from Google's Dynamic World
im = rasterio.open(os.path.join(root,'data','moz_gaza.tif'))
lc_data = np.squeeze(im.read())

In [None]:
plt.imshow(lc_data[10000:15000, 10000:15000], cmap='tab10')

In [None]:
# get ag areas
lc_ag_mask = (lc_data==4).astype(np.float32)

In [None]:
# convolve a gaussian kernel to get large ag areas
lc_ag_mask = gaussian_filter(lc_ag_mask, 15)

In [None]:
# ... big raster, let's vectorise in chunks.
chunk_size=5000
row_chunks = [slice(ii*chunk_size,(ii+1)*chunk_size,None) for ii in range(im.shape[0]//chunk_size+1)]
col_chunks = [slice(ii*chunk_size,(ii+1)*chunk_size,None) for ii in range(im.shape[1]//chunk_size+1)]
all_windows = list(product(row_chunks,col_chunks))

In [None]:
# vectorise in chunks
ag_areas = []
for row_chunk, col_chunk in all_windows:
    win = Window.from_slices(row_chunk, col_chunk)
    win_trans = windows_transform(win, im.transform)
    
    shape_gen = features.shapes(
        source=(lc_data[row_chunk, col_chunk]==4).astype(np.uint8), 
        mask=lc_ag_mask[row_chunk, col_chunk]>0.8, 
        connectivity=8, 
        transform=win_trans
    )
    
    ag_areas += [Feature(geometry=ft[0]) for ft in shape_gen]

In [None]:
# cast to geodataframe
gdf = gpd.GeoDataFrame.from_features(ag_areas)

In [None]:
# buffer back a bit to undo the gaussian filter then dissolve to intersect geometries
gdf = gpd.GeoDataFrame(geometry=gdf.buffer(0.025)).dissolve().explode(index_parts=False)

In [None]:
# apply an area filter
gdf['area'] = gdf['geometry'].apply(lambda geom: area(geometry.mapping(geom)))

In [None]:
# and set an index
gdf['idx'] = range(len(gdf))

In [None]:
gdf = gdf.set_index('idx')

In [None]:
fig, ax = plt.subplots(1,1,figsize=(8,8))
admin_1.loc[(admin_1['iso_a2']=='MZ')&(admin_1['name']=='Gaza')].boundary.plot(ax=ax, color='g')
gdf.loc[gdf['area']>1e8].plot(ax=ax, color='b')

In [None]:
# save for some quick loading later
# gdf.loc[gdf['area']>1e8].to_file(os.path.join(root,'data','ag_areas_gaza.geojson'),driver='GeoJSON')
# gdf = gpd.read_file(os.path.join(root,'data','ag_areas_gaza.geojson'))

### 1b. Write our geographic features to a graph db

In [None]:
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.structure.graph import Graph
from gremlin_python.process.anonymous_traversal import traversal
from gremlin_python.process.traversal import T
import json

In [None]:
# fix to send queries
import nest_asyncio
nest_asyncio.apply()

In [None]:
remoteConn = DriverRemoteConnection('wss://lk-prototype.cluster-cstqmhnp1nqd.eu-central-1.neptune.amazonaws.com:8182/gremlin','g')

In [None]:
graph = Graph()

In [None]:
g = graph.traversal().withRemote(remoteConn)

In [None]:
# clear the graph for the demo
g.V().drop().iterate()

In [None]:
id = T.id

In [None]:
def create_vertex(vid, vtype, properties):

    task = g.addV(vtype).property(id, vid)
    
    for key, value in properties.items():
        task = task.property(key,value)
        
    task.next()
    
    return 1

In [None]:
#g.V('0').addE('foo').to(g.V('MZ-G').next()).next()

In [None]:
def create_edge(vid_1, vid_2, e_type, properties):
    
    task = g.V(vid_1).addE(e_type).to(g.V(vid_2).next())
    
    for key, value in properties.items():
        task = task.property(key,value)
        
    task.next()
    
    return 1

In [None]:
for idx, row in admin_1.loc[(admin_1['iso_a2']=='MZ')&(admin_1['name']=='Gaza')].iterrows():
    
    props = {kk:vv for kk,vv in row.to_dict().items() if kk not in ['geometry']}
    props['geometry'] = json.dumps(geometry.mapping(row['geometry']))
    
    create_vertex(idx, 'admin-1', props)

In [None]:
gaza_idx = 'MZ-G'

In [None]:
for idx, row in gdf.loc[gdf['area']>1e8].iterrows():
    props = {kk:vv for kk,vv in row.to_dict().items() if kk not in ['geometry']}
    props['geometry'] = json.dumps(geometry.mapping(row['geometry']))
    
    create_vertex(str(idx), 'agriculture-area', props)
    
    create_edge(str(idx), gaza_idx, 'isIn', {})

In [None]:
# todo: visualise GRaph

## 2. Reduce an ndvi for each feature

In [None]:
import pystac_client
import stackstac
from rasterio.enums import Resampling
from dask_cloudprovider.aws import FargateCluster, ECSCluster
from rasterio.errors import RasterioIOError
from distributed import Client
from pyproj import CRS, Transformer
from shapely.ops import transform

### 2a. Set up our dask cluster for highly parallelised operation on AWS CoG

In [None]:
# call up a cluster on AWS Fargate
cluster = FargateCluster(
    #cluster_arn="arn:aws:ecs:eu-central-1:413730540186:cluster/LK-dask-test-6",
    #execution_role_arn="arn:aws:iam::413730540186:role/LK-dask-test-6-execution-role",
    #task_role_arn="arn:aws:iam::413730540186:role/LK-dask-test-6-task-role",
    #security_groups=["sg-0f2acfe8a150834ed"],
    cluster_name_template='LK-dask-test-7', # <- if creating new cluster
    region_name="eu-central-1",
    image="daskdev/dask:latest",
    environment={'EXTRA_PIP_PACKAGES':'stackstac'},
    scheduler_cpu=1024*4,
    scheduler_mem=2048*12,
    worker_cpu=2048,
    worker_mem=2048*4,
    #skip_cleanup = True,
    n_workers=20,
    fargate_use_private_ip=False,
    scheduler_timeout="60 minutes"
)

In [None]:
# ... or call up an existing cluster
#cluster = ECSCluster(
#    cluster_arn='arn:aws:ecs:eu-central-1:413730540186:cluster/LK-dask-test-3',
#    execution_role_arn="arn:aws:iam::413730540186:role/LK-dask-test-3-execution-role",
#    task_role_arn="arn:aws:iam::413730540186:role/LK-dask-test-3-task-role",
 #   security_groups = ["sg-06788c6c48e842ea4"],
 #   image="daskdev/dask:latest",
#    environment={'EXTRA_PIP_PACKAGES':'stackstac'},
#    n_workers=4,
#)

In [None]:
# keep an eye on our workers
cluster.dashboard_link

In [None]:
# Use it as our Dask cluster
client = Client(cluster)

### 2b. Query the STAC catalog to get COG for our AOI

In [None]:
URL = "https://earth-search.aws.element84.com/v0"
catalog = pystac_client.Client.open(URL)

In [None]:
gaza_geom = admin_1.loc['MZ-G','geometry']

In [None]:
items = catalog.search(
    intersects=geometry.mapping(geometry.box(*gaza_geom.bounds)),
    collections=["sentinel-s2-l2a-cogs"],
    datetime="2019-01-01/2019-07-01",
).get_all_items()
len(items)

### 2c. For each geom reduce our stackstac as ECS-dask-backended XArray

In [None]:
cluster.close()

In [None]:
dfs = []

for idx, row in gdf.loc[gdf['area']>1e8].iloc[6:].iterrows():
    
    print (idx)
    client.wait_for_workers(20)
    
    geom = row['geometry']
    
    xr_stack = stackstac.stack(
        items,
        resolution=10,
        bounds_latlon=geom.bounds,
        resampling=Resampling.bilinear,
        errors_as_nodata=(
            IOError(r"HTTP response code: \d\d\d"),
            RasterioIOError(".*"),
        ),
    )
    
    # swap geom to utm
    crs_wgs84 = CRS('EPSG:4326')
    crs_utm = CRS(xr_stack.crs)
    wgs2utm = Transformer.from_crs(crs_wgs84, crs_utm, always_xy=True).transform
    utm_geom = transform(wgs2utm, geom)
    
    out_shp = (xr_stack.coords['y'].shape[0], xr_stack.coords['x'].shape[0])
    
    # burn in mask
    mask_arr = features.rasterize(
        [utm_geom], 
        out_shape=out_shp, 
        fill=0, 
        transform=xr_stack.transform, 
        all_touched=False, 
        default_value=1, 
        dtype=None
    )
    
    # build computation graph for NDVI: (NIR-red) / (NIR+RED)
    xr_stack.coords['mask'] = (('y', 'x'), mask_arr)
    
    xr_stack = xr_stack.where(xr_stack.mask==1)
    
    xr_ndvi = (xr_stack.sel({'band':'B08'}) - xr_stack.sel({'band':'B04'})) / (xr_stack.sel({'band':'B08'}) + xr_stack.sel({'band':'B04'})) 
    xr_ndvi = xr_ndvi.mean(dim=['x','y'])
    
    # call the compute with the dask backend
    result = xr_ndvi.compute()
    
    # cast to pandas
    df = result.to_pandas()
    df.index = df.index.date
    
    dfs.append(df)

In [None]:
dfs

In [None]:
import pickle

In [None]:
pickle.dump(dfs, open(os.path.join(root,'data','dfs_pickle_6:.pkl'),'wb'))

### 2d. Populate graph with revisits

## 3. Demo graph queries

##### archive

In [None]:
cluster = ECSCluster(
    cluster_arn="arn:aws:ecs:eu-central-1:413730540186:cluster/LK-dask-test-2",
    image="daskdev/dask:latest",
    environment={'EXTRA_PIP_PACKAGES':'stackstac'},
    scheduler_cpu=1024,
    scheduler_mem=4096,
    worker_cpu=2048,
    worker_mem=8192,
    n_workers=4,
    #fargate_use_private_ip=False,
    #scheduler_timeout="15 minutes"
)

In [None]:
cluster = FargateCluster(region_name="eu-central-1")

In [None]:
xr_stack = stackstac.stack(
    items,
    resolution=10,
    bounds_latlon=(35.15, -18.32, 35.17, -18.34),
    resampling=Resampling.bilinear
)

In [None]:
result

In [None]:
cluster.close()

In [None]:
cluster = FargateCluster(
    cluster_name_template='LK-dask-test-2',
    region_name="eu-central-1",
    image="daskdev/dask:latest",
    environment={'EXTRA_PIP_PACKAGES':'stackstac'},
    scheduler_cpu=1024,
    scheduler_mem=4096,
    worker_cpu=2048,
    worker_mem=8192,
    #execution_role_arn="arn:aws:iam::260849320:role/dask-fargate-execution", #UPDATED
    #task_role_arn='arn:aws:iam::260849720:role/dask-fargate-task', #UPDATED
    #task_role_policies=[]
    #vpc='vpc-0280b92031b9f010c',
    #subnets=[
    #    'subnet-06cc237e',
    #    'subnet-2a505861',
    #    'subnet-cf04f2',
    #    'subnet-3a2756',
    #    'subnet-08ba9c01b59b6'
    #], # updated
    #security_groups=['sg-02fe57ad943901'], #updated
    #skip_cleanup = True,
    n_workers=4,
    fargate_use_private_ip=False,
    scheduler_timeout="15 minutes"
)