In [2]:
%load_ext autoreload
%autoreload 2


In [None]:
# scipy basics
import pandas as pd
import numpy as np
import geopandas as gpd
from shapely.wkt import loads

from collections import OrderedDict
# dask/parallelization libraries
import dask.array as da
import dask.dataframe as dd
import dask_geopandas
import pystac
from odc import stac
import rasterio


from dask.distributed import LocalCluster, Client, get_worker
from shapely.geometry import shape


# zonal stats library
from zonal_datacube.zonal_datacube import ZonalDataCube
from zonal_datacube.analysis_functions import count as dc_count, mean as dc_mean, AnalysisFunction

# notebook utility functions that integrate with the infrastructure
# from wri_notebooks_utils.wri_notebooks_utils import get_data_api_stac_items
# from wri_notebooks_utils.wri_notebooks_utils import get_dask_ecs_client

In [4]:
dataset = "umd_glad_landsat_alerts"
ptw_path = "s3://gfw-stac-prod/ptw_grid.csv"

Currently running in local cluster

In [5]:
# client = get_dask_ecs_client()
cluster = LocalCluster(n_workers=4)
client = Client(cluster.scheduler_address)
client

2023-02-01 15:07:50,724 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-oknjxwbb', purging


0,1
Connection method: Direct,
Dashboard: http://127.0.0.1:8787/status,

0,1
Comm: tcp://127.0.0.1:36765,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 8
Started: Just now,Total memory: 15.64 GiB

0,1
Comm: tcp://127.0.0.1:40043,Total threads: 2
Dashboard: http://127.0.0.1:37325/status,Memory: 3.91 GiB
Nanny: tcp://127.0.0.1:42221,
Local directory: /tmp/dask-worker-space/worker-jyv3fufn,Local directory: /tmp/dask-worker-space/worker-jyv3fufn
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 0.0%,Last seen: Just now
Memory usage: 56.53 MiB,Spilled bytes: 0 B
Read bytes: 0.0 B,Write bytes: 0.0 B

0,1
Comm: tcp://127.0.0.1:40205,Total threads: 2
Dashboard: http://127.0.0.1:41939/status,Memory: 3.91 GiB
Nanny: tcp://127.0.0.1:37117,
Local directory: /tmp/dask-worker-space/worker-n2uflqxt,Local directory: /tmp/dask-worker-space/worker-n2uflqxt
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 0.0%,Last seen: Just now
Memory usage: 56.35 MiB,Spilled bytes: 0 B
Read bytes: 0.0 B,Write bytes: 0.0 B

0,1
Comm: tcp://127.0.0.1:46289,Total threads: 2
Dashboard: http://127.0.0.1:34119/status,Memory: 3.91 GiB
Nanny: tcp://127.0.0.1:44985,
Local directory: /tmp/dask-worker-space/worker-vrpwcftt,Local directory: /tmp/dask-worker-space/worker-vrpwcftt
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 0.0%,Last seen: Just now
Memory usage: 56.46 MiB,Spilled bytes: 0 B
Read bytes: 0.0 B,Write bytes: 0.0 B

0,1
Comm: tcp://127.0.0.1:42905,Total threads: 2
Dashboard: http://127.0.0.1:32969/status,Memory: 3.91 GiB
Nanny: tcp://127.0.0.1:39319,
Local directory: /tmp/dask-worker-space/worker-1fvs3nnc,Local directory: /tmp/dask-worker-space/worker-1fvs3nnc
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 0.0%,Last seen: Just now
Memory usage: 56.57 MiB,Spilled bytes: 0 B
Read bytes: 0.0 B,Write bytes: 0.0 B


TODO: The function below will be replaced by the one in wri_notebooks_utils when this notebook is set up to run in AWS

In [7]:
DATA_API_STAC_CATALOG_URI = "http://gfw-stac-prod.s3.amazonaws.com/gfw-catalog.json"
def get_data_api_stac_items(dataset, group=None):
    catalog = pystac.Catalog.from_file(DATA_API_STAC_CATALOG_URI)
    collection = catalog.get_child(dataset)
    if group:
        collection = collection.get_child(group)
    items = [
        collection.get_item(tile_id) for tile_id in
        ['10S_070W', '10S_080W']
    ]
    # items = list(collection.get_items())
    items_dict = [item.to_dict() for item in items]

    # rename assets from tile ID to dataset so STAC clients know they're part
    # of the same dataset
    items_renamed = []
    for item in items_dict:
        asset = item["assets"].pop(list(item["assets"].keys())[0])

        # temp workaround
        if asset["raster:bands"][0]["nodata"] == 0:
            asset["raster:bands"][0]["nodata"] = int(asset["raster:bands"][0]["nodata"])
        item["assets"][dataset] = asset

        # temp workaround, manually add mising proj files
        bbox = list(shape(item["geometry"]).bounds)
        transform = list(
            rasterio.transform.from_bounds(*bbox, *item["properties"]["proj:shape"])
        )

        item["properties"]["proj:bbox"] = bbox
        item["properties"]["proj:transform"] = transform

        items_renamed.append(pystac.Item.from_dict(item))

    return items_renamed

Collect the STAC items for GLAD alerts and ESA land cover data. Note we're selecting a subset of tiles that overlap with selected zones to run this locally

In [8]:
# get STAC items from Data API catalog
glad_items = get_data_api_stac_items(dataset)
glad_items

[<Item id=10S_070W>, <Item id=10S_080W>]

In [9]:
lulc_items = get_data_api_stac_items("esa_land_cover_2015", group="class")
lulc_items

[<Item id=10S_070W>, <Item id=10S_080W>]

