In [1]:
import json
import geopandas as gpd

# INPUT VARIABLES
config_path = '00_input/korindo.json'
is_export_image_to_drive = False
# AOIs larger than this many hectares are routed to the GEE approach; smaller ones use STAC (local xarray).
AOI_HA_THRESHOLD = 5000

# TODO: filter the AOI based on size -- if the AOI is too big use the GEE approach, if small use the STAC approach.
with open(config_path, 'r', encoding='utf-8') as f:
    config = json.load(f)

aoi_gpd = gpd.read_file(config["AOI_path"])
# output_crs is expected as "<authority>:<code>" (e.g. "EPSG:32750"); only the numeric code is passed on.
# The satellite CRS is assumed to be a projected UTM CRS.
aoi_gpd = aoi_gpd.to_crs(epsg=int(config["output_crs"].split(":")[-1]))
# .area is only meaningful in a projected CRS; in a geographic CRS it would be in square degrees.
if not aoi_gpd.crs.is_projected:
    raise ValueError(
        f"output_crs {config['output_crs']!r} is not a projected CRS; cannot compute AOI area in hectares"
    )
# Assumes the projected CRS uses metres (true for UTM): m^2 / 10000 = ha.
aoi_ha = aoi_gpd.geometry.area.sum() / 10000

# bool() so use_gee is a plain Python bool rather than numpy.bool_.
use_gee = bool(aoi_ha > AOI_HA_THRESHOLD)
if use_gee:
    print(f"AOI area ({aoi_ha} ha) is too big, using GEE approach when available and mix with STAC MPC")
else:
    print(f"AOI area ({aoi_ha} ha) is small, using STAC (local process xarray) approach")

AOI area (144217.6737820076 ha) is too big, using GEE approach when available and mix with STAC MPC


In [2]:
!pip install gcsfs



In [3]:
from dotenv import load_dotenv

# Populate os.environ from a .env file; later cells read GOOGLE_CLOUD_PROJECT and GCS_ZARR_DIR.
# load_dotenv() returns False when it set no variables (e.g. no .env file was found), so surface
# that here instead of failing later with an opaque error.
dotenv_loaded = load_dotenv()
if not dotenv_loaded:
    print("WARNING: no variables were loaded from .env; GOOGLE_CLOUD_PROJECT / GCS_ZARR_DIR must already be set.")
dotenv_loaded

True

In [4]:
# SIMPLIFIED EFFICIENT ZARR SAVING
# Focus: Proper parallelism, simple and reliable
# Removed complex auto-detection that might cause issues

import os
import time
import xarray as xr
from numcodecs import Blosc
import gcsfs

# Re-use a global filesystem client when possible.
# Credentials JSON passed to gcsfs as `token`; override with the GCS_TOKEN_PATH environment
# variable instead of editing this absolute path in the notebook.
GCS_TOKEN_PATH = os.getenv("GCS_TOKEN_PATH", '/usr/src/app/user_id.json')
gcs = gcsfs.GCSFileSystem(project=os.getenv("GOOGLE_CLOUD_PROJECT"), token=GCS_TOKEN_PATH)

