# Reading rasters with rasterio


# Downscaling rasters thanks to dask

In this notebook we will look at a concrete case of data aggregation, with the example of changing the resolution using dask.

Dask is a Python library that enables calculations to be parallelized and large quantities of data to be handled in a scalable way, using available resources (CPU, memory, etc.). Unlike tools such as Pandas or NumPy, Dask allows you to work on data sets that exceed the available RAM memory by chunking the data into smaller pieces (chunks) and parallelizing calculations.
Dask works with lazy computation: instead of executing immediately, it builds a task graph. Calculations are only executed when the final result is explicitly requested (for example, by calling .compute()).

Dask offers a wide range of [modules](https://docs.dask.org/en/stable/#how-to-use-dask) (dask dataframe...) that can be used to distribute calculations. In this notebook we will focus on the use of [dask-arrays](https://docs.dask.org/en/stable/array.html) to manipulate raster data.

In order to change the resolution, we will look at two possible methods. One of them will prove less effective than the other, which will allow us to establish some general principles to follow for optimal use of Dask. The first one will use [map_blocks](https://docs.dask.org/en/stable/generated/dask.array.map_blocks.html) and the second one only [dask.array.mean](https://docs.dask.org/en/stable/generated/dask.array.mean.html)

For this tutorial, we will use a Sentinel-2 acquisition stored on the local disk. We will target the storage directory under the variable ``sentinel_2_dir``. Users can modify this directory and the associated paths.

## Python scripts

### Import libraries

First let's import libraries needed for this tutorial and create our dask [LocalCluster](https://docs.dask.org/en/stable/deploying-python.html#localcluster) which allow us to create workers and use [dask's dashboard](https://docs.dask.org/en/latest/dashboard.html).


In [53]:
from pathlib import Path

from typing import List, Tuple, Union, Dict
import dask.array as da
import numpy as np
import rasterio
import rioxarray as rxr
from dask import delayed
from dask.distributed import Client, LocalCluster
from rasterio.transform import Affine

from utils import create_map_with_rasters

cluster = LocalCluster()
client = Client(cluster)

print("Dask Dashboard: ", client.dashboard_link)
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 37043 instead


Dask Dashboard:  http://127.0.0.1:37043/status


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:37043/status,

0,1
Dashboard: http://127.0.0.1:37043/status,Workers: 4
Total threads: 16,Total memory: 30.63 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:41837,Workers: 4
Dashboard: http://127.0.0.1:37043/status,Total threads: 16
Started: Just now,Total memory: 30.63 GiB

0,1
Comm: tcp://127.0.0.1:39503,Total threads: 4
Dashboard: http://127.0.0.1:40587/status,Memory: 7.66 GiB
Nanny: tcp://127.0.0.1:42947,
Local directory: /tmp/dask-scratch-space/worker-3of2mpm6,Local directory: /tmp/dask-scratch-space/worker-3of2mpm6

0,1
Comm: tcp://127.0.0.1:33637,Total threads: 4
Dashboard: http://127.0.0.1:46855/status,Memory: 7.66 GiB
Nanny: tcp://127.0.0.1:34317,
Local directory: /tmp/dask-scratch-space/worker-5wbyfhxh,Local directory: /tmp/dask-scratch-space/worker-5wbyfhxh

0,1
Comm: tcp://127.0.0.1:40539,Total threads: 4
Dashboard: http://127.0.0.1:45731/status,Memory: 7.66 GiB
Nanny: tcp://127.0.0.1:37707,
Local directory: /tmp/dask-scratch-space/worker-_e6spiif,Local directory: /tmp/dask-scratch-space/worker-_e6spiif

0,1
Comm: tcp://127.0.0.1:32929,Total threads: 4
Dashboard: http://127.0.0.1:38331/status,Memory: 7.66 GiB
Nanny: tcp://127.0.0.1:46609,
Local directory: /tmp/dask-scratch-space/worker-u77lvci7,Local directory: /tmp/dask-scratch-space/worker-u77lvci7


Dask return an url where the dashboard is availaible (usually http://127.0.0.1:8787/status). This is not a tutorial on how to use this dashboard, but we recommend using it in a separate window while using this notebook.

### Open raster thanks to rioxarray

Here we are going to open the raster data required for this tutorial, the RGB bands from a Sentinel-2 acquisition. To do this, we're going to use rioxarray and, more specifically, the [open_rasterio](https://corteva.github.io/rioxarray/html/rioxarray.html#rioxarray-open-rasterio) method, which opens the images lazily (without loading data into memory) and returns a `dask.array` object. 
From this method we will use the ``chunks`` and ``lock`` arguments, which respectively set a chunk size and limit access to the data to one thread at a time to avoid read problems. Here ``chunks`` is set to ``True`` to allow dask to automatically size chunks.


## Calculation of the Average NDVI with dask

In this example, we will use what we have learned to: 

1. Read the data from the disk and stack them.
2. Calculate the associated NDVI, which combines multi-band information into a single band.
3. Reduce the information by calculating the average NDVI within a window.
4. Write the resulting image to the disk.

First, let's read the data we need to perform the NDVI.

In [74]:
def open_raster_and_get_metadata(raster_paths: List[str], chunks: Union[int, Tuple, Dict, None]):
    """
    Opens multiple raster files, extracts shared geospatial metadata, 
    and returns the concatenated data along with resolution and CRS info.

    Parameters:
    -----------
    raster_paths : List[str]
        Paths to the raster files.
    chunks : Union[int, Tuple, Dict, bool, None]
        Chunk sizes for Dask (bands, height, width).

    Returns:
    --------
    Tuple[dask.array.Array, float, float, float, float, Union[str, CRS]]
        Concatenated raster data, x and y resolution, top-left coordinates, and CRS.
    """
    results = []
    for raster_path in raster_paths:
        with rxr.open_rasterio(raster_path, chunks=chunks, lock=True) as tif:
            reprojection = tif
            transform = reprojection.rio.transform()
            crs = reprojection.rio.crs
            x_res = transform[0]
            y_res = -transform[4]
            top_left_x = transform[2]
            top_left_y = transform[5]
            results.append(reprojection)

    return da.concatenate(results), x_res, y_res, top_left_x, top_left_y, crs

# Paths to RGB Sentinel-2 bands
sentinel_2_dir = "/home/tromain/Data/S2"
s2_b4 = f"{sentinel_2_dir}/SENTINEL2B_20240822-105857-973_L2A_T31TCJ_C_V3-1/SENTINEL2B_20240822-105857-973_L2A_T31TCJ_C_V3-1_FRE_B4.tif"
s2_b8 = f"{sentinel_2_dir}/SENTINEL2B_20240822-105857-973_L2A_T31TCJ_C_V3-1/SENTINEL2B_20240822-105857-973_L2A_T31TCJ_C_V3-1_FRE_B8.tif"
#reading_chunks = (-1,2200,2200)
reading_chunks = True
input_data_array, x_res, y_res, top_left_x, top_left_y, crs = open_raster_and_get_metadata([s2_b4,s2_b8], reading_chunks)

When the data is read, we can express the NDVI calculation as if it were a numpy array. We add ``[None, :, :]`` to keep the shape as ``(bands, rows, cols)``. Then we can apply reduction on the dask.array and use ``compute()`` on it to triger the computation.

In [75]:
input_data_array

Unnamed: 0,Array,Chunk
Bytes,459.90 MiB,127.98 MiB
Shape,"(2, 10980, 10980)","(1, 6111, 10980)"
Dask graph,4 chunks in 5 graph layers,4 chunks in 5 graph layers
Data type,int16 numpy.ndarray,int16 numpy.ndarray
"Array Chunk Bytes 459.90 MiB 127.98 MiB Shape (2, 10980, 10980) (1, 6111, 10980) Dask graph 4 chunks in 5 graph layers Data type int16 numpy.ndarray",10980  10980  2,

Unnamed: 0,Array,Chunk
Bytes,459.90 MiB,127.98 MiB
Shape,"(2, 10980, 10980)","(1, 6111, 10980)"
Dask graph,4 chunks in 5 graph layers,4 chunks in 5 graph layers
Data type,int16 numpy.ndarray,int16 numpy.ndarray


In [76]:
print(input_data_array.shape)
ndvi_array = (input_data_array[1] - input_data_array[0]) / (input_data_array[1] + input_data_array[0])[None, :, :]

(2, 10980, 10980)


In [77]:
%%time

mean_ndvi = ndvi_array.compute()

CPU times: user 469 ms, sys: 1.01 s, total: 1.48 s
Wall time: 2.1 s


In [79]:
def create_raster(data: np.ndarray, output_file: Path, x_res, y_res, top_left_x, top_left_y, crs):
    transform = Affine.translation(top_left_x, top_left_y) * Affine.scale(x_res, -y_res)
    with rasterio.open(
            output_file, "w",
            driver="GTiff",
            height=data.shape[1],
            width=data.shape[2],
            count=data.shape[0],
            dtype=data.dtype,
            crs=crs,
            transform=transform
    ) as dst:
        dst.write(data)

crs="EPSG:4326"
output_file = Path("/home/tromain/Data/S2/ndvi_dask.tif")
create_raster(ndvi_array, output_file, x_res , y_res,
                  top_left_x, top_left_y, crs)

# Calculate NDVI With OTB in python

In [8]:
import otbApplication as otb

sentinel_2_dir = "/work/scratch/data/romaint"
s2_b4 = f"{sentinel_2_dir}/SENTINEL2B_20240822-105857-973_L2A_T31TCJ_C_V3-1/SENTINEL2B_20240822-105857-973_L2A_T31TCJ_C_V3-1_FRE_B4.tif"
s2_b8 = f"{sentinel_2_dir}/SENTINEL2B_20240822-105857-973_L2A_T31TCJ_C_V3-1/SENTINEL2B_20240822-105857-973_L2A_T31TCJ_C_V3-1_FRE_B8.tif"
out_ndvi_otb_py="/work/scratch/data/romaint/img_ndvi_otb.tif"
#Compute NDVI with OTB in python
app_ndvi_otb = otb.Registry.CreateApplication("BandMath")
app_ndvi_otb.SetParameterStringList("il",[s2_b4,s2_b8])
app_ndvi_otb.SetParameterString("exp","(im2b1-im1b1)/(im2b1+im1b1)")
app_ndvi_otb.SetParameterString("out",out_ndvi_otb_py)
app_ndvi_otb.ExecuteAndWriteOutput()




Writing /work/scratch/data/romaint/img_ndvi_otb.tif...: 100% [**************************************************] (3s)


0

# Calculate NDVI with OTB in C++
This part will call BandMath with the otb CLI to compare performances with the python swig interface

In [9]:
%%bash
otbcli_BandMath -il "/work/scratch/data/romaint/SENTINEL2B_20240822-105857-973_L2A_T31TCJ_C_V3-1/SENTINEL2B_20240822-105857-973_L2A_T31TCJ_C_V3-1_FRE_B4.tif" "/work/scratch/data/romaint/SENTINEL2B_20240822-105857-973_L2A_T31TCJ_C_V3-1/SENTINEL2B_20240822-105857-973_L2A_T31TCJ_C_V3-1_FRE_B8.tif" -exp "( im2b1 - im1b1 ) / ( im2b1 + im1b1 )" -out "/work/scratch/data/romaint/img_ndvi_otb_cpp.tif" 



Writing /work/scratch/data/romaint/img_ndvi_otb_cpp.tif...: 100% [**************************************************] (2s)


# Compute SuperImpose with OTB

Superimpose does a resampling then a crop to have a new raster that has the same resolution as the reference input image. This app is using multithreading a lot, it is interesting to compare it with a solution like dask and full python / rasterio

In [10]:
import otbApplication as otb

#product_dir = "/work/scratch/data/romaint"
pan_raster_url = "/work/scratch/data/tanguyy/public/PHR_OTB/IMG_PHR1B_P_001/IMG_PHR1B_P_202302281104151_SEN_6967639101-1_R1C1.JP2"
xs_raster_url = "/work/scratch/data/tanguyy/public/PHR_OTB/IMG_PHR1B_MS_004/IMG_PHR1B_MS_202302281104151_SEN_6967639101-2_R1C1.JP2"
appSI = otb.Registry.CreateApplication("Superimpose")

appSI.SetParameterString("inr", pan_raster_url)
appSI.SetParameterString("inm", xs_raster_url)
appSI.SetParameterString("out", "/work/scratch/data/romaint/SuperimposedXS_to_PAN.tif")

appSI.ExecuteAndWriteOutput()





Writing /work/scratch/data/romaint/SuperimposedXS_to_PAN.tif?&writerpctags=true...: 100% [**************************************************] (2m 59s)


0

# Performance improvement with xarray.coarsen method ?

In [20]:
%%time

import xarray
from rioxarray.merge import merge_arrays

pan_raster_url = "/home/tromain/Data/phr_pan.tif"
xs_raster_url = "/home/tromain/Data/phr_xs.tif"
origin_raster = rxr.open_rasterio(pan_raster_url,chunks=True)
#.squeeze('band', drop=True)
print(origin_raster)
#origin_raster = origin_raster.load()
pxs_raster = origin_raster.coarsen(x=4, y=4, boundary='pad').mean()
xs_raster = rxr.open_rasterio(xs_raster_url,chunks=True)
print(pxs_raster)
#print("X == X? {} Y==Y? {}".format((origin_raster.x == xs_raster.x),(origin_raster.y == xs_raster.y)))
print(all(list(origin_raster.x == xs_raster.x)))
#ppxs_raster = xarray.concat([pxs_raster,xs_raster],dim="band",coords="all")
#ppxs_raster = merge_arrays([pxs_raster,xs_raster])
crs="EPSG:4326"
output_file = Path("/home/tromain/Data/superimpose_coarsen.tif")
pxs_raster.rio.to_raster(output_file)

<xarray.DataArray (band: 1, y: 23796, x: 31280)> Size: 3GB
dask.array<open_rasterio-f5d1704f3630819f1bebdb57e7c103c4<this-array>, shape=(1, 23796, 31280), dtype=float32, chunksize=(1, 1072, 31280), chunktype=numpy.ndarray>
Coordinates:
  * band         (band) int64 8B 1
  * x            (x) float64 250kB 0.5 1.5 2.5 ... 3.128e+04 3.128e+04 3.128e+04
  * y            (y) float64 190kB 0.5 1.5 2.5 ... 2.379e+04 2.379e+04 2.38e+04
    spatial_ref  int64 8B 0
Attributes: (12/33)
    METADATATYPE:         OTB
    OTB_VERSION:          9.1.0
    TimeRangeStart:       2023-02-28T11:04:15.2040000Z
    ProductionDate:       2024-04-15T13:25:33.956Z
    AcquisitionDate:      2023-02-28T11:04:15.1Z
    SatAzimuth:           202.389
    ...                   ...
    PhysicalBias:         0
    PhysicalGain:         11.71
    EnhancedBandName:     PAN
    BandName:             P
    scale_factor:         1.0
    add_offset:           0.0
<xarray.DataArray (band: 1, y: 5949, x: 7820)> Size: 186MB
da

# SuperImpose using rasterio reproject_match

In [None]:
%%time

import xarray
from pathlib import Path

pan_raster_url = "/home/tromain/Data/PHR_OTB/IMG_PHR1B_P_001/IMG_PHR1B_P_202302281104151_SEN_6967639101-1_R1C1.JP2"
xs_raster_url = "/home/tromain/Data/PHR_OTB/IMG_PHR1B_MS_004/IMG_PHR1B_MS_202302281104151_SEN_6967639101-2_R1C1.JP2"

xds = xarray.open_dataarray(pan_raster_url,chunks=True)
xds.rio.write_crs("epsg:4326", inplace=True)
xds.rio.write_nodata(0, inplace=True)
xds_match = xarray.open_dataarray(xs_raster_url,chunks=True)
print(xds.shape)
print(xds.dtype)
print(xds_match.shape)
xds_match.rio.write_crs("epsg:4326", inplace=True)
xds_match.rio.write_nodata(0, inplace=True)
xds_repr_match = xds.rio.reproject_match(xds_match)
crs="EPSG:4326"
output_file = Path("/home/tromain/Data/superimpose_reproject_match.tif")
xds_repr_match.rio.to_raster(output_file)

  var_chunks = _get_chunk(var, chunks, chunkmanager)
  var_chunks = _get_chunk(var, chunks, chunkmanager)
ERROR 1: PROJ: internal_proj_create_from_database: /home/tromain/Apps/miniconda3/envs/env_greenit/otb9/share/proj/proj.db contains DATABASE.LAYOUT.VERSION.MINOR = 2 whereas a number >= 3 is expected. It comes from another PROJ installation.
  var_chunks = _get_chunk(var, chunks, chunkmanager)
  var_chunks = _get_chunk(var, chunks, chunkmanager)
ERROR 1: PROJ: internal_proj_create_from_database: /home/tromain/Apps/miniconda3/envs/env_greenit/otb9/share/proj/proj.db contains DATABASE.LAYOUT.VERSION.MINOR = 2 whereas a number >= 3 is expected. It comes from another PROJ installation.


(1, 23796, 31283)
float32
(4, 5949, 7822)


# Using numba for NDVI ?
Here we will try to use numba for ndvi computation

In [81]:
%%time

#from numba import jit,njit
from xarray import DataArray 
from typing import List, Tuple, Union, Dict
import rioxarray as rxr
import numpy as np
import time
import rasterio

@njit
def one_pixel_ndvi(p1,p2):
    return (p2-p1) / (p2+p1) 

@njit
def compute_ndvi(input_data_1: np.ndarray,input_data_2: np.ndarray):
    ndvi_array = [[one_pixel_ndvi(i,j) for i in input_data_1] for j in input_data_2]
    return ndvi_array

def compute_ndvi_dask(input_data_1: DataArray,input_data_2: DataArray):
    ndvi_array = [[(j-i)/(i+j) for i in input_data_1] for j in input_data_2]
    return ndvi_array

sentinel_2_dir = "/home/tromain/Data/S2"
s2_b4 = f"{sentinel_2_dir}/SENTINEL2B_20240822-105857-973_L2A_T31TCJ_C_V3-1/SENTINEL2B_20240822-105857-973_L2A_T31TCJ_C_V3-1_FRE_B4.tif"
s2_b8 = f"{sentinel_2_dir}/SENTINEL2B_20240822-105857-973_L2A_T31TCJ_C_V3-1/SENTINEL2B_20240822-105857-973_L2A_T31TCJ_C_V3-1_FRE_B8.tif"

with rasterio.open(s2_b4, 'r') as ds:
    input_data_b4 = ds.read() 

with rasterio.open(s2_b8, 'r') as ds:
    input_data_b8 = ds.read()

start = time.perf_counter()
ndvi_computed = compute_ndvi(input_data_b4,input_data_b8)
end = time.perf_counter()
print("Elapsed with Raster IO = {}s".format((end - start)))
# Example with xarray
#input_data_b4 =  xarray.open_dataarray(s2_b4,chunks=False)
#input_data_b8 =  xarray.open_dataarray(s2_b8,chunks=False)
#ndvi_computed = compute_ndvi(input_data_b4.to_numpy(),input_data_b8.to_numpy())

# Example with rioxarray 
input_data_b4 = rxr.open_rasterio(s2_b4,chunks=True)
input_data_b8 = rxr.open_rasterio(s2_b8,chunks=True)
start = time.perf_counter()
ndvi_computed = compute_ndvi_dask(input_data_b4,input_data_b8)
end = time.perf_counter()

print("Elapsed with RIOXarray + dask = {}s".format((end - start)))

start = time.perf_counter()
ndvi_array = [[(j-i)/(i+j) for i in input_data_b4] for j in input_data_b8]
end = time.perf_counter()
print("Elapsed with list comprehension = {}s".format((end - start)))
start = time.perf_counter()
ndvi_array = (input_data_array[1] - input_data_array[0]) / (input_data_array[1] + input_data_array[0])[None, :, :]
end = time.perf_counter()
print("Elapsed with normal compute = {}s".format((end - start)))

crs="EPSG:4326"
output_file = Path("/home/tromain/Data/S2/ndvi_dask_optimized.tif")
create_raster(ndvi_computed, output_file, x_res , y_res,
                  top_left_x, top_left_y, crs)

Elapsed with Raster IO = 1.1344427589974657s
Elapsed with RIOXarray + dask = 0.027490641001350014s
Elapsed with list comprehension = 0.0035700430016731843s
Elapsed with normal compute = 0.0015671629989810754s


AttributeError: 'list' object has no attribute 'shape'

# Performances multiprocessing vs multithreading

Depending on the processing applied to an image it is interesting to use the multiprocessing approach instead of the multithreading which is not really a multithreading in python as every thread is executed one after the other. (in Python 3.13, the GIL has been reworked to improve the situation)

In [None]:
import multiprocessing


# Optimizing the size of your data

In [None]:
using COG ?