In [2]:
import xarray as xr
from distributed import LocalCluster
import json
import folium
from datetime import datetime, timedelta
import s3fs
from collections import defaultdict
import numpy as np
from dask.distributed import Client
from distributed import LocalCluster

In [3]:
cluster = LocalCluster()
# Initialize Dask client for parallel processing
client = Client(n_workers=4, threads_per_worker=2)
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 37695 instead


Perhaps you already have a cluster running?
Hosting the HTTP server on port 34223 instead


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:34223/status,

0,1
Dashboard: http://127.0.0.1:34223/status,Workers: 4
Total threads: 8,Total memory: 7.75 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:46599,Workers: 4
Dashboard: http://127.0.0.1:34223/status,Total threads: 8
Started: Just now,Total memory: 7.75 GiB

0,1
Comm: tcp://127.0.0.1:41999,Total threads: 2
Dashboard: http://127.0.0.1:41507/status,Memory: 1.94 GiB
Nanny: tcp://127.0.0.1:45377,
Local directory: /tmp/dask-scratch-space/worker-lz6wbsfw,Local directory: /tmp/dask-scratch-space/worker-lz6wbsfw

0,1
Comm: tcp://127.0.0.1:33389,Total threads: 2
Dashboard: http://127.0.0.1:36747/status,Memory: 1.94 GiB
Nanny: tcp://127.0.0.1:46431,
Local directory: /tmp/dask-scratch-space/worker-gibo72jf,Local directory: /tmp/dask-scratch-space/worker-gibo72jf

0,1
Comm: tcp://127.0.0.1:43007,Total threads: 2
Dashboard: http://127.0.0.1:33011/status,Memory: 1.94 GiB
Nanny: tcp://127.0.0.1:34845,
Local directory: /tmp/dask-scratch-space/worker-ak5ruz8c,Local directory: /tmp/dask-scratch-space/worker-ak5ruz8c

0,1
Comm: tcp://127.0.0.1:37035,Total threads: 2
Dashboard: http://127.0.0.1:34905/status,Memory: 1.94 GiB
Nanny: tcp://127.0.0.1:43907,
Local directory: /tmp/dask-scratch-space/worker-6g__a04v,Local directory: /tmp/dask-scratch-space/worker-6g__a04v


In [4]:
bucket = "e05ab01a9d56408d82ac32d69a5aae2a:sample-data"
prefixes = ["tutorial_data/cpm_v253/", "tutorial_data/cpm_v256/"]
prefix_url = "https://objectstore.eodc.eu:2222"
start_date = datetime(2023, 6, 1)
end_date = datetime(2023, 10, 30)

fs = s3fs.S3FileSystem(anon=True, client_kwargs={"endpoint_url": prefix_url})

# Unregister CEPH handler (required for EODC)
handlers = fs.s3.meta.events._emitter._handlers
handlers_to_unregister = handlers.prefix_search("before-parameter-build.s3")
fs.s3.meta.events._emitter.unregister("before-parameter-build.s3", handlers_to_unregister[0])

filtered_urls = []

for prefix in prefixes:
    # Find all potential Sentinel-3 LST datasets
    s3_pattern = f"s3://{bucket}/{prefix}S3[AB]_SL_2_LST____*.zarr"
    candidates = fs.glob(s3_pattern)
    
    for s3_path in candidates:
        # Convert S3 path to HTTP URL
        http_url = f"{prefix_url}/{s3_path.replace('s3://', '')}"
        
        # Extract filename from URL
        filename = s3_path.split('/')[-1]
        
        # Parse sensing start time (4th segment after splitting by '_')
        try:
            # Split: S3B_SL_2_LST____20231029T203741_20231029T204041_...
            time_segment = filename.split("____")[1]
            sensing_start = time_segment.split("_")[0]  # 20231029T203741
            file_date = datetime.strptime(sensing_start[:8], "%Y%m%d")
            
            if start_date <= file_date <= end_date:
                filtered_urls.append(http_url)
                
        except (IndexError, ValueError) as e:
            print(f"Skipping malformed filename: {filename}")
            continue

print(f"Found {len(filtered_urls)} datasets between {start_date.date()} and {end_date.date()}")


Found 1050 datasets between 2023-06-01 and 2023-10-30


In [5]:
bbox = [3.3, 50.7, 7.2, 53.6] 

In [6]:
def get_dataset_shape(url):
    """Get the shape of a Sentinel-3 SLSTR dataset"""
    try:
        ds = xr.open_dataset(
            url,
            engine="zarr",
            group="measurements",
            chunks={}
        )
        shape = ds['lst'].shape
        ds.close()
        return shape
    except Exception as e:
        print(f"Error getting shape for {url}: {e}")
        return None

def group_urls_by_shape(url_list):
    """Group URLs by their dataset shapes"""
    shape_groups = defaultdict(list)
    
    for i, url in enumerate(url_list):
        print(f"Checking shape for URL {i+1}/{len(url_list)}")
        shape = get_dataset_shape(url)
        if shape:
            shape_groups[shape].append(url)
    
    return dict(shape_groups)

