In [None]:
import logging
from pathlib import Path

import numpy as np
import rasterio

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s %(message)s",
)

In [None]:
index_calibration = 10000
study_area = "CORDILLERA BLANCA"  # "CORDILLERA BLANCA" or "CORDILLERA VILCABAMBA"

image_ids = [
    "S2A_MSIL2A_20160728T152642_N0211_R025_T17LRL_20240315T232844",
    "S2A_MSIL2A_20160728T152642_N0211_R025_T18LTQ_20240316T000332",
    "S2A_MSIL2A_20161115T152642_N0211_R025_T17LRL_20240316T010702",
    "S2A_MSIL2A_20161115T152642_N0211_R025_T18LTQ_20240316T013615",
    "S2A_MSIL2A_20161115T152642_N0211_R025_T18LTQ_20240316T021019"
]

In [None]:
def calculate_ndsi(b3_path, b11_path, out_path):
    with rasterio.open(b3_path) as b3_src:
        b3 = b3_src.read(1)
        b3_meta = b3_src.meta
        no_data = b3_src.meta["nodata"]

    with rasterio.open(b11_path) as b11_src:
        b11 = b11_src.read(1)

    if b3.shape != b11.shape:
        if b3.shape[0] > b11.shape[0]:
            b3 = b3[: b11.shape[0], :]
            b3 = b3[:, : b11.shape[1]]
        else:
            b11 = b11[: b3.shape[0], :]
            b11 = b11[:, : b3.shape[1]]

    b3 = b3.astype(np.float32) / index_calibration
    b11 = b11.astype(np.float32) / index_calibration

    ndsi = (b3 - b11) / np.where(b3 + b11 == 0, np.nan, b3 + b11)
    ndsi = np.where(np.isnan(ndsi), no_data, ndsi)

    ndsi_meta = b3_meta.copy()
    ndsi_meta.update(count=1, dtype=rasterio.float32, nodata=no_data)

    with rasterio.open(out_path, "w", **ndsi_meta) as dst:
        dst.write(ndsi, 1)

In [None]:
def calculate_ndvi(b4_path, b8_path, out_path):
    with rasterio.open(b4_path) as b4_src:
        b4 = b4_src.read(1)
        b4_meta = b4_src.meta
        no_data = b4_src.meta["nodata"]

    with rasterio.open(b8_path) as b8_src:
        b8 = b8_src.read(1)

    if b4.shape != b8.shape:
        if b4.shape[0] > b8.shape[0]:
            b4 = b4[: b8.shape[0], :]
            b4 = b4[:, : b8.shape[1]]
        else:
            b8 = b8[: b4.shape[0], :]
            b8 = b8[:, : b4.shape[1]]

    b4 = b4.astype(np.float32) / index_calibration
    b8 = b8.astype(np.float32) / index_calibration

    ndvi = (b8 - b4) / np.where(b8 + b4 == 0, np.nan, b8 + b4)
    ndvi = np.where(np.isnan(ndvi), no_data, ndvi)

    ndvi_meta = b4_meta.copy()
    ndvi_meta.update(count=1, dtype=rasterio.float32, nodata=no_data)

    with rasterio.open(out_path, "w", **ndvi_meta) as dst:
        dst.write(ndvi, 1)

In [None]:
def calculate_ndwi(b3_path, b8_path, out_path):
    with rasterio.open(b3_path) as b3_src:
        b3 = b3_src.read(1)
        b3_meta = b3_src.meta
        no_data = b3_src.meta["nodata"]

    with rasterio.open(b8_path) as b8_src:
        b8 = b8_src.read(1)

    if b3.shape != b8.shape:
        if b3.shape[0] > b8.shape[0]:
            b3 = b3[: b8.shape[0], :]
            b3 = b3[:, : b8.shape[1]]
        else:
            b8 = b8[: b3.shape[0], :]
            b8 = b8[:, : b3.shape[1]]

    b3 = b3.astype(np.float32) / index_calibration
    b8 = b8.astype(np.float32) / index_calibration

    ndwi = (b3 - b8) / np.where(b3 + b8 == 0, np.nan, b3 + b8)
    ndwi = np.where(np.isnan(ndwi), no_data, ndwi)

    ndwi_meta = b3_meta.copy()
    ndwi_meta.update(count=1, dtype=rasterio.float32, nodata=no_data)

    with rasterio.open(out_path, "w", **ndwi_meta) as dst:
        dst.write(ndwi, 1)

In [None]:
for image_id in image_ids:
    input_path = Path(f"data/{study_area}/{image_id}")
    output_path = Path(f"results/indices/{study_area}/{image_id}")

    output_path.mkdir(parents=True, exist_ok=True)

    images = list(input_path.glob(f"**/*.tif"))

    _b3_path = [str(image) for image in images if "B03" in image.name][0]
    _b4_path = [str(image) for image in images if "B04" in image.name][0]
    _b8_path = [str(image) for image in images if "B08" in image.name][0]
    _b11_path = [str(image) for image in images if "B11" in image.name][0]

    ndsi_path = output_path / f"{image_id}_NDSI.tif"
    ndwi_path = output_path / f"{image_id}_NDWI.tif"
    ndvi_path = output_path / f"{image_id}_NDVI.tif"

    calculate_ndsi(_b3_path, _b11_path, ndsi_path)
    calculate_ndvi(_b4_path, _b8_path, ndvi_path)
    calculate_ndwi(_b3_path, _b8_path, ndwi_path)

logging.info("Done!")