Read the zones into a geopandas `GeoDataFrame` and select a subset to run the analysis in a local cluster

In [10]:
ptw_df = pd.read_csv(ptw_path, delimiter='\t', names=['geometry', 'weight', 'id', 'region'])

In [11]:
ptw_df = ptw_df[(ptw_df['id'] >= "south_america_57523") & (ptw_df['id'] <= "south_america_57530")]
ptw_df["geometry"] = ptw_df["geometry"].apply(lambda x: loads(x).buffer(0))
ptw_gdf = gpd.GeoDataFrame(ptw_df, geometry="geometry")

ptw_gdf.head()

Unnamed: 0,geometry,weight,id,region
57523,"POLYGON ((-61.02697 -10.23191, -61.01453 -10.1...",0.00033,south_america_57523,south_america
57524,"POLYGON ((-60.92377 -10.23191, -60.91135 -10.1...",0.105667,south_america_57524,south_america
57525,"POLYGON ((-60.82058 -10.23191, -60.80817 -10.1...",0.057248,south_america_57525,south_america
57526,"POLYGON ((-60.71738 -10.23191, -60.70500 -10.1...",0.082111,south_america_57526,south_america
57527,"POLYGON ((-60.20139 -10.23191, -60.18911 -10.1...",0.284836,south_america_57527,south_america


We need to know the land cover classes beforehand as dask dataframe `map_partitions`
requires `meta` object specifying dtype of the resuls columns which are the land cover
classes in this case. Here we read the classes from a csv file and construct the meta objects

In [12]:
lc_classes = pd.read_csv('land_cover_classes.csv')
lc_classes.head()

Unnamed: 0,id,class
0,0,no_data
1,10,agriculture
2,11,agriculture
3,12,agriculture
4,20,agriculture


In [13]:
lc_meta = OrderedDict({d: int for d in lc_classes.id})
lc_agg = {d: "sum" for d in lc_classes.id}

In [39]:
def get_alerts(datacube, zone):
    alerts = datacube.umd_glad_landsat_alerts > 0
    return alerts.astype(int).sum()

alerts_count_func = AnalysisFunction(
    name="count",
    func=get_alerts,
    agg=lc_agg,
    meta=lc_meta
    
)

In [49]:
zonal_dc = ZonalDataCube(
    ptw_gdf, glad_items,
    npartitions=10,
    bounds=ptw_gdf.total_bounds,
    groupby_stac_items=lulc_items,
    groupby_variable='esa_land_cover_2015'
)

In [None]:
%%time
results_graph = zonal_dc.analyze(funcs=[alerts_count_func])
results = results_graph.compute()

In [53]:
results.head()

Unnamed: 0,weight,region,id,0,10,11,12,20,30,40,...,140,150,151,152,153,200,201,202,210,220
0,0.082111,south_america,south_america_57526,0.0,0,0,0,5.0,0.0,7.0,...,0.0,0,0,0,0,0,0,0,0.0,0
1,0.057248,south_america,south_america_57525,0.0,0,0,0,0.0,0.0,1.0,...,0.0,0,0,0,0,0,0,0,0.0,0
2,0.105667,south_america,south_america_57524,0.0,0,0,0,0.0,0.0,11.0,...,0.0,0,0,0,0,0,0,0,0.0,0
3,0.00033,south_america,south_america_57523,0.0,0,0,0,0.0,0.0,7.0,...,0.0,0,0,0,0,0,0,0,0.0,0
4,1.227149,south_america,south_america_57528,0.0,0,0,0,0.0,0.0,53.0,...,0.0,0,0,0,0,0,0,0,0.0,0


Aggregate the land cover ids by class

In [54]:
column_names = lc_classes.set_index('id').to_dict(orient='dict')['class']
results_by_class = results.rename(columns=column_names)
results_by_class.head()

Unnamed: 0,weight,region,id,no_data,agriculture,agriculture.1,agriculture.2,agriculture.3,agriculture.4,agriculture.5,...,sparse vegetation,sparse vegetation.1,sparse vegetation.2,sparse vegetation.3,sparse vegetation.4,bare,bare.1,bare.2,water,permanent snow and ice
0,0.082111,south_america,south_america_57526,0.0,0,0,0,5.0,0.0,7.0,...,0.0,0,0,0,0,0,0,0,0.0,0
1,0.057248,south_america,south_america_57525,0.0,0,0,0,0.0,0.0,1.0,...,0.0,0,0,0,0,0,0,0,0.0,0
2,0.105667,south_america,south_america_57524,0.0,0,0,0,0.0,0.0,11.0,...,0.0,0,0,0,0,0,0,0,0.0,0
3,0.00033,south_america,south_america_57523,0.0,0,0,0,0.0,0.0,7.0,...,0.0,0,0,0,0,0,0,0,0.0,0
4,1.227149,south_america,south_america_57528,0.0,0,0,0,0.0,0.0,53.0,...,0.0,0,0,0,0,0,0,0,0.0,0


In [80]:
aggregated_results = results_by_class.transpose().loc[lc_classes['class'].unique()]
aggregated_results["classes"] = aggregated_results.index
final_alerts = pd.concat(
    [results[['weight', 'region', 'id']], aggregated_results.groupby("classes").sum().transpose()],
    axis=1
)

final_alerts.head()

Unnamed: 0,weight,region,id,agriculture,bare,forest,grassland,no_data,permanent snow and ice,settlement,shrubland,sparse vegetation,water,wetland
0,0.082111,south_america,south_america_57526,12.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,0.057248,south_america,south_america_57525,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,0.105667,south_america,south_america_57524,11.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3,0.00033,south_america,south_america_57523,7.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,1.227149,south_america,south_america_57528,53.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