In [7]:
def extract_date_from_s3_filename(url):
    """Extract sensing start date from Sentinel-3 SLSTR filename"""
    filename = url.split('/')[-1]
    
    # Split by four underscores: S3X_SL_2_LST____YYYYMMDDTHHMMSS_...
    parts = filename.split('____')
    if len(parts) > 1:
        time_segment = parts[1]
        sensing_start = time_segment.split('_')[0]  # Gets "20230614T202459"
        
        # Extract date part (first 8 characters)
        date_str = sensing_start[:8]  # Gets "20230614"
        date_obj = datetime.strptime(date_str, "%Y%m%d").date()
        return date_obj
    else:
        return None

def group_urls_by_date(url_list):
    """Group URLs by their acquisition date"""
    date_groups = defaultdict(list)
    
    for url in url_list:
        date = extract_date_from_s3_filename(url)
        if date:
            date_groups[date].append(url)
    
    return dict(date_groups)


# Combined Date and Shape Grouping
Since we’ve already discovered that shape-based grouping resolves dimension conflicts, we can combine both approaches for optimal results:

In [8]:
def group_by_date_and_shape(url_list):
    """Group URLs by both date and shape for maximum efficiency"""
    date_groups = group_urls_by_date(url_list)
    
    final_groups = {}
    
    for date, urls in date_groups.items():
        # Further group by shape within each date
        shape_groups = group_urls_by_shape(urls)
        
        for shape, shape_urls in shape_groups.items():
            group_key = f"{date}_{shape}"
            final_groups[group_key] = shape_urls
    
    return final_groups


In [9]:
def process_daily_groups(date_groups, bbox):
    """Process each date group separately"""
    daily_results = {}
    
    for date, urls in date_groups.items():
        print(f"Processing {len(urls)} files for {date}")
        
        # Filter by shape within date group if needed
        shape_groups = group_urls_by_shape(urls)
        
        # Process the largest shape group (most files)
        largest_group = max(shape_groups.values(), key=len)
        
        def extract_time_and_filter(ds):
            # Add time coordinate for this specific date
            ds = ds.assign_coords(time=datetime.combine(date, datetime.min.time()))
            
            # Apply spatial filtering for Netherlands
            mask = (
                (ds.latitude >= bbox[1]) & 
                (ds.latitude <= bbox[3]) &
                (ds.longitude >= bbox[0]) & 
                (ds.longitude <= bbox[2])
            )
            return ds.where(mask)
        
        # Process daily data
        daily_data = xr.open_mfdataset(
            largest_group,
            engine="zarr",
            group="measurements",
            preprocess=extract_time_and_filter,
            combine="nested",
            concat_dim="time",
            chunks="auto",
            parallel=True,
            consolidated=True
            
        )['lst']
        
        # Take daily maximum (or mean) to reduce to single value per day
        daily_results[date] = daily_data.max(dim="time")
    
    return daily_results


In [10]:
def create_time_series_from_daily_results(daily_results):
    """Combine daily results into time series"""
    # Sort by date and combine
    sorted_dates = sorted(daily_results.keys())
    
    datasets = []
    for date in sorted_dates:
        data = daily_results[date]
        # Add time dimension back
        data = data.expand_dims(time=[datetime.combine(date, datetime.min.time())])
        datasets.append(data)
    
    # Combine into time series
    time_series = xr.concat(datasets, dim="time")
    return time_series.sortby("time")


In [None]:
# Step 1: Group by date first
date_groups = group_urls_by_date(filtered_urls)

# Step 2: Process each date group
daily_lst_data = process_daily_groups(date_groups, bbox=[3.3, 50.7, 7.2, 53.6])

# Step 3: Create time series
lst_time_series = create_time_series_from_daily_results(daily_lst_data)




Processing 7 files for 2023-06-01
Checking shape for URL 1/7
Checking shape for URL 2/7
Checking shape for URL 3/7
Checking shape for URL 4/7
Checking shape for URL 5/7
Checking shape for URL 6/7
Checking shape for URL 7/7
Processing 6 files for 2023-06-02
Checking shape for URL 1/6
Checking shape for URL 2/6
Checking shape for URL 3/6
Checking shape for URL 4/6
Checking shape for URL 5/6
Checking shape for URL 6/6
Processing 8 files for 2023-06-03
Checking shape for URL 1/8
Checking shape for URL 2/8
Checking shape for URL 3/8
Checking shape for URL 4/8
Checking shape for URL 5/8
Checking shape for URL 6/8
Checking shape for URL 7/8
Checking shape for URL 8/8
Processing 8 files for 2023-06-04
Checking shape for URL 1/8
Checking shape for URL 2/8
Checking shape for URL 3/8
Checking shape for URL 4/8
Checking shape for URL 5/8
Checking shape for URL 6/8
Checking shape for URL 7/8
Checking shape for URL 8/8
Processing 7 files for 2023-06-05
Checking shape for URL 1/7
Checking shape for U