def save_dataset_efficient_zarr(
    ds,
    zarr_path,
    chunk_sizes=None,
    compression='lz4',
    compression_level=1,
    overwrite=True,
    consolidated=True,
    storage='auto',
    gcs_project=None,
):
    """
    Rechunk an xarray Dataset and write it to a zarr store, locally or on GCS.

    The dataset is rechunked with ``ds.chunk`` and written in one
    ``to_zarr(..., compute=True)`` call, letting dask parallelise the writes.

    Parameters
    ----------
    ds : xarray.Dataset
        Dataset to save (lazy dask arrays or in-memory). It is not mutated.
    zarr_path : str
        Destination path or GCS URI (e.g. gs://bucket/path.zarr).
    chunk_sizes : dict, optional
        Chunk sizes per dimension (e.g. {'time': 20, 'x': 256, 'y': 256}).
        Default: time <= 20, x <= 256, y <= 256, any other dimension <= 100.
    compression : {'lz4','blosc','zstd',None} or dict
        Blosc codec for every data variable ('blosc' selects the 'blosclz'
        codec), None for no explicit encoding, or a complete xarray
        ``encoding`` dict passed straight to ``to_zarr``.
    compression_level : int
        Blosc compression level (1 fastest, 9 best compression).
    overwrite : bool
        Delete an existing store at ``zarr_path`` before writing.
    consolidated : bool
        Write consolidated metadata (recommended).
    storage : {'auto','local','gcs'}
        Storage backend; 'auto' picks 'gcs' for ``gs://`` paths, else 'local'.
    gcs_project : str, optional
        GCP project for a fresh GCSFileSystem; when None the module-level
        ``gcs`` client is reused.

    Returns
    -------
    str
        The zarr_path that was written.

    Raises
    ------
    ValueError
        If ``storage`` or ``compression`` is not a supported value.
    FileExistsError
        If the store already exists and ``overwrite`` is False.
    """
    import contextlib
    import shutil

    def _format_size(num_bytes):
        """Render a byte count as MB, or GB once it reaches 1 GB."""
        size_mb = num_bytes / (1024 * 1024)
        size_gb = size_mb / 1024
        return f"{size_gb:.2f} GB" if size_gb >= 1 else f"{size_mb:.2f} MB"

    start_time = time.time()

    storage = storage.lower()
    if storage == 'auto':
        storage = 'gcs' if zarr_path.startswith('gs://') else 'local'
    if storage not in {'local', 'gcs'}:
        raise ValueError("storage must be one of {'auto', 'local', 'gcs'}")

    # Build the encoding before touching any existing store, so a bad
    # `compression` value can no longer delete data and then fail.
    blosc_codecs = {'lz4': 'lz4', 'blosc': 'blosclz', 'zstd': 'zstd'}
    if compression is None:
        encoding = {}
    elif isinstance(compression, str):
        if compression not in blosc_codecs:
            raise ValueError(
                "compression must be one of 'lz4', 'blosc', 'zstd', None "
                f"or an encoding dict; got {compression!r}"
            )
        compressor = Blosc(
            cname=blosc_codecs[compression],
            clevel=compression_level,
            shuffle=Blosc.SHUFFLE,
            blocksize=0,
        )
        encoding = {var: {'compressor': compressor} for var in ds.data_vars}
    else:
        encoding = compression  # caller-supplied xarray encoding dict

    fs = None
    if storage == 'gcs':
        fs = gcs if gcs_project is None else gcsfs.GCSFileSystem(project=gcs_project)
    else:
        os.makedirs(os.path.dirname(zarr_path) or '.', exist_ok=True)

    # Handle overwrite.
    # NOTE: if `ds` is lazily backed by this same store, deleting it here
    # destroys the source data before it is read.
    if storage == 'gcs':
        if fs.exists(zarr_path):
            if not overwrite:
                raise FileExistsError(
                    f"Zarr store already exists on GCS: {zarr_path}\n"
                    "Set overwrite=True to replace it."
                )
            print(f"🗑️  Removing existing GCS zarr store: {zarr_path}")
            fs.rm(zarr_path, recursive=True)
    elif os.path.exists(zarr_path):
        if not overwrite:
            raise FileExistsError(
                f"Zarr store already exists: {zarr_path}\n"
                "Set overwrite=True to replace it."
            )
        print(f"🗑️  Removing existing zarr store: {zarr_path}")
        shutil.rmtree(zarr_path)

    # Default chunk sizes. Dataset.sizes is the dim -> length mapping
    # (using Dataset.dims as a mapping is deprecated in xarray).
    if chunk_sizes is None:
        preferred = {'time': 20, 'x': 256, 'y': 256}
        chunk_sizes = {
            dim: min(preferred.get(dim, 100), length)
            for dim, length in ds.sizes.items()
        }

    print(f"📦 Saving to zarr: {zarr_path}")
    print(f"   Dimensions: {dict(ds.sizes)}")
    print(f"   Chunks: {chunk_sizes}")
    print(f"   Compression: {compression} (level {compression_level})")
    print(f"   Storage: {storage}")

    # Shallow copy: each variable gets its own encoding dict, so the edits
    # below never leak back into the caller's `ds`.
    ds_chunked = ds.chunk(chunk_sizes).copy(deep=False)
    # Datasets opened from zarr carry encoding['chunks'] from their source
    # store. If those disagree with the new dask chunks, to_zarr refuses to
    # write overlapping chunks. Let the new dask chunks define the zarr chunks.
    for variable in ds_chunked.variables.values():
        variable.encoding.pop('chunks', None)
        variable.encoding.pop('preferred_chunks', None)

    print("💾 Writing to zarr (with automatic parallelism)...")
    store = fs.get_mapper(zarr_path) if storage == 'gcs' else zarr_path
    try:
        from dask.diagnostics import ProgressBar
        progress = ProgressBar()
    except ImportError:
        progress = contextlib.nullcontext()
    with progress:
        ds_chunked.to_zarr(
            store,
            mode='w',
            encoding=encoding,
            consolidated=consolidated,
            compute=True,
        )

    elapsed = time.time() - start_time

    # Size reporting (best effort: a failure here must not fail the save).
    total_size = None
    if storage == 'gcs':
        try:
            size_info = fs.du(zarr_path)
            if isinstance(size_info, dict):
                total_size = sum(size_info.values())
            elif isinstance(size_info, (int, float)):
                total_size = size_info
        except Exception as exc:
            print(f"⚠️  Could not compute GCS store size: {exc}")
    elif os.path.exists(zarr_path):
        total_size = sum(
            os.path.getsize(os.path.join(dirpath, name))
            for dirpath, _, filenames in os.walk(zarr_path)
            for name in filenames
        )

    if total_size is not None:
        # max() guards against a zero elapsed time on coarse clocks.
        write_speed = total_size / max(elapsed, 1e-9) / (1024 * 1024)
        print("✅ Dataset saved successfully!")
        print(f"   Store size: {_format_size(total_size)}")
        print(f"   Time: {elapsed:.1f} seconds ({elapsed/60:.1f} minutes)")
        print(f"   Write speed: {write_speed:.1f} MB/s")
        print(f"   Path: {zarr_path}")
    else:
        print("✅ Dataset saved successfully! (size unavailable)")
        print(f"   Time: {elapsed:.1f} seconds ({elapsed/60:.1f} minutes)")
        print(f"   Path: {zarr_path}")

    return zarr_path


