In [1]:
import coiled

import fsspec
import numpy as np
import rioxarray
import xarray as xr
import fsspec
import pandas as pd
import logging
from flox.xarray import xarray_reduce
import numpy as np
import dask

In [2]:
logging.getLogger("distributed.client").setLevel(logging.ERROR)  # or logging.ERROR

In [3]:
fs = fsspec.filesystem("s3", requester_pays=True)

## Start the cluster

- Computation is sensitive to the cpu to memory ratio of instances
- Memory optimized instance type is best - using r7g.2xlarge (8vcpu and 64GB; 1:8 cpu to memory)
- Out of memory error on instances with low cpu to memory ratio (even 1:4)
- Graviton instances performed better than intel (and they are cheaper!)
- Notebook, the data and cluster are in the same region
- Using spot instances when available

In [4]:
cluster = coiled.Cluster(
    name="dist_alerts_zonal_stat_count",
    region="us-east-1",
    n_workers=5,
    tags={"project": "dist_alerts_zonal_stat"},
    scheduler_vm_types="r7g.xlarge",
    worker_vm_types="r7g.2xlarge",
    compute_purchase_option="spot_with_fallback"
)

client = cluster.get_client()

Output()

Output()

In [5]:
tcl = xr.open_dataset("s3://gfw-data-lake/umd_tree_cover_loss/v1.12/raster/epsg-4326/cog/year__tcd10_2000.tif").band_data.chunk(
        {"x": 10000, "y": 10000})
tcl



Unnamed: 0,Array,Chunk
Bytes,2.93 TiB,381.47 MiB
Shape,"(1, 560000, 1440000)","(1, 10000, 10000)"
Dask graph,8064 chunks in 2 graph layers,8064 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 2.93 TiB 381.47 MiB Shape (1, 560000, 1440000) (1, 10000, 10000) Dask graph 8064 chunks in 2 graph layers Data type float32 numpy.ndarray",1440000  560000  1,

Unnamed: 0,Array,Chunk
Bytes,2.93 TiB,381.47 MiB
Shape,"(1, 560000, 1440000)","(1, 10000, 10000)"
Dask graph,8064 chunks in 2 graph layers,8064 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [6]:
cluster.adapt(min=5, max=50)

2025-07-18 14:48:23,455 - distributed.deploy.adaptive - INFO - Adaptive scaling started: minimum=1 maximum=200


<coiled.cluster.CoiledAdaptive at 0x152b1ca70>

In [11]:
tcl = tcl.where(tcl > 0)
tcl += 2000
df = tcl.to_dask_dataframe().persist()
df

This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.


Unnamed: 0_level_0,band,x,y,spatial_ref,band_data
npartitions=8208,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
0,int64,float64,float64,int64,float32
99680000,...,...,...,...,...
...,...,...,...,...,...
806341200000,...,...,...,...,...
806399999999,...,...,...,...,...


In [12]:
partitioned = df.repartition(npartitions=50)
partitioned

Unnamed: 0_level_0,band,x,y,spatial_ref,band_data
npartitions=50,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
0,int64,float64,float64,int64,float32
16184000000,...,...,...,...,...
...,...,...,...,...,...
790198080000,...,...,...,...,...
806399999999,...,...,...,...,...