2025-06-01 14:39:45,878 - distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.
Traceback (most recent call last):
  File "/workspaces/eopf-sample-notebooks/venv/lib/python3.11/site-packages/distributed/comm/tcp.py", line 225, in read
    frames_nosplit_nbytes_bin = await stream.read_bytes(fmt_size)
                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/workspaces/eopf-sample-notebooks/venv/lib/python3.11/site-packages/distributed/worker.py", line 1252, in heartbeat
    response = await retry_operation(
               ^^^^^^^^^^^^^^^^^^^^^^
  File "/workspaces/eopf-sample-notebooks/venv/lib/python3.11/site-packages/distributed/utils_comm.py", line 452, in retry_operation
    return await retry(
           ^^^^^^^^^^^^
  File "/workspaces/eopf-sample-notebooks/venv/lib/python3.11/s

KeyboardInterrupt: 

2025-06-01 14:39:47,887 - distributed.nanny - ERROR - Worker process died unexpectedly
Process Dask Worker process (from Nanny):
Traceback (most recent call last):
  File "/workspaces/eopf-sample-notebooks/venv/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/workspaces/eopf-sample-notebooks/venv/lib/python3.11/asyncio/base_events.py", line 654, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/workspaces/eopf-sample-notebooks/venv/lib/python3.11/site-packages/distributed/nanny.py", line 981, in run
    await worker.finished()
  File "/workspaces/eopf-sample-notebooks/venv/lib/python3.11/site-packages/distributed/core.py", line 630, in finished
    await self._event_finished.wait()
  File "/workspaces/eopf-sample-notebooks/venv/lib/python3.11/asyncio/locks.py", line 213, in wait
    await fut
asyncio.exceptions.CancelledError

During handling of 

In [14]:
lst_time_series

Unnamed: 0,Array,Chunk
Bytes,1.01 GiB,6.87 MiB
Shape,"(151, 1200, 1500)","(1, 1200, 1500)"
Dask graph,151 chunks in 15786 graph layers,151 chunks in 15786 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 1.01 GiB 6.87 MiB Shape (151, 1200, 1500) (1, 1200, 1500) Dask graph 151 chunks in 15786 graph layers Data type float32 numpy.ndarray",1500  1200  151,

Unnamed: 0,Array,Chunk
Bytes,1.01 GiB,6.87 MiB
Shape,"(151, 1200, 1500)","(1, 1200, 1500)"
Dask graph,151 chunks in 15786 graph layers,151 chunks in 15786 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [11]:
ds = xr.open_datatree(filtered_urls[0], engine="zarr", chunks={})

In [12]:
ds.measurements

Unnamed: 0,Array,Chunk
Bytes,1.71 MiB,1.71 MiB
Shape,"(1200, 187)","(1200, 187)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 1.71 MiB 1.71 MiB Shape (1200, 187) (1200, 187) Dask graph 1 chunks in 2 graph layers Data type float64 numpy.ndarray",187  1200,

Unnamed: 0,Array,Chunk
Bytes,1.71 MiB,1.71 MiB
Shape,"(1200, 187)","(1200, 187)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,1.71 MiB,1.71 MiB
Shape,"(1200, 187)","(1200, 187)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 1.71 MiB 1.71 MiB Shape (1200, 187) (1200, 187) Dask graph 1 chunks in 2 graph layers Data type float64 numpy.ndarray",187  1200,

Unnamed: 0,Array,Chunk
Bytes,1.71 MiB,1.71 MiB
Shape,"(1200, 187)","(1200, 187)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,1.71 MiB,1.71 MiB
Shape,"(1200, 187)","(1200, 187)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 1.71 MiB 1.71 MiB Shape (1200, 187) (1200, 187) Dask graph 1 chunks in 2 graph layers Data type float64 numpy.ndarray",187  1200,

Unnamed: 0,Array,Chunk
Bytes,1.71 MiB,1.71 MiB
Shape,"(1200, 187)","(1200, 187)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,1.71 MiB,1.71 MiB
Shape,"(1200, 187)","(1200, 187)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 1.71 MiB 1.71 MiB Shape (1200, 187) (1200, 187) Dask graph 1 chunks in 2 graph layers Data type float64 numpy.ndarray",187  1200,

Unnamed: 0,Array,Chunk
Bytes,1.71 MiB,1.71 MiB
Shape,"(1200, 187)","(1200, 187)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,876.56 kiB,876.56 kiB
Shape,"(1200, 187)","(1200, 187)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 876.56 kiB 876.56 kiB Shape (1200, 187) (1200, 187) Dask graph 1 chunks in 2 graph layers Data type float32 numpy.ndarray",187  1200,

Unnamed: 0,Array,Chunk
Bytes,876.56 kiB,876.56 kiB
Shape,"(1200, 187)","(1200, 187)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,13.73 MiB,3.43 MiB
Shape,"(1200, 1500)","(600, 750)"
Dask graph,4 chunks in 2 graph layers,4 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.73 MiB 3.43 MiB Shape (1200, 1500) (600, 750) Dask graph 4 chunks in 2 graph layers Data type float64 numpy.ndarray",1500  1200,

Unnamed: 0,Array,Chunk
Bytes,13.73 MiB,3.43 MiB
Shape,"(1200, 1500)","(600, 750)"
Dask graph,4 chunks in 2 graph layers,4 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,13.73 MiB,3.43 MiB
Shape,"(1200, 1500)","(600, 750)"
Dask graph,4 chunks in 2 graph layers,4 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.73 MiB 3.43 MiB Shape (1200, 1500) (600, 750) Dask graph 4 chunks in 2 graph layers Data type float64 numpy.ndarray",1500  1200,

Unnamed: 0,Array,Chunk
Bytes,13.73 MiB,3.43 MiB
Shape,"(1200, 1500)","(600, 750)"
Dask graph,4 chunks in 2 graph layers,4 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,13.73 MiB,3.43 MiB
Shape,"(1200, 1500)","(600, 750)"
Dask graph,4 chunks in 2 graph layers,4 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.73 MiB 3.43 MiB Shape (1200, 1500) (600, 750) Dask graph 4 chunks in 2 graph layers Data type float64 numpy.ndarray",1500  1200,

Unnamed: 0,Array,Chunk
Bytes,13.73 MiB,3.43 MiB
Shape,"(1200, 1500)","(600, 750)"
Dask graph,4 chunks in 2 graph layers,4 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,13.73 MiB,3.43 MiB
Shape,"(1200, 1500)","(600, 750)"
Dask graph,4 chunks in 2 graph layers,4 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.73 MiB 3.43 MiB Shape (1200, 1500) (600, 750) Dask graph 4 chunks in 2 graph layers Data type float64 numpy.ndarray",1500  1200,

Unnamed: 0,Array,Chunk
Bytes,13.73 MiB,3.43 MiB
Shape,"(1200, 1500)","(600, 750)"
Dask graph,4 chunks in 2 graph layers,4 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,6.87 MiB,6.87 MiB
Shape,"(1200, 1500)","(1200, 1500)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 6.87 MiB 6.87 MiB Shape (1200, 1500) (1200, 1500) Dask graph 1 chunks in 2 graph layers Data type float32 numpy.ndarray",1500  1200,

Unnamed: 0,Array,Chunk
Bytes,6.87 MiB,6.87 MiB
Shape,"(1200, 1500)","(1200, 1500)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [13]:
ds1 = xr.open_mfdataset(
    [filtered_urls[0]],
    engine="zarr",
    group="quality",
    combine="nested",
    concat_dim="time",
    chunks="auto",
    parallel=True,
    consolidated=True
)

In [14]:
ds1

Unnamed: 0,Array,Chunk
Bytes,3.43 MiB,3.43 MiB
Shape,"(1, 1200, 1500)","(1, 1200, 1500)"
Dask graph,1 chunks in 3 graph layers,1 chunks in 3 graph layers
Data type,uint16 numpy.ndarray,uint16 numpy.ndarray
"Array Chunk Bytes 3.43 MiB 3.43 MiB Shape (1, 1200, 1500) (1, 1200, 1500) Dask graph 1 chunks in 3 graph layers Data type uint16 numpy.ndarray",1500  1200  1,

Unnamed: 0,Array,Chunk
Bytes,3.43 MiB,3.43 MiB
Shape,"(1, 1200, 1500)","(1, 1200, 1500)"
Dask graph,1 chunks in 3 graph layers,1 chunks in 3 graph layers
Data type,uint16 numpy.ndarray,uint16 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,3.43 MiB,3.43 MiB
Shape,"(1, 1200, 1500)","(1, 1200, 1500)"
Dask graph,1 chunks in 3 graph layers,1 chunks in 3 graph layers
Data type,uint16 numpy.ndarray,uint16 numpy.ndarray
"Array Chunk Bytes 3.43 MiB 3.43 MiB Shape (1, 1200, 1500) (1, 1200, 1500) Dask graph 1 chunks in 3 graph layers Data type uint16 numpy.ndarray",1500  1200  1,

Unnamed: 0,Array,Chunk
Bytes,3.43 MiB,3.43 MiB
Shape,"(1, 1200, 1500)","(1, 1200, 1500)"
Dask graph,1 chunks in 3 graph layers,1 chunks in 3 graph layers
Data type,uint16 numpy.ndarray,uint16 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,3.43 MiB,3.43 MiB
Shape,"(1, 1200, 1500)","(1, 1200, 1500)"
Dask graph,1 chunks in 3 graph layers,1 chunks in 3 graph layers
Data type,int16 numpy.ndarray,int16 numpy.ndarray
"Array Chunk Bytes 3.43 MiB 3.43 MiB Shape (1, 1200, 1500) (1, 1200, 1500) Dask graph 1 chunks in 3 graph layers Data type int16 numpy.ndarray",1500  1200  1,

Unnamed: 0,Array,Chunk
Bytes,3.43 MiB,3.43 MiB
Shape,"(1, 1200, 1500)","(1, 1200, 1500)"
Dask graph,1 chunks in 3 graph layers,1 chunks in 3 graph layers
Data type,int16 numpy.ndarray,int16 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,6.87 MiB,6.87 MiB
Shape,"(1, 1200, 1500)","(1, 1200, 1500)"
Dask graph,1 chunks in 3 graph layers,1 chunks in 3 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 6.87 MiB 6.87 MiB Shape (1, 1200, 1500) (1, 1200, 1500) Dask graph 1 chunks in 3 graph layers Data type float32 numpy.ndarray",1500  1200  1,

Unnamed: 0,Array,Chunk
Bytes,6.87 MiB,6.87 MiB
Shape,"(1, 1200, 1500)","(1, 1200, 1500)"
Dask graph,1 chunks in 3 graph layers,1 chunks in 3 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,6.87 MiB,6.87 MiB
Shape,"(1, 1200, 1500)","(1, 1200, 1500)"
Dask graph,1 chunks in 3 graph layers,1 chunks in 3 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 6.87 MiB 6.87 MiB Shape (1, 1200, 1500) (1, 1200, 1500) Dask graph 1 chunks in 3 graph layers Data type float32 numpy.ndarray",1500  1200  1,

Unnamed: 0,Array,Chunk
Bytes,6.87 MiB,6.87 MiB
Shape,"(1, 1200, 1500)","(1, 1200, 1500)"
Dask graph,1 chunks in 3 graph layers,1 chunks in 3 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,6.87 MiB,6.87 MiB
Shape,"(1, 1200, 1500)","(1, 1200, 1500)"
Dask graph,1 chunks in 3 graph layers,1 chunks in 3 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 6.87 MiB 6.87 MiB Shape (1, 1200, 1500) (1, 1200, 1500) Dask graph 1 chunks in 3 graph layers Data type float32 numpy.ndarray",1500  1200  1,

Unnamed: 0,Array,Chunk
Bytes,6.87 MiB,6.87 MiB
Shape,"(1, 1200, 1500)","(1, 1200, 1500)"
Dask graph,1 chunks in 3 graph layers,1 chunks in 3 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,6.87 MiB,6.87 MiB
Shape,"(1, 1200, 1500)","(1, 1200, 1500)"
Dask graph,1 chunks in 3 graph layers,1 chunks in 3 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 6.87 MiB 6.87 MiB Shape (1, 1200, 1500) (1, 1200, 1500) Dask graph 1 chunks in 3 graph layers Data type float32 numpy.ndarray",1500  1200  1,

Unnamed: 0,Array,Chunk
Bytes,6.87 MiB,6.87 MiB
Shape,"(1, 1200, 1500)","(1, 1200, 1500)"
Dask graph,1 chunks in 3 graph layers,1 chunks in 3 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,6.87 MiB,6.87 MiB
Shape,"(1, 1200, 1500)","(1, 1200, 1500)"
Dask graph,1 chunks in 3 graph layers,1 chunks in 3 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 6.87 MiB 6.87 MiB Shape (1, 1200, 1500) (1, 1200, 1500) Dask graph 1 chunks in 3 graph layers Data type float32 numpy.ndarray",1500  1200  1,

Unnamed: 0,Array,Chunk
Bytes,6.87 MiB,6.87 MiB
Shape,"(1, 1200, 1500)","(1, 1200, 1500)"
Dask graph,1 chunks in 3 graph layers,1 chunks in 3 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,6.87 MiB,6.87 MiB
Shape,"(1, 1200, 1500)","(1, 1200, 1500)"
Dask graph,1 chunks in 3 graph layers,1 chunks in 3 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 6.87 MiB 6.87 MiB Shape (1, 1200, 1500) (1, 1200, 1500) Dask graph 1 chunks in 3 graph layers Data type float32 numpy.ndarray",1500  1200  1,

Unnamed: 0,Array,Chunk
Bytes,6.87 MiB,6.87 MiB
Shape,"(1, 1200, 1500)","(1, 1200, 1500)"
Dask graph,1 chunks in 3 graph layers,1 chunks in 3 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,6.87 MiB,6.87 MiB
Shape,"(1, 1200, 1500)","(1, 1200, 1500)"
Dask graph,1 chunks in 3 graph layers,1 chunks in 3 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 6.87 MiB 6.87 MiB Shape (1, 1200, 1500) (1, 1200, 1500) Dask graph 1 chunks in 3 graph layers Data type float32 numpy.ndarray",1500  1200  1,

Unnamed: 0,Array,Chunk
Bytes,6.87 MiB,6.87 MiB
Shape,"(1, 1200, 1500)","(1, 1200, 1500)"
Dask graph,1 chunks in 3 graph layers,1 chunks in 3 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,1.72 MiB,1.72 MiB
Shape,"(1, 1200, 1500)","(1, 1200, 1500)"
Dask graph,1 chunks in 3 graph layers,1 chunks in 3 graph layers
Data type,uint8 numpy.ndarray,uint8 numpy.ndarray
"Array Chunk Bytes 1.72 MiB 1.72 MiB Shape (1, 1200, 1500) (1, 1200, 1500) Dask graph 1 chunks in 3 graph layers Data type uint8 numpy.ndarray",1500  1200  1,

Unnamed: 0,Array,Chunk
Bytes,1.72 MiB,1.72 MiB
Shape,"(1, 1200, 1500)","(1, 1200, 1500)"
Dask graph,1 chunks in 3 graph layers,1 chunks in 3 graph layers
Data type,uint8 numpy.ndarray,uint8 numpy.ndarray


In [None]:
import dask

# Configure Dask for optimal performance
dask.config.set(**{
    'array.slicing.split_large_chunks': True,
    'distributed.worker.memory.target': 0.8,
    'distributed.worker.memory.spill': 0.9
})

def preprocess_slstr(ds):
    """Single-pass preprocessing with spatial/temporal handling"""
    try:
        # Extract time from filename (fast string operations)
        filename = ds.encoding['source'].split('/')[-1]
        time_str = filename.split('____')[1].split('_')[0][:15]  # YYYYMMDDTHHMMSS
        time = datetime.strptime(time_str, "%Y%m%dT%H%M%S")
        
        # Netherlands bounding box filter
        bbox = (3.3, 50.7, 7.2, 53.6)
        mask = (
            (ds.latitude >= bbox[1]) & 
            (ds.latitude <= bbox[3]) &
            (ds.longitude >= bbox[0]) & 
            (ds.longitude <= bbox[2])
        )
        
        # Select and mask variables in one go
        ds = ds[['lst', 'confidence_in', 'cloud_in']].where(mask)
        ds = ds.assign_coords(time=time)
        
        return ds
    except Exception as e:
        print(f"Skipping corrupt/malformed dataset: {e}")
        return None

# Load all data in one parallel operation
ds = xr.open_mfdataset(
    filtered_urls,
    engine="zarr",
    group="measurements",
    preprocess=preprocess_slstr,
    combine="nested",
    concat_dim="time",
    chunks={'time': 10, 'rows': 600, 'columns': 750},  # Match Zarr chunks
    parallel=True,  # Critical for speed
    data_vars='minimal',
    coords='minimal',
    join='override',
    compat='override',
    consolidated= True
)['lst']




In [None]:
# Apply quality masks (vectorized)
ds['lst'] = ds.lst.where(ds.confidence_in >= 16384).where(ds.cloud_in == 0)
# Heatwave detection with optimized rolling
heatwave_mask = (
    ds.lst.rolling(time=5, min_periods=5)
    .reduce(lambda x: np.logical_and(np.all(x > 295), np.sum(x > 300) >= 3))
)

# Aggregate and save
heatwave_count = heatwave_mask.sum('time')
heatwave_count.rio.write_crs("EPSG:4326").to_netcdf(
    "heatwave_netherlands.nc",
    encoding={'heatwave_count': {'zlib': True, 'complevel': 5}}
)

# Step 1: Group Datasets by Shape

In [15]:
from concurrent.futures import ThreadPoolExecutor
from collections import defaultdict
import zarr
from tqdm import tqdm

def get_zarr_shape(url):
    """Get array shape through direct Zarr metadata access"""
    try:
        group = zarr.open_group(url, mode='r')
        lst_array = group['measurements/lst']
        return lst_array.shape
    except Exception as e:
        print(f"Error processing {url}: {e}")
        return None

def optimized_group_by_shape(urls, max_workers=8):
    """Process 1000 URLs with configurable parallelism"""
    shape_groups = defaultdict(list)
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(get_zarr_shape, url): url for url in urls}
        
        with tqdm(total=len(urls), desc="Processing URLs") as pbar:
            for future in futures:
                url = futures[future]
                try:
                    shape = future.result()
                    if shape:
                        shape_groups[shape].append(url)
                except Exception as e:
                    print(f"Failed {url}: {e}")
                pbar.update(1)
    
    return dict(shape_groups)


In [16]:
grouped_list = optimized_group_by_shape(filtered_urls)

Processing URLs:   0%|          | 0/1050 [00:00<?, ?it/s]

Processing URLs: 100%|██████████| 1050/1050 [00:24<00:00, 42.99it/s]


In [17]:
len(grouped_list[(1200, 1500)])

1002

# Step 2: Process Each Shape Group

In [18]:
def preprocess_slstr(ds):
    """Load both measurements and quality data"""
    try:
        
        # Extract time from filename
        filename = ds.encoding['source'].split('/')[-1]
        time_str = filename.split('____')[1].split('_')[0]
        time = datetime.strptime(time_str, "%Y%m%dT%H%M%S")
        ds = ds.assign_coords(time=time)
        
        # Spatial filtering
        bbox = (3.3, 50.7, 7.2, 53.6)
        mask = (
            (ds.latitude >= bbox[1]) & 
            (ds.latitude <= bbox[3]) &
            (ds.longitude >= bbox[0]) & 
            (ds.longitude <= bbox[2])
        )
        return ds.where(mask)
        
    except Exception as e:
        print(f"Skipping dataset: {e}")
        return None


In [None]:
# Load data with both groups
ds = xr.open_mfdataset(
    grouped_list[(1200, 1500)],
    engine="zarr",
    group="measurements",
    preprocess=preprocess_slstr,
    combine="nested",
    concat_dim="time",
    chunks="auto",
    parallel=True,
    consolidated=True
)
ds

Unnamed: 0,Array,Chunk
Bytes,13.44 GiB,13.73 MiB
Shape,"(1002, 1200, 1500)","(1, 1200, 1500)"
Dask graph,1002 chunks in 3003 graph layers,1002 chunks in 3003 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.44 GiB 13.73 MiB Shape (1002, 1200, 1500) (1, 1200, 1500) Dask graph 1002 chunks in 3003 graph layers Data type float64 numpy.ndarray",1500  1200  1002,

Unnamed: 0,Array,Chunk
Bytes,13.44 GiB,13.73 MiB
Shape,"(1002, 1200, 1500)","(1, 1200, 1500)"
Dask graph,1002 chunks in 3003 graph layers,1002 chunks in 3003 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,13.44 GiB,13.73 MiB
Shape,"(1002, 1200, 1500)","(1, 1200, 1500)"
Dask graph,1002 chunks in 3003 graph layers,1002 chunks in 3003 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.44 GiB 13.73 MiB Shape (1002, 1200, 1500) (1, 1200, 1500) Dask graph 1002 chunks in 3003 graph layers Data type float64 numpy.ndarray",1500  1200  1002,

Unnamed: 0,Array,Chunk
Bytes,13.44 GiB,13.73 MiB
Shape,"(1002, 1200, 1500)","(1, 1200, 1500)"
Dask graph,1002 chunks in 3003 graph layers,1002 chunks in 3003 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,13.44 GiB,13.73 MiB
Shape,"(1002, 1200, 1500)","(1, 1200, 1500)"
Dask graph,1002 chunks in 3003 graph layers,1002 chunks in 3003 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.44 GiB 13.73 MiB Shape (1002, 1200, 1500) (1, 1200, 1500) Dask graph 1002 chunks in 3003 graph layers Data type float64 numpy.ndarray",1500  1200  1002,

Unnamed: 0,Array,Chunk
Bytes,13.44 GiB,13.73 MiB
Shape,"(1002, 1200, 1500)","(1, 1200, 1500)"
Dask graph,1002 chunks in 3003 graph layers,1002 chunks in 3003 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,13.44 GiB,13.73 MiB
Shape,"(1002, 1200, 1500)","(1, 1200, 1500)"
Dask graph,1002 chunks in 3003 graph layers,1002 chunks in 3003 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 13.44 GiB 13.73 MiB Shape (1002, 1200, 1500) (1, 1200, 1500) Dask graph 1002 chunks in 3003 graph layers Data type float64 numpy.ndarray",1500  1200  1002,

Unnamed: 0,Array,Chunk
Bytes,13.44 GiB,13.73 MiB
Shape,"(1002, 1200, 1500)","(1, 1200, 1500)"
Dask graph,1002 chunks in 3003 graph layers,1002 chunks in 3003 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,6.72 GiB,6.87 MiB
Shape,"(1002, 1200, 1500)","(1, 1200, 1500)"
Dask graph,1002 chunks in 15031 graph layers,1002 chunks in 15031 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 6.72 GiB 6.87 MiB Shape (1002, 1200, 1500) (1, 1200, 1500) Dask graph 1002 chunks in 15031 graph layers Data type float32 numpy.ndarray",1500  1200  1002,

Unnamed: 0,Array,Chunk
Bytes,6.72 GiB,6.87 MiB
Shape,"(1002, 1200, 1500)","(1, 1200, 1500)"
Dask graph,1002 chunks in 15031 graph layers,1002 chunks in 15031 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [19]:
def preprocess(ds):
    """Process individual Zarr dataset with parallel I/O"""
    # Extract time from filename
    filename = ds.encoding['source'].split('/')[-1]
    time_str = filename.split('____')[1].split('_')[0]
    time = np.datetime64(datetime.strptime(time_str, "%Y%m%dT%H%M%S"))
    
    # Merge measurements and quality groups
    ds_combined = xr.merge([
        ds['measurements'][['lst', 'latitude', 'longitude']],
        ds['quality'][['confidence_in', 'cloud_in']]
    ])
    
    # Spatial filtering
    mask = (
        (ds_combined.latitude >= bbox[1]) & 
        (ds_combined.latitude <= bbox[3]) &
        (ds_combined.longitude >= bbox[0]) & 
        (ds_combined.longitude <= bbox[2])
    )
    
    # Quality filtering
    ds_combined['lst'] = ds_combined.lst.where(
        (ds_combined.confidence_in >= 16384) & 
        (ds_combined.cloud_in == 0) & 
        mask
    )
    
    # Add time coordinate
    return ds_combined.assign_coords(time=time).expand_dims('time')

In [20]:
ds1 = xr.open_mfdataset(
    grouped_list[(1200, 1500)],
    engine="zarr",
    preprocess=preprocess,
    combine="nested",
    concat_dim="time",
    parallel=True,
    chunks={}
)

Key:       preprocess-ff58de4f-14ef-4a76-95c7-1604e0420e6b
Function:  preprocess
args:      (<xarray.Dataset> Size: 0B
Dimensions:  ()
Data variables:
    *empty*
Attributes:
    other_metadata:  {'L0_offset_between_scan_index_and_ISP_scan_count_in': 3...
    stac_discovery:  {'assets': {}, 'bbox': [29.2355, 51.3347, -0.876932, 64....)
kwargs:    {}
Exception: 'KeyError("No variable named \'measurements\'. Variables on the dataset include []")'

Key:       preprocess-fe7a850f-dd90-42f3-826f-96d150470cf1
Function:  preprocess
args:      (<xarray.Dataset> Size: 0B
Dimensions:  ()
Data variables:
    *empty*
Attributes:
    other_metadata:  {'L0_offset_between_scan_index_and_ISP_scan_count_in': 4...
    stac_discovery:  {'assets': {}, 'bbox': [13.0191, 41.1213, -10.2605, 54.4...)
kwargs:    {}
Exception: 'KeyError("No variable named \'measurements\'. Variables on the dataset include []")'

Key:       preprocess-fcdfc4c5-c50b-4449-b9c5-58cf321dd724
Function:  preprocess
args:      (<xarray

KeyError: "No variable named 'measurements'. Variables on the dataset include []"

In [None]:
    # Apply quality mask (bit 14: high confidence)
ds['lst'] = ds.lst.where(ds.confidence_in >= 16384)

# Step 3: Heatwave Detection & Aggregation

In [12]:
def detect_heatwave(da):
    """Vectorized 5-day heatwave detection"""
    window = da.rolling(time=5, min_periods=5)
    criteria1 = window.min() > 295  # All days > 295K
    criteria2 = window.sum() > 300*3  # At least 3 days > 300K
    return (criteria1 & criteria2).astype(np.float32)

In [13]:
shape_groups = group_by_shape(filtered_urls)

In [14]:
len(shape_groups[1200,1500])

1002

# Process all shape groups

In [17]:
results = {}
for shape, urls in shape_groups.items():
    ds = process_group(urls, shape)
    results[shape] = detect_heatwave(ds['lst']).sum('time')

: 

In [21]:
def preprocess_slstr(ds):
    """Extract time from filename and ensure nanosecond precision"""
    filename = ds.encoding["source"].split("/")[-1]
    time_str = filename.split("____")[1].split("_")[0]
    # Parse string to datetime, then to np.datetime64 with 'ns' precision
    dt = datetime.strptime(time_str, "%Y%m%dT%H%M%S")
    time_ns = np.datetime64(dt, 'ns')
    return ds.assign_coords(time=time_ns)


In [22]:
# Load measurements group
ds_meas = xr.open_mfdataset(
    grouped_list[(1200, 1500)],
    engine="zarr",
    group="measurements",
    preprocess=preprocess_slstr,
    combine="nested",
    concat_dim="time",
    chunks="auto",
    parallel=True,
    consolidated= True
)

# Load quality group
ds_qual = xr.open_mfdataset(
    grouped_list[(1200, 1500)],
    engine="zarr",
    group="quality",
    preprocess=preprocess_slstr,
    combine="nested",
    concat_dim="time",
    chunks={},
    parallel=True,
    consolidated=True
)

# Merge groups into single dataset
ds_combined = xr.merge([ds_meas, ds_qual])

# Apply quality mask
ds_combined['lst'] = ds_combined.lst.where(ds_combined.confidence_in >= 16384)

: 

In [1]:
from dask.diagnostics import ProgressBar

def process_group(group_name):
    return xr.open_mfdataset(
        grouped_list[(1200, 1500)],
        engine="zarr",
        group=group_name,
        preprocess=preprocess_slstr,
        combine="nested",
        concat_dim="time",
        chunks="auto",
        parallel=True,
        data_vars='minimal',
        coords='minimal',
        compat='override',
        consolidated=True
    )

# Process groups in parallel
with ProgressBar():
    ds_measurements = process_group("measurements")
    ds_quality = process_group("quality")

# Merge datasets using aligned coordinates
combined_ds = xr.merge(
    [ds_measurements, ds_quality],
    combine_attrs="drop_conflicts"
)


NameError: name 'xr' is not defined