In [1]:
from dask_gateway import GatewayCluster
import dask.distributed
import dask.utils
import dask.array
import dask
import planetary_computer
from pystac_client import Client
import odc.stac
import numpy
import xarray
import rasterio
import rasterio.enums
import gc
import math
import os
import json
from azure.storage.blob import BlobClient

In [2]:
def get_img_metadata(img_file):
    img_data_obj = rasterio.open(img_file)
    img_bounds = img_data_obj.bounds
    img_bbox = [img_bounds.left, img_bounds.bottom, img_bounds.right, img_bounds.top]
    img_x_res, img_y_res  = img_data_obj.res
    if img_y_res > 0:
        img_y_res = img_y_res * (-1)
    img_data_obj = None
    return img_bbox, img_x_res, img_y_res

def get_img_band_array(img_file, band=1):
    img_data_obj = rasterio.open(img_file)
    img_arr = img_data_obj.read(band)
    img_data_obj = None
    return img_arr

In [3]:
def expand_ls_qa_pixel_msks(scn_xa, qa_pxl_msk="qa_pixel"):
    scn_lcl_xa = scn_xa.copy()
    unq_img_vals = numpy.unique(numpy.squeeze(scn_xa[qa_pxl_msk].values))

    fill_da = scn_xa[qa_pxl_msk].copy()
    fill_da[...] = 0
    fill_da = fill_da.astype(numpy.uint8)

    dilated_clouds_da = scn_xa[qa_pxl_msk].copy()
    dilated_clouds_da[...] = 0
    dilated_clouds_da = dilated_clouds_da.astype(numpy.uint8)

    cirrus_da = scn_xa[qa_pxl_msk].copy()
    cirrus_da[...] = 0
    cirrus_da = cirrus_da.astype(numpy.uint8)

    clouds_da = scn_xa[qa_pxl_msk].copy()
    clouds_da[...] = 0
    clouds_da = clouds_da.astype(numpy.uint8)

    cloud_shadows_da = scn_xa[qa_pxl_msk].copy()
    cloud_shadows_da[...] = 0
    cloud_shadows_da = cloud_shadows_da.astype(numpy.uint8)

    snow_da = scn_xa[qa_pxl_msk].copy()
    snow_da[...] = 0
    snow_da = snow_da.astype(numpy.uint8)

    clear_da = scn_xa[qa_pxl_msk].copy()
    clear_da[...] = 0
    clear_da = clear_da.astype(numpy.uint8)

    water_da = scn_xa[qa_pxl_msk].copy()
    water_da[...] = 0
    water_da = water_da.astype(numpy.uint8)

    all_clouds_da = scn_xa[qa_pxl_msk].copy()
    all_clouds_da[...] = 0
    all_clouds_da = all_clouds_da.astype(numpy.uint8)

    for val in unq_img_vals:
        val_bin = numpy.flip(
            numpy.unpackbits(numpy.flip(numpy.array([val]).view(numpy.uint8)))
        )

        if val_bin[0] == 1:
            fill_da.values[scn_xa[qa_pxl_msk].values == val] = 1
        if val_bin[1] == 1:
            dilated_clouds_da.values[scn_xa[qa_pxl_msk].values == val] = 1
        if val_bin[2] == 1:
            cirrus_da.values[scn_xa[qa_pxl_msk].values == val] = 1
        if val_bin[3] == 1:
            clouds_da.values[scn_xa[qa_pxl_msk].values == val] = 1
        if val_bin[4] == 1:
            cloud_shadows_da.values[scn_xa[qa_pxl_msk].values == val] = 1
        if val_bin[5] == 1:
            snow_da.values[scn_xa[qa_pxl_msk].values == val] = 1
        if val_bin[6] == 1:
            clear_da.values[scn_xa[qa_pxl_msk].values == val] = 1
        if val_bin[7] == 1:
            water_da.values[scn_xa[qa_pxl_msk].values == val] = 1
        if (
            (val_bin[1] == 1)
            or (val_bin[2] == 1)
            or (val_bin[3] == 1)
            or (val_bin[4] == 1)
        ):
            all_clouds_da.values[scn_xa[qa_pxl_msk].values == val] = 1

    scn_lcl_xa["FILL"] = fill_da
    scn_lcl_xa["DILATED_CLOUDS"] = dilated_clouds_da
    scn_lcl_xa["CIRRUS"] = cirrus_da
    scn_lcl_xa["CLOUDS"] = clouds_da
    scn_lcl_xa["CLOUD_SHADOWS"] = cloud_shadows_da
    scn_lcl_xa["SNOW"] = snow_da
    scn_lcl_xa["CLEAR"] = clear_da
    scn_lcl_xa["WATER"] = water_da
    scn_lcl_xa["ALL_CLOUDS"] = all_clouds_da
    return scn_lcl_xa

In [4]:
def apply_cloud_msk(scns_xa, bands, clouds_var="ALL_CLOUDS", fill_var="FILL"):
    scns_lcl_xa = scns_xa.copy()
    for band in bands:
        scns_lcl_xa[band].values[scns_lcl_xa[clouds_var].values == 1] = 0.0
        scns_lcl_xa[band].values[scns_lcl_xa[fill_var].values == 1] = 0.0
    return scns_lcl_xa

