In [1]:
#%load_ext memory_profiler

import numpy
import rasterstats
import pandas, geopandas
import fsspec
import rasterio, rasterio.mask
from shapely.geometry import box

## Single-thread task

In [2]:
mosaic_path = "../../data/ghs_composite_s2/GHS-composite-S2.vrt"
tst_path = "../../data/tessellation/tess_0.pq"

In [3]:
def ndvi_for_chunk(chunk_path, r_path=mosaic_path):
    # Read vectors
    with fsspec.open(chunk_path) as p:
        chunk = geopandas.read_parquet(p,
                                       columns=["hindex", "tessellation"]
                                      )
    # Calculate NDVI
    with rasterio.open(mosaic_path) as src:
        img, transform = rasterio.mask.mask(src, 
                                            [box(*chunk.total_bounds)],
                                            crop=True
                                           )
        meta = src.meta
    ndvi = (img[3] - img[0]) / (img[3] + img[0])
    ndvi[numpy.where(img[0] == meta["nodata"])] = numpy.nan
    # Transfer NDVI to vector
    stats = rasterstats.zonal_stats(chunk,
                                    ndvi,
                                    affine=transform,
                                    stats=["mean"],
                                    all_touched=True,
                                    nodata=numpy.nan
                                   )
    return pandas.DataFrame(stats, index=chunk["hindex"])

Test:

In [28]:
%%time
st = ndvi_for_chunk("../../data/tessellation/tess_83.pq", mosaic_path)

  ndvi = (img[3] - img[0]) / (img[3] + img[0])


CPU times: user 3min 5s, sys: 8.18 s, total: 3min 14s
Wall time: 3min 14s


3min 24s

## Parallelisation with `dask`

In [4]:
from glob import glob
import dask.bag as db
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(
    n_workers=6, 
    threads_per_worker=1,
)
client = Client(cluster)

We'll put the chunks on a bag to be run in parallel:

In [5]:
chunks = glob("../../data/tessellation/*.pq")

bag = db.from_sequence(chunks[:16])
out = bag.map(ndvi_for_chunk)

In [6]:
%%time
ndvi = pandas.concat(out.compute())



KilledWorker: ("('ndvi_for_chunk-fb683ab2fa7dd170f16f7c770a09fb46', 13)", <Worker 'tcp://127.0.0.1:38747', name: 0, memory: 0, processing: 1>)

To do:

- Read window w/ rasterio and calculate ndvi inside the pod's task
- Deal with no data cells
- Parallelise in Dask