def load_dataset_zarr(zarr_path, consolidated=True, storage='auto', gcs_project=None):
    """
    Open a zarr store, located locally or on GCS, with ``xarray.open_zarr``.

    Parameters
    ----------
    zarr_path : str
        Local path or GCS URI (e.g. gs://bucket/path.zarr).
    consolidated : bool
        Forwarded to ``xr.open_zarr``; read consolidated metadata.
    storage : {'auto','local','gcs'}
        Storage backend; 'auto' picks 'gcs' for ``gs://`` paths, else 'local'.
    gcs_project : str, optional
        GCP project for a fresh GCSFileSystem; when None the module-level
        ``gcs`` client is reused.

    Returns
    -------
    xarray.Dataset
        The dataset returned by ``xr.open_zarr``.

    Raises
    ------
    ValueError
        If ``storage`` is not one of the supported values.
    FileNotFoundError
        If no store exists at ``zarr_path``.
    """
    storage = storage.lower()
    if storage == 'auto':
        storage = 'gcs' if zarr_path.startswith('gs://') else 'local'
    if storage not in {'local', 'gcs'}:
        raise ValueError("storage must be one of {'auto', 'local', 'gcs'}")

    if storage == 'gcs':
        fs = gcs if gcs_project is None else gcsfs.GCSFileSystem(project=gcs_project)
        if not fs.exists(zarr_path):
            raise FileNotFoundError(f"Zarr store not found on GCS: {zarr_path}")
        store = fs.get_mapper(zarr_path)
        print(f"📂 Loading dataset from GCS zarr: {zarr_path}")
    else:
        if not os.path.exists(zarr_path):
            raise FileNotFoundError(f"Zarr store not found: {zarr_path}")
        store = zarr_path
        print(f"📂 Loading dataset from zarr: {zarr_path}")

    ds = xr.open_zarr(store, consolidated=consolidated)

    # Dataset.sizes is the dim -> length mapping; dict(ds.dims) emits a
    # FutureWarning on recent xarray.
    print(f"✅ Dataset loaded: {dict(ds.sizes)}")
    return ds