In [13]:
partitioned.to_parquet(

SyntaxError: incomplete input (714372204.py, line 1)

## Reading the grouping datasets

- These are already clipped to DIST alerts extents and saved to zarr for performance
- Have the same chunk size as DIST (10k by 10k)
- Using optimal data types

In [5]:
dist_obj_name = "s3://gfw-data-lake/umd_glad_dist_alerts/v20250510/raster/epsg-4326/zarr/date_conf.zarr"
dist_alerts = xr.open_zarr(dist_obj_name)
dist_alerts

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7fb30667ead0>
Unclosed connector
connections: ['deque([(<aiohttp.client_proto.ResponseHandler object at 0x7fb3062ddf60>, 2338.437162932)])']
connector: <aiohttp.connector.TCPConnector object at 0x7fb30667ea10>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7fb30612b5d0>
Unclosed connector
connections: ['deque([(<aiohttp.client_proto.ResponseHandler object at 0x7fb3062dda90>, 2338.441209306)])']
connector: <aiohttp.connector.TCPConnector object at 0x7fb306291c90>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7fb3066e0ad0>
Unclosed connector
connections: ['deque([(<aiohttp.client_proto.ResponseHandler object at 0x7fb3062dd4e0>, 2338.442779742)])']
connector: <aiohttp.connector.TCPConnector object at 0x7fb3066e0a10>


Unnamed: 0,Array,Chunk
Bytes,1.26 TiB,190.73 MiB
Shape,"(1, 480000, 1440000)","(1, 10000, 10000)"
Dask graph,6912 chunks in 2 graph layers,6912 chunks in 2 graph layers
Data type,int16 numpy.ndarray,int16 numpy.ndarray
"Array Chunk Bytes 1.26 TiB 190.73 MiB Shape (1, 480000, 1440000) (1, 10000, 10000) Dask graph 6912 chunks in 2 graph layers Data type int16 numpy.ndarray",1440000  480000  1,

Unnamed: 0,Array,Chunk
Bytes,1.26 TiB,190.73 MiB
Shape,"(1, 480000, 1440000)","(1, 10000, 10000)"
Dask graph,6912 chunks in 2 graph layers,6912 chunks in 2 graph layers
Data type,int16 numpy.ndarray,int16 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,1.26 TiB,190.73 MiB
Shape,"(1, 480000, 1440000)","(1, 10000, 10000)"
Dask graph,6912 chunks in 2 graph layers,6912 chunks in 2 graph layers
Data type,int16 numpy.ndarray,int16 numpy.ndarray
"Array Chunk Bytes 1.26 TiB 190.73 MiB Shape (1, 480000, 1440000) (1, 10000, 10000) Dask graph 6912 chunks in 2 graph layers Data type int16 numpy.ndarray",1440000  480000  1,

Unnamed: 0,Array,Chunk
Bytes,1.26 TiB,190.73 MiB
Shape,"(1, 480000, 1440000)","(1, 10000, 10000)"
Dask graph,6912 chunks in 2 graph layers,6912 chunks in 2 graph layers
Data type,int16 numpy.ndarray,int16 numpy.ndarray


In [None]:
# clipping the grouping layers reduces compute time slightly compared to doing xr.align dynamically but we have to do that when operational to reduce data management burden
countries_from_clipped = xr.open_zarr(
    's3://gfw-data-lake/gadm_administrative_boundaries/v4.1.85/raster/epsg-4326/zarr/adm0_clipped_to_dist.zarr'
).band_data


In [7]:
regions_from_clipped = xr.open_zarr(
    's3://gfw-data-lake/gadm_administrative_boundaries/v4.1.85/raster/epsg-4326/zarr/adm1_clipped_to_dist.zarr'
).band_data


In [8]:
subregions_from_clipped = xr.open_zarr(
    's3://gfw-data-lake/gadm_administrative_boundaries/v4.1.85/raster/epsg-4326/zarr/adm2_clipped_to_dist.zarr'
).band_data

In [9]:
natural_lands_from_clipped  = xr.open_zarr(
    's3://gfw-data-lake/sbtn_natural_lands/zarr/sbtn_natural_lands_all_classes_clipped_to_dist.zarr'
).band_data

In [10]:
adm0_ids = [
    0, 4, 8, 10, 12, 16, 20, 24, 28, 31, 32, 36, 40, 44, 48, 50, 51, 52, 56, 60,
    64, 68, 70, 72, 74, 76, 84, 86, 90, 92, 96, 100, 104, 108, 112, 116, 120,
    124, 132, 136, 140, 144, 148, 152, 156, 158, 162, 166, 170, 174, 175, 178,
    180, 184, 188, 191, 192, 196, 203, 204, 208, 212, 214, 218, 222, 226, 231,
    232, 233, 234, 238, 239, 242, 246, 248, 250, 254, 258, 260, 262, 266, 268,
    270, 275, 276, 288, 292, 296, 300, 304, 308, 312, 316, 320, 324, 328, 332,
    334, 336, 340, 344, 348, 352, 356, 360, 364, 368, 372, 376, 380, 384, 388,
    392, 398, 400, 404, 408, 410, 414, 417, 418, 422, 426, 428, 430, 434, 438,
    440, 442, 446, 450, 454, 458, 462, 466, 470, 474, 478, 480, 484, 492, 496,
    498, 499, 500, 504, 508, 512, 516, 520, 524, 528, 531, 533, 534, 535, 540,
    548, 554, 558, 562, 566, 570, 574, 578, 580, 581, 583, 584, 585, 586, 591,
    598, 600, 604, 608, 612, 616, 620, 624, 626, 630, 634, 638, 642, 643, 646,
    652, 654, 659, 660, 662, 663, 666, 670, 674, 678, 682, 686, 688, 690, 694,
    702, 703, 704, 705, 706, 710, 716, 724, 728, 729, 732, 740, 744, 748, 752,
    756, 760, 762, 764, 768, 772, 776, 780, 784, 788, 792, 795, 796, 798, 800,
    804, 807, 818, 826, 831, 832, 833, 834, 840, 850, 854, 858, 860, 862, 876,
    882, 887, 894
]


In [48]:
alert_dates = np.arange(731, 1590) # we should get this from metadata which includes `content_date_range`

In [12]:
dist_alerts.confidence

Unnamed: 0,Array,Chunk
Bytes,1.26 TiB,190.73 MiB
Shape,"(1, 480000, 1440000)","(1, 10000, 10000)"
Dask graph,6912 chunks in 2 graph layers,6912 chunks in 2 graph layers
Data type,int16 numpy.ndarray,int16 numpy.ndarray
"Array Chunk Bytes 1.26 TiB 190.73 MiB Shape (1, 480000, 1440000) (1, 10000, 10000) Dask graph 6912 chunks in 2 graph layers Data type int16 numpy.ndarray",1440000  480000  1,

Unnamed: 0,Array,Chunk
Bytes,1.26 TiB,190.73 MiB
Shape,"(1, 480000, 1440000)","(1, 10000, 10000)"
Dask graph,6912 chunks in 2 graph layers,6912 chunks in 2 graph layers
Data type,int16 numpy.ndarray,int16 numpy.ndarray


## Computation

In [15]:
%%time

from flox import ReindexArrayType, ReindexStrategy


countries_from_clipped.name = "countries"
regions_from_clipped.name = "regions"
subregions_from_clipped.name = "subregions"
natural_lands_from_clipped.name = "natural_lands"
alerts_count = xarray_reduce(
    dist_alerts.confidence,
    *(
        countries_from_clipped,
        regions_from_clipped,
        subregions_from_clipped,
        natural_lands_from_clipped,
        dist_alerts.alert_date,
        dist_alerts.confidence
    ),
    func='count',
    expected_groups=(
        adm0_ids,
        np.arange(86),
        np.arange(854),
        np.arange(22),
        alert_dates,
        [1, 2, 3]
    ),
    reindex=ReindexStrategy(
        blockwise=False, array_type=ReindexArrayType.SPARSE_COO
    ),
    fill_value=0
).compute()

CPU times: user 4.22 s, sys: 1.46 s, total: 5.69 s
Wall time: 6min 55s


## Transforming sparse array to dataframe and saving to parquet

In [16]:
sparse_data = alerts_count.data

dim_names = alerts_count.dims
indices = sparse_data.coords
values = sparse_data.data

coord_dict = {
    dim: alerts_count.coords[dim].values[indices[i]]
    for i, dim in enumerate(dim_names)
}
coord_dict["value"] = values

df = pd.DataFrame(coord_dict)

In [29]:
df.head()

Unnamed: 0,countries,regions,subregions,natural_lands,alert_date,confidence,value
0,0,0,0,0,731,2,38
1,0,0,0,0,733,2,5
2,0,0,0,0,733,3,3
3,0,0,0,0,735,2,3
4,0,0,0,0,735,3,7


In [23]:
df.to_parquet('s3://gfw-data-lake/sbtn_natural_lands/zarr/dist_alerts_by_natural_lands_adm2_raw.parquet', index=False)

A QC spot-check - counted in qgis that this region has total 26 alerts

In [38]:
df[(df.countries == 566) & (df.regions == 20) & (df.subregions == 31)]

Unnamed: 0,countries,regions,subregions,natural_lands,alert_date,confidence,value
16729620,566,20,31,13,1087,3,1
16729621,566,20,31,13,1217,3,3
16729622,566,20,31,13,1257,3,10
16729623,566,20,31,13,1465,3,10
16729624,566,20,31,13,1477,2,2
