
How to make a big-ish median mosaic #12

MikeBeller opened this issue Dec 6, 2021 · 25 comments

@MikeBeller

I am working on creating a mosaic of sentinel data for a sizable area of interest -- around 600km by 400km. The goal is to get a median of the non-cloudy pixels over a 2 month period. My experience is that, with the code I've developed, it's not doable reliably even on a 400-core cluster.

To dig in to this I have a reproducible example of a smaller area -- 1/16 of the above example (roughly 150km x 100km), running on a 100 core cluster. This works about 1 out of 3 runs, otherwise dying with various errors. Below is the code, and then the output of 3 runs. The first died with a missing dependent, the second worked, and the third died with set size changed. (Note as regards my previously filed issue #11 -- this is with a static cluster size where I wait for all workers before proceeding.)

Just to clarify why the code is organized the way it is -- I am applying the median over time of each band, after NaNing out pixels that the Sentinel-2 SCL band identifies as clouds. That is, I'm using pixel-by-pixel cloud removal instead of a STAC query based on overall percentage cloudiness, in order to maximize the information I can get from the tiles. This is probably contributing to the complexity, but it also seems like a very reasonable thing to do.

My questions are

  • Is the code inefficiently organized? Is there an obviously better way to do it? Can the where/isin/median process be merged into the stack call? (even though the 'isin' uses the SCL band to mask pixels in other bands)
  • Why are these various missing-dependent / set-size-changed errors happening? Is it that I'm trying to do too much with too little worker memory? Is there some rule of thumb that would tell me how many workers I need for a particular computation? The full 670 GB cube should roughly fit in the workers' combined memory -- that is the rule of thumb I used to get to 100 cores.

Side note as regards the scheduler: for larger areas (e.g. the full 600km x 400km) I can't even reliably get the computation going, even with 400 cores. I think it's overwhelming the scheduler. Should I try to (can I) increase the scheduler's memory somehow?

Or perhaps the real problem is that the computation is simply out of scope unless it's done in a more algorithmically aligned manner.

Very interested in your thoughts.

import numpy as np
import dask
import matplotlib.pyplot as plt
from pystac_client import Client as pystac_client
#from pystac.extensions.eo import EOExtension as eo
import planetary_computer as pc
import stackstac
from dask.distributed import LocalCluster, Client, wait

import time
start_time = time.time()
def log(*args):
    el = time.time() - start_time
    current_time = time.strftime("%H:%M:%S", time.localtime())
    print(current_time, el, flush=True, *args)

log("STARTING")

import dask_gateway

nworkers = 100
cluster = dask_gateway.GatewayCluster()
client = cluster.get_client()
cluster.scale(nworkers)
log("DASHBOARD:", cluster.dashboard_link)
client.wait_for_workers(nworkers)
log("WORKERS READY.")

aoi_bounds_zim = (27.11764655, -21.18948202, 32.8563901, -16.99215633) # big chunk of zimbabwe
(xmn,ymn,xmx,ymx) = aoi_bounds_zim
aoi_bounds_half = (xmn, ymn, (xmn+xmx)/2, (ymn+ymx)/2)  # halve each dimension (1/4 the area)
aoi_bounds_quarter = (xmn, ymn, xmn+(xmx-xmn)/4, ymn+(ymx-ymn)/4)  # quarter each dimension (1/16 the area)
#aoi_bounds_tiny = (xmn, ymn, xmn+0.1, ymn+0.1)
aoi_bounds = aoi_bounds_quarter

date_range = "2020-01-01/2020-03-01"
all_bands = ['B02', 'B03', 'B04', 'B08', 'SCL']
rgb_bands = ['B04', 'B03', 'B02']

catalog = pystac_client.open("https://planetarycomputer.microsoft.com/api/stac/v1")

search = catalog.search(
    collections=["sentinel-2-l2a"],
    bbox=aoi_bounds,
    datetime=date_range,
    #query={"eo:cloud_cover": {"lt": 10}},
)

# Check how many items were returned
items = list(search.get_items())
print(f"Returned {len(items)} Items")
# Note that there are more than 1 tile here:
print("Sentinel 2 Unique Tiles:", np.unique([i.properties['s2:mgrs_tile'] for i in items]))
# Print unique CRSs:
print("EPSGS:", np.unique([i.properties['proj:epsg'] for i in items]))

# Stack it
epsg_int = 4326  # because we (may) have multiple UTM zones, stick with lat/long
chunksize = 2048

signed_items = [pc.sign(item).to_dict() for item in search.get_items()]
data = (
        stackstac.stack(
            signed_items,
            epsg=epsg_int,
            #resolution=10, 
            #bounds=bounds,
            bounds_latlon=aoi_bounds,
            assets=all_bands,
            chunksize=chunksize,)
    .where(lambda x: x > 0, other=np.nan)  # sentinel-2 uses 0 as nodata
    .assign_coords(
        time=lambda x: x.time.dt.round("D"))  # round time to daily for nicer plot labels
    .groupby('time')          # one tile per datetime
    .apply(stackstac.mosaic)  # mosaic together tiles for same datetime
    )
log("Full data size GB:", data.nbytes/1e9)
log("Dimensions:", data.dims)

cloud_scl_values = [8,9,10]  # sentinel 2 cloudy pixel SCL values
mask = ~data.sel(band='SCL').isin(cloud_scl_values)
img = (data.sel(band=rgb_bands)
        .where(mask)
        .median(dim='time'))
log("Image size GB:", img.nbytes/1e9)

log("CALLING PERSIST")
img_p = img.persist()
log("AFTER CALL TO PERSIST, WAITING")
wait(img_p)
log("PERSIST COMPLETE, PLOTTING")

SCALE = 4
img_c = (img_p
         .coarsen({'x': SCALE, 'y': SCALE}, boundary='pad')
         .mean(skipna=True, keep_attrs=True)
         .compute() )

fig,ax = plt.subplots(1,1)
img_c.plot.imshow(ax=ax, x='x', y='y', robust=True)
fig.savefig("foo.png")
plt.show()

log("AFTER PLOT")
      
cluster.close()

RUN 1 -- missing dependent

17:20:04 9.5367431640625e-07 starting
17:20:17 13.552658557891846 DASHBOARD: https://pccompute.westeurope.cloudapp.azure.com/compute/services/dask-gateway/clusters/prod.cb21467cb71b46dc9aa9634aa6e82591/status
17:25:24 319.86250376701355 workers ready.
Returned 204 Items
Sentinel 2 Unique Tiles: ['35KNS' '35KNT' '35KNU' '35KPS' '35KPT' '35KPU' '35KQS' '35KQT' '35KQU'
 '35KRS' '35KRT' '35KRU']
EPSGS: [32735]
/srv/conda/envs/notebook/lib/python3.8/site-packages/stackstac/accumulate_metadata.py:151: VisibleDeprecationWarning: Creating an ndarray from ragged nested sequences (which is a list-or-tuple of lists-or-tuples-or ndarrays with different lengths or shapes) is deprecated. If you meant to do this, you must specify 'dtype=object' when creating the ndarray.
  props_arr = np.squeeze(np.array(props))
17:25:28 323.98693227767944 Full data size GB: 673.44177984
17:25:28 323.9869964122772 Dimensions: ('time', 'band', 'y', 'x')
17:25:29 324.8450291156769 Image size GB: 16.836044496
17:25:29 324.84509015083313 CALLING PERSIST
17:26:34 390.26140332221985 AFTER CALL TO PERSIST, WAITING
17:32:58 773.6941854953766 PERSIST COMPLETE, PLOTTING
Traceback (most recent call last):
  File "big_image.py", line 99, in <module>
    img_c = (img_p
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/dataarray.py", line 951, in compute
    return new.load(**kwargs)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/dataarray.py", line 925, in load
    ds = self._to_temp_dataset().load(**kwargs)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/dataset.py", line 862, in load
    evaluated_data = da.compute(*lazy_data.values(), **kwargs)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/base.py", line 568, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/client.py", line 2671, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/client.py", line 1948, in gather
    return self.sync(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/client.py", line 845, in sync
    return sync(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils.py", line 326, in sync
    raise exc.with_traceback(tb)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils.py", line 309, in f
    result[0] = yield future
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/client.py", line 1813, in _gather
    raise exception.with_traceback(traceback)
ValueError: Could not find dependent ('where-getitem-db29c7abdfe1267a3a46298a39ef2497', 2, 0, 0).  Check worker logs

real    13m6.952s
user    1m57.003s
sys     0m4.014s

RUN 2 -- successful

(notebook) jovyan@jupyter-mike-40beller-2etech:~/github/carbon/msft$ time python big_image.py
17:34:32 1.9073486328125e-06 starting
17:34:44 11.641934633255005 DASHBOARD: https://pccompute.westeurope.cloudapp.azure.com/compute/services/dask-gateway/clusters/prod.ec4908e8f66c4ef78afd69b17e6ace5b/status
17:34:56 23.47306203842163 workers ready.
Returned 204 Items
Sentinel 2 Unique Tiles: ['35KNS' '35KNT' '35KNU' '35KPS' '35KPT' '35KPU' '35KQS' '35KQT' '35KQU'
 '35KRS' '35KRT' '35KRU']
EPSGS: [32735]
/srv/conda/envs/notebook/lib/python3.8/site-packages/stackstac/accumulate_metadata.py:151: VisibleDeprecationWarning: Creating an ndarray from ragged nested sequences (which is a list-or-tuple of lists-or-tuples-or ndarrays with different lengths or shapes) is deprecated. If you meant to do this, you must specify 'dtype=object' when creating the ndarray.
  props_arr = np.squeeze(np.array(props))
17:35:01 28.43052911758423 Full data size GB: 673.44177984
17:35:01 28.430601119995117 Dimensions: ('time', 'band', 'y', 'x')
17:35:01 29.03855848312378 Image size GB: 16.836044496
17:35:01 29.038624048233032 CALLING PERSIST
17:36:08 95.49182105064392 AFTER CALL TO PERSIST, WAITING
17:42:17 464.612357378006 PERSIST COMPLETE, PLOTTING
17:42:45 492.436571598053 AFTER PLOT

real    8m16.301s
user    2m2.096s
sys     0m8.124s

RUN 3 -- set changed size error

(notebook) jovyan@jupyter-mike-40beller-2etech:~/github/carbon/msft$ time python big_image.py 
17:45:01 2.1457672119140625e-06 STARTING
17:45:12 10.9552001953125 DASHBOARD: https://pccompute.westeurope.cloudapp.azure.com/compute/services/dask-gateway/clusters/prod.45fe85a379c447da88f06e9e6c407135/status
17:45:25 23.309569597244263 WORKERS READY.
Returned 204 Items
Sentinel 2 Unique Tiles: ['35KNS' '35KNT' '35KNU' '35KPS' '35KPT' '35KPU' '35KQS' '35KQT' '35KQU'
 '35KRS' '35KRT' '35KRU']
EPSGS: [32735]
/srv/conda/envs/notebook/lib/python3.8/site-packages/stackstac/accumulate_metadata.py:151: VisibleDeprecationWarning: Creating an ndarray from ragged nested sequences (which is a list-or-tuple of lists-or-tuples-or ndarrays with different lengths or shapes) is deprecated. If you meant to do this, you must specify 'dtype=object' when creating the ndarray.
  props_arr = np.squeeze(np.array(props))
17:45:30 28.56306266784668 Full data size GB: 673.44177984
17:45:30 28.563136100769043 Dimensions: ('time', 'band', 'y', 'x')
17:45:31 29.170450448989868 Image size GB: 16.836044496
17:45:31 29.17051339149475 CALLING PERSIST
17:46:37 95.30292534828186 AFTER CALL TO PERSIST, WAITING
17:53:00 478.3570439815521 PERSIST COMPLETE, PLOTTING
Traceback (most recent call last):
  File "big_image.py", line 94, in <module>
    img_c = (img_p
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/dataarray.py", line 951, in compute
    return new.load(**kwargs)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/dataarray.py", line 925, in load
    ds = self._to_temp_dataset().load(**kwargs)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/dataset.py", line 862, in load
    evaluated_data = da.compute(*lazy_data.values(), **kwargs)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/base.py", line 568, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/client.py", line 2671, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/client.py", line 1948, in gather
    return self.sync(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/client.py", line 845, in sync
    return sync(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils.py", line 326, in sync
    raise exc.with_traceback(tb)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils.py", line 309, in f
    result[0] = yield future
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/client.py", line 1813, in _gather
    raise exception.with_traceback(traceback)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/optimization.py", line 969, in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/core.py", line 151, in get
    result = _execute_task(task, cache)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/stackstac/to_dask.py", line 172, in fetch_raster_window
    data = reader.read(current_window)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/stackstac/rio_reader.py", line 423, in read
    reader = self.dataset
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/stackstac/rio_reader.py", line 419, in dataset
    self._dataset = self._open()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/stackstac/rio_reader.py", line 369, in _open
    log_event("open_dataset_initial", dict(url=self.url))
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/stackstac/rio_reader.py", line 42, in log_event
    worker.log_event(topic, dict(msg, thread=_curthread()))
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 820, in log_event
    self.batched_stream.send(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/batched.py", line 142, in send
    self.waker.set()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/locks.py", line 224, in set
    for fut in self._waiters:
RuntimeError: Set changed size during iteration

real    8m8.199s
user    1m53.300s
sys     0m3.607s

@TomAugspurger

TomAugspurger commented Dec 7, 2021

Thanks for posting this. A quick thought before I dig into it more: The Dask Array median docs mention that the current algorithm rechunks the reduced axis (time) to a single chunk. Since the data starts out with chunks (1, 1, y_chunks, x_chunks), it's a challenging computation. We have to shuffle the entire dataset.
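
To make that concrete, here's a hedged sketch (reusing the `data` DataArray from the script above) of what the median implies under the hood:

# Illustrative only: dask's median needs the reduced axis in a single chunk,
# so xarray's median(dim="time") effectively forces this rechunk first.
print(data.chunks)                      # time axis currently chunked as (1, 1, ..., 1)
rechunked = data.chunk({"time": -1})    # one chunk spanning all timestamps
median = rechunked.median(dim="time")   # the reduction itself is then per-chunk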

I could imagine a few possible alternatives:

  1. Write a smarter median algorithm that doesn't require (as much) rechunking. I believe there are some approximate algorithms that could work here. I'm not sure about exact versions (maybe look into dask.array.percentile?).
  2. Rewrite the data I/O layer to be contiguous along time. Essentially, the current workflow goes COGs -> (in-memory) NumPy arrays -> rechunk / transfer. Ideally xarray / dask would be smart enough to replace an I/O layer followed by a rechunk with a "smarter" I/O layer, but they aren't.

I can try to spend a bit of time on the second item later in the week. The basic idea is to have a function that takes a list of STAC items (or COGs?) and a window, reads that window out of each COG, and returns a DataArray that's contiguous in time but small enough to fit in memory:

import rioxarray
import xarray as xr

def read_window(cogs, window):
    # or use stackstac.stack?
    datasets = [rioxarray.open_rasterio(cog).rio.isel_window(window) for cog in cogs]
    # somehow set time?
    return xr.concat(datasets, dim="time")

So you would need pretty small chunks in x / y so that the entire timeseries fits in memory. Then the median computation should be embarrassingly parallel and will run much more smoothly.
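
For illustration only, here's roughly how that per-window pattern might look with dask.delayed; `read_window` is the hypothetical helper above, and `cogs` / `windows` are assumed inputs (a list of assets and a list of spatial windows tiling the AOI):

import dask

@dask.delayed
def window_median(cogs, window):
    # Read one small spatial window across the whole time series (assumed to
    # fit in memory), then reduce immediately so nothing large is shuffled.
    cube = read_window(cogs, window)    # assumed (time, band, y, x) DataArray
    return cube.median(dim="time")

# Every window's task is independent, so the computation is embarrassingly parallel.
pieces = dask.compute(*[window_median(cogs, w) for w in windows])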

That said, the fact that you're seeing these internal Dask errors indicates a problem that might be worth pursuing if / when we come across a problem that can't be worked around like this.

@MikeBeller
Author

Hi Tom. Thanks for your response. Following your suggestion, I replaced the call to median with mean, just to see if this resulted in a faster, more reliable run. Short answer: no. The first run completed, and ran in roughly the same amount of time as the successful run of the median computation (RUN 2 in the original post) -- actually the same amount of time between the call to PERSIST and the finish, which is what matters; I'm not interested in cluster spin-up time for this purpose. The second run errored out the same way as RUN 3 above, with "set changed size during iteration". So from an admittedly small sample size, I'd say the problem is not specifically the median.

Your point about setting up the computation to be more efficiently 'chunkable' makes a lot of sense, but I think it may be somewhat confounded by the intrinsic overlap of neighboring Sentinel-2 tiles. That is, eyeballing the graph of the original program for smaller areas, with and without the call to groupby/stackstac.mosaic, the graph looks a lot more complicated because of the "cascading overlaps" introduced by the mosaicing. But I have little experience reading or tweaking dask graphs, so I'm not sure that's really a problem. If it is part of the problem, I'd be fine with dropping the 'overlap areas' from the Sentinel tiles in the original stack to simplify things, if I could figure out an efficient way to do that.

Finally -- as regards your more detailed suggestion of rewriting the query -- I will have to do a lot more reading of the code for stackstac in order to even begin to take a shot at that. Happy for any further input there if you find the time.

Cheers

@TomAugspurger

I have an initial prototype that I'll share later today. It's looking somewhat promising but will need a bit of benchmarking and maybe a bit of work on your part to adapt it to your specific workflow.

@TomAugspurger

TomAugspurger commented Dec 8, 2021

https://nbviewer.org/gist/TomAugspurger/c027408ae923222fab2a45d5109d114a / https://gist.github.com/TomAugspurger/c027408ae923222fab2a45d5109d114a has the prototype, but it has issues:

  1. there's an off-by-one error somewhere, so all the pixels are shifted
  2. performance is worse than I expected. I need to look into it more
  3. I still saw some of the runtime errors, which surprised me! I'll also look into this but it looks more like an upstream dask / distributed issue

@MikeBeller
Author

Hi Tom Thanks for this. I ran it on a static 24 core cluster and, interestingly, the computation is quite a bit slower for the 'good' solution than for the 'bad' one. (80% more run time, and 50% more total compute for the 'good' one). I recognize that as you scale the computation up, the shuffles in the 'bad' solution grow non-linearly, so the 'good' solution may become better than the 'bad' one at larger scale. (I have yet to test). Nonetheless, some additional thoughts:

  • I noticed, by eyeballing the dashboard, that the workers in the 'good' solution have low CPU usage. Looks like less than 40% during the active part of the computation, vs nearly fully utilized in the 'bad' solution.
  • I wonder whether the weird x,y chunk sizes (732x732) cause part of the slowness because they are not aligned with the chunks in the COGs?
  • I feel like calculating the chunks by computing bounds that we hope/expect will align with the pixels seems fragile (and may be the reason for your off by one error). Is there a way to work directly with the pixel offsets, rather than the bounds?

One more complication: I note that your test applies only to a single Sentinel-2 tile (in space), tile 10TET:

import numpy as np
print("Sentinel 2 Unique Tiles:", np.unique([i.properties['s2:mgrs_tile'] for i in items]))

Sentinel 2 Unique Tiles: ['10TET']

I bring this up because in your test you can safely ignore the fact that Sentinel-2 tiles overlap each other. When you use a larger area for the search, you get multiple overlapping tiles among the items. In my original test I ensured that the same pixel does not appear twice in the median, by doing a groupby/map(stackstac.mosaic) in the original call to stack. This would make the graph messier, I think.

I was thinking to try to apply your test to my area of interest, but need to consider this overlapping-tile issue. Can I just add a groupby/mosaic in somewhere? I have a feeling it's more complicated than that, but when reading your code I get a bit lost in the stackstac, pystac, xr.concat stuff, so trying to make this change myself to address the overlap might not go well. :-) I'd be willing to just ignore the overlapping sections of the neighboring tiles (rather than actually mosaicing them) if I could figure out how to exclude them as part of the workflow.

@gjoseph92

@MikeBeller the Set size changed during iteration was caused by dask/distributed#5552 and fixed in gjoseph92/stackstac#95, which is released in stackstac==0.2.2. Upgrading should eliminate this error; it's a red herring here.

My guess is that the groupby('day').apply(stackstac.mosaic) is the problem here. Xarray's groupby can lead to some convoluted dask graphs, and mosaic already produces a convoluted graph as it is. Since you're flattening all the data through a median anyway, pre-flattening by day should be nearly superfluous. What happens if you remove it? (I hope you don't get banding/striping where the tiles overlap.) If it turns out you need this, but it is the performance problem, we should look more at gjoseph92/stackstac#66 to make an optimized way to do this sort of thing.
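
For concreteness, dropping that step would look roughly like this (a sketch reusing the names from the script at the top of the issue):

# Same pipeline minus the groupby('time') / stackstac.mosaic step; overlapping
# acquisitions just become extra samples feeding the median.
data = (
    stackstac.stack(
        signed_items,
        epsg=epsg_int,
        bounds_latlon=aoi_bounds,
        assets=all_bands,
        chunksize=chunksize,
    )
    .where(lambda x: x > 0, other=np.nan)   # sentinel-2 uses 0 as nodata
)

mask = ~data.sel(band='SCL').isin(cloud_scl_values)
img = data.sel(band=rgb_bands).where(mask).median(dim='time')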

@TomAugspurger your approach here is an interesting workaround—the stackstac-ception is not something I'd ever considered. I haven't read it that carefully, but I bet there's an easier way to do this.

Just doing median("time"), dask wants to rechunk from, say, (time=1, band=1, y=2048, x=2048) to (time=36, band=1, y=256, x=256). So what if you initially tell stackstac to use chunksize=256, and ensure that the median uses 256 as well? Then you have a much simpler rechunking operation, where you're just vertically stacking each 256x256 window—basically the same as your workaround—prior to taking the median. I'd imagine that eliminating the spatial rechunking would make the operation much simpler.
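
Untested, but the shape of that idea would be something like:

# Small spatial chunks from the start, so the only rechunk before the median is
# stacking each 256x256 window along time (no spatial shuffling).
data = (
    stackstac.stack(
        signed_items,
        epsg=epsg_int,
        bounds_latlon=aoi_bounds,
        assets=all_bands,
        chunksize=256,
    )
    .where(lambda x: x > 0, other=np.nan)
)
img = (
    data.sel(band=rgb_bands)
        .where(~data.sel(band='SCL').isin(cloud_scl_values))
        .chunk({"time": -1})      # one chunk along time per 256x256 window
        .median(dim="time")
)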

@TomAugspurger

Thanks for chiming in Gabe. I'll look into getting the version on planetary-computer images updated.

I do think it's worth exploring just using stackstac.stack with the small initial chunks. My initial worry is that this would blow up the size of the task graph, but we should actually verify that before going down this rabbit hole.

@MikeBeller
Author

My guess is that the groupby('day').apply(stackstac.mosaic) is the problem here. Xarray's groupby can lead to some convoluted dask graphs, and mosaic already produces a convoluted graph as it is. Since you're flattening all the data through a median anyway, pre-flattening by day should be nearly superfluous. What happens if you remove it? (I hope you don't get banding/striping where the tiles overlap.) If it turns out you need this, but it is the performance problem, we should look more at gjoseph92/stackstac#66 to make an optimized way to do this sort of thing.

Thanks @gjoseph92 this makes a lot of sense. I was concerned that the double-counting of certain overlapping pixels might produce weird effects. But it's certainly worth a try. I will give it a shot.

@gjoseph92

I gave this a quick try, and using chunksize=256 with stackstac makes 10 million tasks for the initial dataset. So yeah, that's not going to work.

@MikeBeller
Author

It seems like we are fighting two battles: the first is that stackstac is not organized to provide the data in small-area-large-time order, and the second is that large dask graphs (above a few million tasks?) end up swamping the scheduler node. (True?)

And yet our problem is actually embarrassingly parallel in the space dimension. So one simpler-to-implement idea is to take the smaller-tile approach, but split the area of interest into 16 large tiles that are run serially through dask, with the final image assembled at the end, like:

sub_area_persisted = []
for i in range(num_sub_areas):
    sub_area_persisted.append(sub_area[i].persist())
    wait(sub_area_persisted[i])   # block so only one sub-area's graph is live at a time

num_sub_areas would need to be small enough to keep the graph size below some threshold. Thoughts?

Mike

@gjoseph92

gjoseph92 commented Dec 16, 2021

our problem is actually embarrassingly parallel in the space dimension

This is exactly what @TomAugspurger was observing, and trying to take advantage of in the stackstac-within-dask approach. I agree that it makes it particularly frustrating that this is so hard to do.

the second is large dask graphs (above a few million tasks?) end up swamping the scheduler node

To me this is the real root problem. My rule of thumb is that anything over 500k tasks is a no-go for the distributed scheduler. The scheduler hangs for minutes just unpacking the graph, and is so overloaded (possibly also due to GC because of the sheer number of Python objects dask/distributed#4987) that it can't respond to workers fast enough, causing enormous memory buildup due to dask/distributed#5223 and dask/distributed#5114, and an unresponsive dashboard.

Obviously if the scheduler could handle millions of tasks, this would all be easy!

stackstac is not organized to provide the data in small-area-large-time order

Yeah, stackstac is currently designed around always having 1 chunk per asset. It was both easier to design this way, and felt important for task-culling reasons (if you sub-select the time/bands dimensions, it would feel pretty silly to load entire files that you don't need!). But it does lead to lots of tasks.

I think it would be both reasonable and feasible to extend the chunksize= argument to allow you to specify a chunking for the time and band dimensions though (gjoseph92/stackstac#106). While dask can't handle graphs this large (not likely to change anytime soon), this would open up a lot of options for doing bigger computations. In your case, you could have stackstac load everything along the time dimension into one chunk, since you need that anyway for the median, which would give you the "split the area of interest into 16 large tiles" behavior you need, while staying nicely in xarray-land.

@MikeBeller
Author

These seem like excellent future enhancements. Meanwhile, to get something working now, I tried a middle ground where I do the time medians on a per-Sentinel-2-tile basis. That is, I group the items returned by the STAC search by Sentinel tile identifier, then compute the medians tile by tile, serially (to keep the graphs small). This process is quite reliable and reasonably fast.

After computing the per-tile median tiles, I use xr.concat and stackstac.mosaic to combine the persisted tiles. I have a feeling this is not a great strategy, as the resulting mosaic has 188,000 tasks in its DAG, whereas the individual tiles each have only a few hundred or at most a few thousand.
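
Roughly, the per-tile workflow looks like this (a sketch; the full version is in the gist below):

from collections import defaultdict

# Group the STAC items by Sentinel-2 MGRS tile, then compute each tile's
# cloud-masked median serially so each graph stays small.
items_by_tile = defaultdict(list)
for item in items:
    items_by_tile[item.properties['s2:mgrs_tile']].append(item)

tile_medians = []
for tile_id, tile_items in items_by_tile.items():
    signed = [pc.sign(it).to_dict() for it in tile_items]
    da = (
        stackstac.stack(signed, epsg=epsg_int, bounds_latlon=aoi_bounds,
                        assets=all_bands, chunksize=chunksize)
        .where(lambda x: x > 0, other=np.nan)
    )
    mask = ~da.sel(band='SCL').isin(cloud_scl_values)
    med = da.sel(band=rgb_bands).where(mask).median(dim='time').persist()
    wait(med)                              # run one small graph at a time
    tile_medians.append(med)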

https://gist.github.com/MikeBeller/64cd01bef75e2dfc1b4d46d99a2f0b61

Thoughts on how I can combine the persisted per-tile medians without blowing up the DAG? (This would allow me to do a larger area which is my actual goal. For the example I restricted it to one district instead of the whole AOI.)

@gjoseph92

@MikeBeller I think part of the problem is that in your stackstac.stack, you're using bounds_latlon=aoi_bounds for every tile. You only load a few items per array, but each array ends up being the same dimensions (the size of your full AOI) and is mostly NaNs. That results in tons of extraneous tasks to produce those chunks of all NaNs—in fact, after concatenating I think you'll end up with the exact same number of tasks as when you just loaded all the items into one array.

This is easier to see visually. Both arrays have the same overall footprint:
[screenshot: both arrays cover the same overall footprint]
Just data in different places. And the vast majority of that data is NaN:
[screenshot: most of each array's footprint is NaN]

Also, looks like xr.concat adds a lot of steps. After removing bounds_latlon and concatenating just two images (the input nanmedian arrays only had one layer each), I get this:
[screenshot: task graph after xr.concat of two single-layer arrays]

It sounds like xarray isn't actually good at combining/mosaicing many "tiles" of arrays into one big one: pydata/xarray#4213 (comment).

Your best bet will probably be to ensure the tiles don't have any overlap, and are perfectly abutting each other, then combine them using something like xr.combine_nested.

You could do that by splitting your input AOI up into tiles (some simple linspace/meshgrid code probably). Then for each tile, do a stackstac.stack(all_items, bounds_latlon=tile_bounds). Stackstac will automatically drop the items that fall outside the tile. Then you can persist each tile as you're doing now, and finally try combining them together with xr.combine_nested.
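
Something like this (an untested sketch; depending on your xarray version you may need to name the arrays or convert them to Datasets before combining):

import numpy as np
import xarray as xr

# Split the AOI into an n x n grid of abutting lat/lon tiles, stack *all* the
# items against each tile's bounds (stackstac drops items outside the bounds),
# persist each small per-tile median, then nest them back together.
xmn, ymn, xmx, ymx = aoi_bounds
n = 4
xs = np.linspace(xmn, xmx, n + 1)
ys = np.linspace(ymn, ymx, n + 1)

rows = []
for j in range(n):
    row = []
    for i in range(n):
        tile_bounds = (xs[i], ys[j], xs[i + 1], ys[j + 1])
        da = (
            stackstac.stack(signed_items, epsg=epsg_int, assets=all_bands,
                            bounds_latlon=tile_bounds, chunksize=chunksize)
            .where(lambda x: x > 0, other=np.nan)
        )
        mask = ~da.sel(band='SCL').isin(cloud_scl_values)
        med = da.sel(band=rgb_bands).where(mask).median(dim='time').persist()
        row.append(med.rename("median"))
    rows.append(row)

# Row/column order must match the rasters' coordinate direction (y typically
# decreases), so the nesting may need to be reversed along y.
mosaic = xr.combine_nested(rows, concat_dim=["y", "x"])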

In general, a way to combine overlapping tiles with xarray feels like a pretty basic need for the geospatial community, and something it would be possible to add (xref pangeo-data/cog-best-practices#4).

@gjoseph92

(Sidenote, @TomAugspurger just want to mention how awesome it is to be able to launch the PC hub, run someone's notebook from a gist, and have everything just work! Amazing to not have to spend any time fiddling with dependencies, launching clusters, or dealing with auth.)

@MikeBeller
Author

Thanks @gjoseph92 -- you've provided a lot of ideas and references for me to use in taking another stab at it. (And also a couple of new tools I hadn't seen before so thanks for that). Unfortunately sentinel tiles do overlap by 4900km on each side, but maybe there is a way for me to just slice that off the pre-generated median tiles before persisting them.

Cheers

Mike

@MikeBeller
Author

I mean they overlap by 4900m, not km. Oops.

@gjoseph92

@MikeBeller I don't think the overlap matters. I'm saying that you don't need to group the STAC items by sentinel-2 tiles anymore. Just write some linspace code to divide up your AOI into your own tiles that touch each other, and pass in the entire list of STAC items, plus each of those tiles as the bounds_latlon, in a for-loop. Then you'll end up with non-overlapping DataArrays, which hopefully you can combine more efficiently with xr.combine_nested.

@MikeBeller
Author

@MikeBeller I don't think the overlap matters. I'm saying that you don't need to group the STAC items by sentinel-2 tiles anymore. Just write some linspace code to divide up your AOI into your own tiles that touch each other, and pass in the entire list of STAC items, plus each of those tiles as the bounds_latlon, in a for-loop. Then you'll end up with non-overlapping DataArrays, which hopefully you can combine more efficiently with xr.combine_nested.

Ah -- thanks yes that's helpful. I was hung up on potential reading efficiencies of "one tile at a time", but what you describe is a lot simpler to code up for sure. I will give it a shot.

@MikeBeller
Author

I tried your suggestion @gjoseph92 of using combine_nested -- took a couple of stabs first using a 2 x 2 grid here:

https://gist.github.com/MikeBeller/2fd43ca9393d775b8190cce7d3be9259

But I ran into a problem where the x coordinates were not lining up between the tiles, and so they ended up 'doubling' in the final image. That is, the output image had twice as many x coordinates as the full-sized image would have if created directly. You can see from the notebook that it's because the coordinates don't exactly line up -- the values on the x index have uneven intervals. Perhaps I just made some kind of simple error in my attempt? More likely, I have a feeling that getting around this issue is what @TomAugspurger was doing in his original example above, where he slices by pixel, but I am having trouble following the steps.

For what it's worth:
A second attempt with just vertical strips: https://gist.github.com/MikeBeller/cf2c479d259a01af0791a08735825474 but this one doubles in the Y direction. Bummer.

@TomAugspurger

TomAugspurger commented Jan 3, 2022

I reran the prototype in #12 (comment) (slightly refactored to https://nbviewer.org/gist/TomAugspurger/7b4d8448c5d4ad29af036043b7527bba). Here's the performance report: https://gistcdn.githack.com/TomAugspurger/8c22e5bbcbf2974f52d10dd073a49b04/raw/afe94ae9b48181edfbe2832c9eabf84fdd6ccc34/good-median.html. As expected, there's zero communication and memory use is stable:
[screenshot: dashboard showing stable worker memory]

I haven't actually checked the output, and my note above said that there was an off-by-one error somewhere, but this might be an idea worth pursuing a bit more. @MikeBeller are you able to check whether that method works on your own use-case? Note that this is on our staging hub with an updated stackstac (0.2.2). I'll try to get that updated today on the production hub.

Edit: opened dask/dask#8526 to discuss whether Dask can do the rechunking in the I/O layer. I'd love to preserve the stackstac.stack(...).median(dim="time") spelling of this operation.

@MikeBeller
Author

@TomAugspurger thanks for the refactored code. It works for a small area (modulo the off-by-one) but not a larger one, as you can see from the following gist: https://gist.github.com/MikeBeller/22f41bf27b5588840c6b525ad99c4fe0

(The gist is mostly just your notebook with the bbox replaced with a couple of different size options.)

At first look, the boundary between "works" and "doesn't work" seems to be when the AOI spans multiple Sentinel-2 tiles.
It wasn't immediately obvious to me how to adjust for that, and I had limited time to look into it today, but I wanted to give you feedback in a timely manner.

@gjoseph92

@TomAugspurger for the off-by-one error, my guess is that you're not accounting for the fact that bounds include the width of the rightmost pixel, so it's one extra step of size resolution larger than the distance between the first and last x coordinate. stackstac.array_bounds will do this for you.

The ValueError: cannot reindex or align along dimension 'time' because the index has duplicate values isn't too surprising for this method. The bigger problem is that different spatial regions will have different numbers of images within them. Taken to the extreme, if you want to mosaic the entire world, you'll obviously have a different number of scenes (and different timestamps) in Japan vs Spain. xr.concat(..., dim=y) wants the arrays to be the same shape in all dimensions except y, but in general they won't be the same shape along time until you do some operation to coerce them to a consistent number of images for every tile.

I think the better solution is to move the median into read_window. So rather than concatenating tiles of shape (T, X, Y) (where T may be different every time), you're concatenating (1, X, Y) which will work.
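
In other words, something like this (a sketch, reusing the hypothetical read_window helper from earlier in the thread):

def read_window_median(cogs, window):
    # Reduce over time inside the reader so every tile comes back with the
    # same shape, regardless of how many scenes cover it.
    cube = read_window(cogs, window)                     # (T, band, y, x); T varies per tile
    return cube.median(dim="time").expand_dims("time")   # (1, band, y, x), uniform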

Also just FYI, I've been working on gjoseph92/stackstac#106 which hopefully will be an easier solution for this.

In general though @TomAugspurger I think you're onto something with this approach. The pattern of "split the AOI up into tiles, apply the same embarrassingly-parallel logic to every tile" is very common in this field because it's both conceptually simple and works for most cases. I could see value in a higher-level library that makes it easy to split the AOI into tiles, then create 1 dask task per tile, where the task itself:

  • searches the STAC API within that tile
  • makes the array (maybe with stackstac like you did), does whatever operations to it, does the computation

The tricky part is how to make an interface that feels intuitive and easily compatible with xarray.

@MikeBeller
Author

@gjoseph92 thanks. I'd like to point out two things that you may wish to take into consideration in building the 'easier solution' you mention:

(1) Sometimes "the embarrassingly parallel logic" is slightly more complicated than a median over time. In my case, I need to mask each band for clouds using the QA band. It might make sense if the user could supply a function to be applied to each tile to create the composite.

(2) To make the solution work for large areas of interest, it would need to still function correctly when tiles fall in different UTM zones -- will the indexing calculations work if the computation has to revert to EPSG 4326? (Or some other solution?)
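
For point (1), the interface could be as simple as accepting a per-tile callable; here is a sketch of the kind of function I'd pass (band names and SCL values taken from my script above, everything else illustrative):

def cloud_masked_median(tile):
    # `tile` would be the (time, band, y, x) DataArray for one spatial tile.
    scl = tile.sel(band='SCL')
    mask = ~scl.isin([8, 9, 10])          # cloudy SCL classes, as in my script
    return (tile.sel(band=['B04', 'B03', 'B02'])
                .where(mask)
                .median(dim='time'))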

@jessjaco

jessjaco commented Jan 25, 2022

Just randomly chiming in that a colleague programmed this task last year (pre-STAC and pre-Planetary Computer, but using the HLS COGs from blob storage and an AKS instance roughly equivalent to the PC). As for the approach: we processed all images within a year for a single HLS tile, and saved the median of each tile individually to blob storage. I then created a VRT mosaic and transformed it to a single GeoTIFF using GDAL. Yes, it was hacky, but we couldn't come up with another solution. As I recall it took a day or two to produce an annual mosaic for the conterminous U.S. I think we were using a 64-node cluster.
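
The GDAL step at the end was essentially just building a VRT over the per-tile medians and flattening it; a sketch using the Python bindings (paths are illustrative):

from osgeo import gdal

tile_paths = ["medians/tile_001.tif", "medians/tile_002.tif"]  # ...one per HLS tile

# Build a virtual mosaic over the per-tile median GeoTIFFs, then flatten it
# into a single compressed GeoTIFF.
vrt = gdal.BuildVRT("annual_median.vrt", tile_paths)
vrt = None  # close/flush the VRT to disk before translating
gdal.Translate("annual_median.tif", "annual_median.vrt",
               creationOptions=["COMPRESS=DEFLATE", "TILED=YES"])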

Anyway, the relevant xarray code is here

@gilbertocamara

Hi @MikeBeller @jessjaco @gjoseph92 @TomAugspurger, using the R sits package we have created large-scale mosaics of Sentinel-2 data on a reasonably sized VM (64 cores, 256 GB RAM). Please see the following code, which creates a 16-day time series from a yearly data set of Sentinel-2/2A images over the state of Rondônia, Brazil, which spans 40 S2 tiles. The data is about the same size as that mentioned by @MikeBeller.

After the full data set has been created, you can merge them using GDAL.

#### parameters ####
period <- "P16D"
output_dir <- "/data/cubes/RO_20m_P16D"

if (!dir.exists(output_dir))
  dir.create(output_dir, recursive = TRUE, mode = "0775")
stopifnot(dir.exists(output_dir))

start_date <- "2020-06-01"
end_date <- "2021-09-11"

bands <- c("B02", "B03", "B04", "B05", "B08", "B8A", "B11", "B12", "CLOUD")

tile_names <- c("19LHH", "19LHJ", "20LKM", "20LKN", "20LKP", "20LLM", "20LLN",
                "20LLP", "20LLQ", "20LLR", "20LMM", "20LMN", "20LMP", "20LMQ",
                "20LMR", "20LNL", "20LNM", "20LNN", "20LNP", "20LNQ", "20LNR",
                "20LPL", "20LPM", "20LPN", "20LPP", "20LPQ", "20LQL", "20LQM",
                "20LQN", "20LRN", "20LRM", "20LRL", "20LQK", "20LPR", "20MNS",
                "20MMS", "20LKQ", "19LHK", "19LGK", "20LKR")
#### source ####
library(sits)
s2_cube_2021 <- sits_cube(
  source = "MSPC",
  collection = "SENTINEL-2-L2A",
  bands = bands,
  tiles = tile_names,
  start_date = start_date,
  end_date = end_date,
  multicores = 32
)

# regularize and create a time series of two month mosaics
reg_cube <- sits_regularize(
        cube = s2_cube_2021,
        period = period,
        res = 20,
        output_dir = output_dir,
        multicores = 10,
        memsize = 128
)

The R code above uses the gdalcubes R package. Our experience is that, even though the Azure VM has 64 cores, we got better results using 10 cores. We think the reason is that there may be an I/O limit set by the Planetary Computer back-end. In other words, more cores mean more I/O requests, which results in blocking by the back-end. In the above case, we obtained a 237,000 km2 data cube with 24 instances in 24 hours.