# Status banner for the zarr helpers (the previous text was mis-encoded and printed as "‚úÖ").
print(
    "\n".join(
        [
            "✅ Simplified zarr saving functions loaded!",
            "\nKey simplifications:",
            "  - No complex auto-detection",
            "  - Always uses compute=True (let dask handle parallelism)",
            "  - Simple, reliable, focuses on parallelism",
            "  - Works with both lazy and in-memory arrays",
        ]
    )
)


‚úÖ Simplified zarr saving functions loaded!

Key simplifications:
  - No complex auto-detection
  - Always uses compute=True (let dask handle parallelism)
  - Simple, reliable, focuses on parallelism
  - Works with both lazy and in-memory arrays


In [6]:
# Read the resampled time series back from GCS. To use a local copy instead, set
# storage = 'local' and zarr_path = 'data/ds_resampled.zarr'.
storage = 'gcs'
gcs_zarr_dir = os.getenv('GCS_ZARR_DIR')
if not gcs_zarr_dir:
    raise RuntimeError(
        "GCS_ZARR_DIR is not set; add it to .env (e.g. gs://<bucket>/<prefix>) before loading the dataset"
    )
# rstrip avoids a double slash when GCS_ZARR_DIR ends with '/'.
zarr_path = gcs_zarr_dir.rstrip('/') + '/ds_resampled.zarr'

ds_resampled = load_dataset_zarr(zarr_path, storage=storage)
ds_resampled


üìÇ Loading dataset from GCS zarr: gs://remote_sensing_saas/01-korindo/timeseries_zarr/ds_resampled.zarr


‚úÖ Dataset loaded: {'time': 81, 'x': 4489, 'y': 3213}


  print(f"‚úÖ Dataset loaded: {dict(ds.dims)}")


Unnamed: 0,Array,Chunk
Bytes,5.38 kiB,2.66 kiB
Shape,"(81,)","(40,)"
Dask graph,3 chunks in 2 graph layers,3 chunks in 2 graph layers
Data type,,
"Array Chunk Bytes 5.38 kiB 2.66 kiB Shape (81,) (40,) Dask graph 3 chunks in 2 graph layers Data type",81  1,

Unnamed: 0,Array,Chunk
Bytes,5.38 kiB,2.66 kiB
Shape,"(81,)","(40,)"
Dask graph,3 chunks in 2 graph layers,3 chunks in 2 graph layers
Data type,,

Unnamed: 0,Array,Chunk
Bytes,4.35 GiB,160.00 MiB
Shape,"(81, 4489, 3213)","(40, 1024, 1024)"
Dask graph,60 chunks in 2 graph layers,60 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 4.35 GiB 160.00 MiB Shape (81, 4489, 3213) (40, 1024, 1024) Dask graph 60 chunks in 2 graph layers Data type float32 numpy.ndarray",3213  4489  81,

Unnamed: 0,Array,Chunk
Bytes,4.35 GiB,160.00 MiB
Shape,"(81, 4489, 3213)","(40, 1024, 1024)"
Dask graph,60 chunks in 2 graph layers,60 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4.35 GiB,160.00 MiB
Shape,"(81, 4489, 3213)","(40, 1024, 1024)"
Dask graph,60 chunks in 2 graph layers,60 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 4.35 GiB 160.00 MiB Shape (81, 4489, 3213) (40, 1024, 1024) Dask graph 60 chunks in 2 graph layers Data type float32 numpy.ndarray",3213  4489  81,

Unnamed: 0,Array,Chunk
Bytes,4.35 GiB,160.00 MiB
Shape,"(81, 4489, 3213)","(40, 1024, 1024)"
Dask graph,60 chunks in 2 graph layers,60 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [7]:
import numpy as np

# Earliest value on the `time` coordinate, rendered at day precision (YYYY-MM-DD).
time_values = ds_resampled['time'].values
min_date = np.min(time_values)
min_date_str = np.datetime_as_string(min_date, unit='D')
min_date_str

'2018-02-15'

In [8]:
import os
import json
from shapely.geometry import box

from forestry_carbon_arr.core import ForestryCarbonARR

# Initialize the Forestry Carbon ARR system from the same config_path used for the AOI
# analysis in the first cell, so the config location is defined in one place only.
forestry = ForestryCarbonARR(config_path=config_path)
from_gee_version_1_config = forestry.config