In [5]:
def read_json_to_dict(input_file: str):
    """
    Read a JSON file. Will return a list or dict.

    :param input_file: input JSON file path.

    """
    with open(input_file) as f:
        data = json.load(f)
    return data

In [6]:
cluster = GatewayCluster()  # Creates the Dask Scheduler. Might take a minute.
cluster.adapt(minimum=4, maximum=24)
print(cluster.dashboard_link)

client = dask.distributed.Client(cluster, timeout=10)
odc.stac.configure_rio(cloud_defaults=True, client=client)

https://pccompute.westeurope.cloudapp.azure.com/compute/services/dask-gateway/clusters/prod.f7e248b9f05f4ed084f9bf558f139813/status


2024-05-08 10:13:51,781 - distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client


In [7]:
catalog = Client.open("https://planetarycomputer.microsoft.com/api/stac/v1")

In [8]:
# Date range of the ROI
time_range = "2020-01-01/2020-02-28"
date_str = "2020"
# Bands to be read
bands = ["red", "nir08", "swir16", "qa_pixel"]

In [9]:
tiles_lut_file = "/home/jovyan/gmw_v4_change_layers/04_create_base_chng_rngs/06_create_2020_baseline/03_get_tile_lst/vld_tile_lst.json"
tile_img_dir = "/home/jovyan/gmw_v4_change_layers/99_plantary_comp_dev/99_combined_coastal_msk_roi/combined_coastal_msk_roi/"

tiles_lut = read_json_to_dict(tiles_lut_file)
tiles_lut = {"GMW_N00E117": tiles_lut["GMW_N00E117"]}
tiles_name_lst = list(tiles_lut.keys())

n_tiles = len(tiles_name_lst)
print(f"Number of tiles: {n_tiles}")

Number of tiles: 1


In [10]:
gmw_tile = tiles_name_lst[0]
print(f"Processing {gmw_tile}")
gmw_tile_img = os.path.join(tile_img_dir, f"{gmw_tile}_comb_coastal_roi_msk.kea")
tilename = gmw_tile.replace("GMW_", "")



Processing GMW_N00E117


In [11]:
# Get the bbox and image resolution of the input image.
bbox, img_x_res, img_y_res = get_img_metadata(gmw_tile_img)

# Read the GMW Coastal extent into a numpy array
gmw_msk_arr = get_img_band_array(gmw_tile_img)

In [12]:
gmw_msk_arr.shape

(4500, 4500)

In [13]:
search = catalog.search(collections=["landsat-c2-l2"], bbox=bbox, datetime=time_range, query={"eo:cloud_cover": {"lt": 50}},)
items = search.get_all_items()
n_items = len(items)
print(f"\tN Scenes: {n_items}")

	N Scenes: 6




In [14]:
signed_items = [planetary_computer.sign(item) for item in items]

In [15]:
# Read the data into dask xarray structure
ls_scn_xa = odc.stac.stac_load(
    signed_items,
    bands=bands,
    groupby="solar_day",
    #dtype=numpy.uint16,
    chunks={"time":2, "latitude": 512, "longitude": 512},
    bbox=bbox,
    crs="EPSG:4326",
    resolution=img_x_res
)

In [16]:
ls_scn_xa

Unnamed: 0,Array,Chunk
Bytes,154.50 MiB,1.00 MiB
Shape,"(4, 4500, 4500)","(2, 512, 512)"
Dask graph,162 chunks in 1 graph layer,162 chunks in 1 graph layer
Data type,uint16 numpy.ndarray,uint16 numpy.ndarray
"Array Chunk Bytes 154.50 MiB 1.00 MiB Shape (4, 4500, 4500) (2, 512, 512) Dask graph 162 chunks in 1 graph layer Data type uint16 numpy.ndarray",4500  4500  4,

Unnamed: 0,Array,Chunk
Bytes,154.50 MiB,1.00 MiB
Shape,"(4, 4500, 4500)","(2, 512, 512)"
Dask graph,162 chunks in 1 graph layer,162 chunks in 1 graph layer
Data type,uint16 numpy.ndarray,uint16 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,154.50 MiB,1.00 MiB
Shape,"(4, 4500, 4500)","(2, 512, 512)"
Dask graph,162 chunks in 1 graph layer,162 chunks in 1 graph layer
Data type,uint16 numpy.ndarray,uint16 numpy.ndarray
"Array Chunk Bytes 154.50 MiB 1.00 MiB Shape (4, 4500, 4500) (2, 512, 512) Dask graph 162 chunks in 1 graph layer Data type uint16 numpy.ndarray",4500  4500  4,

Unnamed: 0,Array,Chunk
Bytes,154.50 MiB,1.00 MiB
Shape,"(4, 4500, 4500)","(2, 512, 512)"
Dask graph,162 chunks in 1 graph layer,162 chunks in 1 graph layer
Data type,uint16 numpy.ndarray,uint16 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,154.50 MiB,1.00 MiB
Shape,"(4, 4500, 4500)","(2, 512, 512)"
Dask graph,162 chunks in 1 graph layer,162 chunks in 1 graph layer
Data type,uint16 numpy.ndarray,uint16 numpy.ndarray
"Array Chunk Bytes 154.50 MiB 1.00 MiB Shape (4, 4500, 4500) (2, 512, 512) Dask graph 162 chunks in 1 graph layer Data type uint16 numpy.ndarray",4500  4500  4,

Unnamed: 0,Array,Chunk
Bytes,154.50 MiB,1.00 MiB
Shape,"(4, 4500, 4500)","(2, 512, 512)"
Dask graph,162 chunks in 1 graph layer,162 chunks in 1 graph layer
Data type,uint16 numpy.ndarray,uint16 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,154.50 MiB,1.00 MiB
Shape,"(4, 4500, 4500)","(2, 512, 512)"
Dask graph,162 chunks in 1 graph layer,162 chunks in 1 graph layer
Data type,uint16 numpy.ndarray,uint16 numpy.ndarray
"Array Chunk Bytes 154.50 MiB 1.00 MiB Shape (4, 4500, 4500) (2, 512, 512) Dask graph 162 chunks in 1 graph layer Data type uint16 numpy.ndarray",4500  4500  4,

Unnamed: 0,Array,Chunk
Bytes,154.50 MiB,1.00 MiB
Shape,"(4, 4500, 4500)","(2, 512, 512)"
Dask graph,162 chunks in 1 graph layer,162 chunks in 1 graph layer
Data type,uint16 numpy.ndarray,uint16 numpy.ndarray


In [17]:
ls_scn_xa = ls_scn_xa.assign({"gmw_msk": (("latitude", "longitude"),gmw_msk_arr)})

In [18]:
ls_scn_qa_xa = ls_scn_xa.map_blocks(expand_ls_qa_pixel_msks)
ls_scn_qa_xa.coords["spatial_ref"] = ls_scn_xa.coords["spatial_ref"]

In [19]:
ls_scn_qa_mskd_xa = ls_scn_qa_xa.map_blocks(apply_cloud_msk, kwargs={"bands": ["red", "nir08", "swir16"]})
ls_scn_qa_mskd_xa.coords["spatial_ref"] = ls_scn_xa.coords["spatial_ref"]

In [20]:
ls_scn_qa_mskd_xa = ls_scn_qa_mskd_xa.drop_vars(["qa_pixel", "FILL", "DILATED_CLOUDS", "CIRRUS", "CLOUDS", "CLOUD_SHADOWS", "SNOW", "CLEAR", "WATER", "ALL_CLOUDS"])


In [21]:
ls_scn_qa_mskd_xa = ls_scn_qa_mskd_xa.compute()

This may cause some slowdown.
Consider scattering data ahead of time and using futures.


In [22]:
ls_scn_qa_mskd_xa

In [43]:
def apply_flatten(scns_xa, vld_msk, vld_msk_val, out_var, **kwargs):
    scns_lcl_df = scns_xa.to_dataframe()

    scns_lcl_sub_df = scns_lcl_df[scns_lcl_df[vld_msk] == vld_msk_val]

    scns_lcl_sub_df[out_var] = (scns_lcl_sub_df["nir08"] - scns_lcl_sub_df["red"])/(scns_lcl_sub_df["nir08"] + scns_lcl_sub_df["red"])

    scns_lcl_df[out_var] = scns_lcl_sub_df[out_var]
    #scns_lcl_df[out_var] = (scns_lcl_df["nir08"] - scns_lcl_df["red"])/(scns_lcl_df["nir08"] + scns_lcl_df["red"])
    
    scns_out_xa = xarray.Dataset.from_dataframe(scns_lcl_df)
    #scns_out_xa.spatial_ref = scns_xa.spatial_ref
    return scns_out_xa

In [44]:
tmp_xr = ls_scn_qa_mskd_xa.copy()
tmp_xr.assign({"NDVI_calc": (("latitude", "longitude"), gmw_msk_arr)})
tmp_xr.assign({"spatial_ref": (("latitude", "longitude"), gmw_msk_arr)})

In [45]:
#tmp_xr["spatial_ref"]

In [53]:
ls_scn_ndvi_xa = ls_scn_qa_mskd_xa.map_blocks(apply_flatten, template=tmp_xr, kwargs={"vld_msk":"gmw_msk", "vld_msk_val": 1, "out_var":"NDVI"})


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  scns_lcl_sub_df[out_var] = (scns_lcl_sub_df["nir08"] - scns_lcl_sub_df["red"])/(scns_lcl_sub_df["nir08"] + scns_lcl_sub_df["red"])


In [54]:
ls_scn_ndvi_xa

In [55]:
ls_scn_ndvi_xa.compute()

In [56]:
ls_scn_only_ndvi_xa = ls_scn_ndvi_xa.drop_vars(["spatial_ref", "red", "nir08", "swir16", "gmw_msk"])

In [57]:
ls_scn_only_ndvi_xa

In [68]:
ls_mean_ndvi_xa = ls_scn_only_ndvi_xa.mean(dim="time", skipna=True)

In [69]:
ls_mean_ndvi_xa