## Calculate Spatial Trends

In this notebook, spatial trends for all variables are calculated. The resulting datasets will be plotted as trend maps in a following notebook.

In [9]:
# import libraries
import os
import glob
import xarray as xr
import numpy as np
import dask.array as da
from dask.distributed import Client, LocalCluster, performance_report

In [2]:
# set working directory
# The data root can be overridden with the THESIS_DATA_DIR environment variable,
# so the notebook also runs on machines where the data is stored elsewhere.
# Without the variable, the original location is used.
data_dir = os.environ.get("THESIS_DATA_DIR", "E:/Master/Thesis/3_Data")
os.chdir(data_dir)
print("Current working directory: {0}".format(os.getcwd()))

Current working directory: E:\Master\Thesis\3_Data


In [3]:
# define a function to calculate spatial trends
# x = time steps
# y = value per pixel over time

def calculate_trend(y, x):
    """Return the slope of a least-squares line fitted to ``y`` over ``x``.

    Parameters
    ----------
    y : array-like
        Values of one pixel over time.
    x : array-like
        Time steps belonging to ``y`` (same length as ``y``).

    Returns
    -------
    float
        Slope of the degree-1 ``np.polyfit`` fit. ``np.nan`` is returned when
        fewer than two (x, y) pairs are finite.
    """
    # coerce to float arrays so that plain sequences (lists, tuples) can be
    # boolean-indexed below, too
    y = np.asarray(y, dtype=float)
    x = np.asarray(x, dtype=float)

    # define mask to filter out nan / inf values in either input
    mask = np.isfinite(y) & np.isfinite(x)
    if mask.sum() < 2:
        # a line cannot be fitted through fewer than two points
        return np.nan

    # calculate linear regression with np.polyfit; index 0 is the slope
    return np.polyfit(x[mask], y[mask], deg=1)[0]

In [4]:
# create tuple with the forest types that are processed separately
# (presumably NL = needleleaved and BL = broadleaved forest)
forest_types = ("NL", "BL")

### NDVI

In [5]:
# calculate spatial trends for BL and NL separately
for forest_type in forest_types:

    # collect all yearly files of this forest type; sorted so that the
    # concatenation order does not depend on the order glob returns them in
    directory = "./NDVI/Yearly_harmonized"
    filelist = sorted(glob.glob(os.path.join(directory, f"*{forest_type}*.nc")))

    # open all yearly datasets as one; the with-block closes the source files
    # again once the trend has been written
    with xr.open_mfdataset(filelist,
                           concat_dim="time",
                           combine="nested",
                           engine="netcdf4") as ds_files:

        # dechunk time dimension
        ds = ds_files.chunk(dict(time=-1))

        # get all time step values (in this case, all years)
        time_steps = ds["time"].dt.year.values

        # apply function to dataset to calculate spatial trend
        da_trend = xr.apply_ufunc(calculate_trend,
                                  ds["NDVI"],                             # y
                                  xr.DataArray(time_steps, dims="time"),  # x
                                  input_core_dims=[["time"], ["time"]],
                                  output_core_dims=[[]],
                                  vectorize=True,
                                  dask="parallelized",
                                  output_dtypes=[float])

        # set variable name
        da_trend.name = "NDVI"

        # convert to dataset (worked better for saving as netcdf)
        ds_trend = da_trend.to_dataset()

        # compress data to reduce size
        comp = dict(zlib=True, complevel=4)
        ds_trend.NDVI.encoding.update(comp)

        # save resulting dataset with trend for each pixel as netCDF4 dataset
        # (writing runs the lazy computation, so the source files must still be open)
        ds_trend.to_netcdf(f"./NDVI/NDVI_Spatial_Trend_{forest_type}.nc")

### LAI

In [6]:
# calculate spatial trends for BL and NL separately
for forest_type in forest_types:

    # collect all yearly files of this forest type; sorted so that the
    # concatenation order does not depend on the order glob returns them in
    directory = "./LAI/Yearly_harmonized"
    filelist = sorted(glob.glob(os.path.join(directory, f"*{forest_type}*.nc")))

    # open all yearly datasets as one; the with-block closes the source files
    # again once the trend has been written
    with xr.open_mfdataset(filelist,
                           concat_dim="time",
                           combine="nested",
                           engine="netcdf4") as ds_files:

        # dechunk time dimension
        ds = ds_files.chunk(dict(time=-1))

        # get all time step values (in this case, all years)
        time_steps = ds["time"].dt.year.values

        # apply function to dataset to calculate spatial trend
        da_trend = xr.apply_ufunc(calculate_trend,
                                  ds["LAI"],                              # y
                                  xr.DataArray(time_steps, dims="time"),  # x
                                  input_core_dims=[["time"], ["time"]],
                                  output_core_dims=[[]],
                                  vectorize=True,
                                  dask="parallelized",
                                  output_dtypes=[float])

        # set variable name
        da_trend.name = "LAI"

        # convert to dataset (worked better for saving as netcdf)
        ds_trend = da_trend.to_dataset()

        # compress data to reduce size
        comp = dict(zlib=True, complevel=4)
        ds_trend.LAI.encoding.update(comp)

        # save resulting dataset with trend for each pixel as netCDF4 dataset
        # (writing runs the lazy computation, so the source files must still be open)
        ds_trend.to_netcdf(f"./LAI/LAI_Spatial_Trend_{forest_type}.nc")

### FAPAR

In [7]:
# calculate spatial trends for BL and NL separately
for forest_type in forest_types:

    # collect all yearly files of this forest type; sorted so that the
    # concatenation order does not depend on the order glob returns them in
    directory = "./FAPAR/Yearly_harmonized"
    filelist = sorted(glob.glob(os.path.join(directory, f"*{forest_type}*.nc")))

    # open all yearly datasets as one; the with-block closes the source files
    # again once the trend has been written
    with xr.open_mfdataset(filelist,
                           concat_dim="time",
                           combine="nested",
                           engine="netcdf4") as ds_files:

        # dechunk time dimension
        ds = ds_files.chunk(dict(time=-1))

        # get all time step values (in this case, all years)
        time_steps = ds["time"].dt.year.values

        # apply function to dataset to calculate spatial trend
        da_trend = xr.apply_ufunc(calculate_trend,
                                  ds["FAPAR"],                            # y
                                  xr.DataArray(time_steps, dims="time"),  # x
                                  input_core_dims=[["time"], ["time"]],
                                  output_core_dims=[[]],
                                  vectorize=True,
                                  dask="parallelized",
                                  output_dtypes=[float])

        # set variable name
        da_trend.name = "FAPAR"

        # convert to dataset (worked better for saving as netcdf)
        ds_trend = da_trend.to_dataset()

        # compress data to reduce size
        comp = dict(zlib=True, complevel=4)
        ds_trend.FAPAR.encoding.update(comp)

        # save resulting dataset with trend for each pixel as netCDF4 dataset
        # (writing runs the lazy computation, so the source files must still be open)
        ds_trend.to_netcdf(f"./FAPAR/FAPAR_Spatial_Trend_{forest_type}.nc")

### Air Temperature

In [8]:
# calculate spatial trends for BL and NL separately
for forest_type in forest_types:

    # collect all yearly files of this forest type; sorted so that the
    # concatenation order does not depend on the order glob returns them in
    directory = "./Climate/Temperature/Yearly"
    filelist = sorted(glob.glob(os.path.join(directory, f"*{forest_type}*.nc")))

    # open all yearly datasets as one; the with-block closes the source files
    # again once the trend has been written
    with xr.open_mfdataset(filelist,
                           concat_dim="time",
                           combine="nested",
                           engine="netcdf4") as ds_files:

        # dechunk time dimension
        ds = ds_files.chunk(dict(time=-1))

        # get all time step values (in this case, all years)
        time_steps = ds["time"].dt.year.values

        # apply function to dataset to calculate spatial trend
        da_trend = xr.apply_ufunc(calculate_trend,
                                  ds["t2m"],                              # y
                                  xr.DataArray(time_steps, dims="time"),  # x
                                  input_core_dims=[["time"], ["time"]],
                                  output_core_dims=[[]],
                                  vectorize=True,
                                  dask="parallelized",
                                  output_dtypes=[float])

        # set variable name
        da_trend.name = "t2m"

        # convert to dataset (worked better for saving as netcdf)
        ds_trend = da_trend.to_dataset()

        # compress data to reduce size
        comp = dict(zlib=True, complevel=4)
        ds_trend.t2m.encoding.update(comp)

        # save resulting dataset with trend for each pixel as netCDF4 dataset
        # (writing runs the lazy computation, so the source files must still be open)
        ds_trend.to_netcdf(f"./Climate/Temperature/t2m_Spatial_Trend_{forest_type}.nc")

### Precipitation

In [9]:
# calculate spatial trends for all of GER
# collect all yearly files; sorted so that the concatenation order does not
# depend on the order glob returns them in
directory = "./Climate/Precipitation/Yearly"
filelist = sorted(glob.glob(os.path.join(directory, "*.nc")))

# open all yearly datasets as one; the with-block closes the source files
# again once the trend has been written
with xr.open_mfdataset(filelist,
                       concat_dim="time",
                       combine="nested",
                       engine="netcdf4") as ds_files:

    # dechunk time dimension
    ds = ds_files.chunk(dict(time=-1))

    # get all time step values (in this case, all years)
    time_steps = ds["time"].dt.year.values

    # apply function to dataset to calculate spatial trend
    da_trend = xr.apply_ufunc(calculate_trend,
                              ds["tp"],                               # y
                              xr.DataArray(time_steps, dims="time"),  # x
                              input_core_dims=[["time"], ["time"]],
                              output_core_dims=[[]],
                              vectorize=True,
                              dask="parallelized",
                              output_dtypes=[float])

    # set variable name
    da_trend.name = "tp"

    # convert to dataset (worked better for saving as netcdf)
    ds_trend = da_trend.to_dataset()

    # compress data to reduce size
    comp = dict(zlib=True, complevel=4)
    ds_trend.tp.encoding.update(comp)

    # save resulting dataset with trend for each pixel as netCDF4 dataset
    # (writing runs the lazy computation, so the source files must still be open)
    ds_trend.to_netcdf("./Climate/Precipitation/tp_Spatial_Trend_GER.nc")

### Wind Speed

In [10]:
# calculate spatial trends for BL and NL separately
for forest_type in forest_types:

    # collect all yearly files of this forest type; sorted so that the
    # concatenation order does not depend on the order glob returns them in
    directory = "./Climate/Windspeed/Yearly"
    filelist = sorted(glob.glob(os.path.join(directory, f"*{forest_type}*.nc")))

    # open all yearly datasets as one; the with-block closes the source files
    # again once the trend has been written
    with xr.open_mfdataset(filelist,
                           concat_dim="time",
                           combine="nested",
                           engine="netcdf4") as ds_files:

        # dechunk time dimension
        ds = ds_files.chunk(dict(time=-1))

        # get all time step values (in this case, all years)
        time_steps = ds["time"].dt.year.values

        # apply function to dataset to calculate spatial trend
        da_trend = xr.apply_ufunc(calculate_trend,
                                  ds["wind10m"],                          # y
                                  xr.DataArray(time_steps, dims="time"),  # x
                                  input_core_dims=[["time"], ["time"]],
                                  output_core_dims=[[]],
                                  vectorize=True,
                                  dask="parallelized",
                                  output_dtypes=[float])

        # set variable name
        da_trend.name = "wind10m"

        # convert to dataset (worked better for saving as netcdf)
        ds_trend = da_trend.to_dataset()

        # compress data to reduce size
        comp = dict(zlib=True, complevel=4)
        ds_trend.wind10m.encoding.update(comp)

        # save resulting dataset with trend for each pixel as netCDF4 dataset
        # (writing runs the lazy computation, so the source files must still be open)
        ds_trend.to_netcdf(f"./Climate/Windspeed/wind10m_Spatial_Trend_{forest_type}.nc")

### Soil Moisture

In [11]:
# create tuple of variables to process
# four soil water volume layers with different depths
# NOTE: the name `vars` shadows Python's built-in vars(); it is kept because
# the following cell reads it under this name
vars = ("swvl1", "swvl2", "swvl3", "swvl4")

# also create tuple of folders where datasets are stored
# (same order as `vars`: entry i of `folders` belongs to entry i of `vars`)
folders = ("Soil_Water_1", "Soil_Water_2", "Soil_Water_3", "Soil_Water_4")

In [12]:
# calculate spatial trends for all four soil layers for BL and NL separately
# each variable (soil water volume layer 1-4) is paired with its data folder
for variable, folder in zip(vars, folders):

    # process needleleaved and broadleaved forest
    for forest_type in forest_types:

        # collect all yearly files of this layer and forest type; sorted so that
        # the concatenation order does not depend on the order glob returns them in
        directory = f"./Climate/{folder}/Yearly"
        filelist = sorted(glob.glob(os.path.join(directory, f"*{forest_type}*.nc")))

        # open all yearly datasets as one; the with-block closes the source
        # files again once the trend has been written
        with xr.open_mfdataset(filelist,
                               concat_dim="time",
                               combine="nested",
                               engine="netcdf4") as ds_files:

            # dechunk time dimension
            ds = ds_files.chunk(dict(time=-1))

            # get all time step values (in this case, all years)
            time_steps = ds["time"].dt.year.values

            # apply function to dataset to calculate spatial trend
            da_trend = xr.apply_ufunc(calculate_trend,
                                      ds[variable],                           # y
                                      xr.DataArray(time_steps, dims="time"),  # x
                                      input_core_dims=[["time"], ["time"]],
                                      output_core_dims=[[]],
                                      vectorize=True,
                                      dask="parallelized",
                                      output_dtypes=[float])

            # set variable name
            da_trend.name = variable

            # convert to dataset (worked better for saving as netcdf)
            ds_trend = da_trend.to_dataset()

            # compress data to reduce size
            comp = dict(zlib=True, complevel=4)
            ds_trend[variable].encoding.update(comp)

            # save resulting dataset with trend for each pixel as netCDF4 dataset
            # (writing runs the lazy computation, so the source files must still be open)
            ds_trend.to_netcdf(f"./Climate/{folder}/{variable}_Spatial_Trend_{forest_type}.nc")

### SPEI

In [16]:
# calculate spatial trends for all of GER
# the with-block closes the source file again once the trend has been written
with xr.open_dataset("./SPEI/SPEI_calc_3M_GER.nc", decode_coords="all", decode_times=True) as ds_file:

    # dechunk time dimension
    ds = ds_file.chunk(dict(time=-1))

    # get all time step values (in this case, all months)
    # convert month + year to a running month number (year * 12 + month).
    # The previous cast via datetime64[M] -> int relied on xarray keeping the
    # month resolution and on the platform's default integer width; building
    # the index from the calendar fields gives the slope per month regardless.
    # NOTE(review): if trend files were written with the old cast, check that
    # their values are really "per month" before comparing them with new runs.
    time_steps = (ds["time"].dt.year * 12 + ds["time"].dt.month).values

    # apply function to dataset to calculate spatial trend
    da_trend = xr.apply_ufunc(calculate_trend,
                              ds["spei"],                             # y
                              xr.DataArray(time_steps, dims="time"),  # x
                              input_core_dims=[["time"], ["time"]],
                              output_core_dims=[[]],
                              vectorize=True,
                              dask="parallelized",
                              output_dtypes=[float])

    # set variable name
    da_trend.name = "spei"

    # convert to dataset (worked better for saving as netcdf)
    ds_trend = da_trend.to_dataset()

    # compress data to reduce size
    comp = dict(zlib=True, complevel=4)
    ds_trend.spei.encoding.update(comp)

    # save resulting dataset with trend for each pixel as netCDF4 dataset
    # (writing runs the lazy computation, so the source file must still be open)
    ds_trend.to_netcdf("./SPEI/SPEI_calc_3M_Spatial_Trend_GER.nc")

### NIRv GPP

In [None]:
# Here, I had to dive a little deeper into Dask because the approach used in the cells above kept failing for this dataset.

In [None]:
# --- Dask Client Setup ---
# Both handles start out as None so that later cells can tell whether the
# start-up succeeded.
client, cluster = None, None

try:
    print("Attempting to start Dask LocalCluster...")
    # Worker layout, chosen for a machine with 8GB RAM:
    #   n_workers=3        -> three worker processes
    #   memory_limit='2GB' -> limit per worker, i.e. roughly 6GB in total
    #   processes=True     -> processes instead of threads (better memory isolation)
    cluster = LocalCluster(n_workers=3, memory_limit='2GB', processes=True)
    client = Client(cluster)
    print(f"Dask Dashboard link: {client.dashboard_link}")
    print(f"Dask Client started with {len(client.cluster.workers)} workers.")
except Exception as err:
    # Without a client, Dask falls back to its default local scheduler, which
    # might still lead to MemoryErrors for very large computations.
    print(f"Failed to start Dask client. Proceeding without explicit client. Error: {err}")

Attempting to start Dask LocalCluster...
Dask Dashboard link: http://127.0.0.1:8787/status
Dask Client started with 3 workers.


2025-06-10 13:14:48,920 - distributed.scheduler - ERROR - Task ('vectorize_calculate_trend-vectorize_calculate_trend_0-transpose-89e542e4f12dfedfeee21cdf44905500', 0, 0) marked as failed because 4 workers died while trying to run it


In [None]:
# calculate spatial trends for BL and NL separately
for forest_type in forest_types:

    # collect all monthly files of this forest type.
    # The list is sorted because the time axis below is a plain running index:
    # the files must be concatenated in a fixed order for it to be meaningful.
    # NOTE(review): this assumes that sorting the file names yields
    # chronological order - confirm against the naming scheme.
    directory = "./GPP/GPP_NIRv/Monthly_harmonized"
    filelist = sorted(glob.glob(os.path.join(directory, f"*{forest_type}*.nc")))

    # open all files as one dataset; the with-block closes the source files
    # again once the trend has been computed and written
    with xr.open_mfdataset(
            filelist,
            concat_dim="time",
            combine="nested",
            engine="netcdf4",
            chunks={'time': -1, 'lat': 200, 'lon': 200} # explicitly chunk spatial dimensions
        ) as ds:

        # in previous tries, i had problems with the dimensions
        # adding extra checks to print dimension after each step
        print(f"Original dataset for {forest_type} loaded with shape: {ds['GPP'].shape}")
        print(f"Initial chunking of GPP: {ds['GPP'].chunks}")

        # get all time step values (in this case, months)
        # convert month + year to simple numeric sequence
        # NOTE(review): the time chunks printed by the last run (7, 12, ..., 11, 12, ...)
        # suggest that not every file holds 12 months. np.arange treats all steps
        # as equally spaced - confirm that this is acceptable, or derive the
        # month number from the time coordinate instead.
        time_steps = np.arange(len(ds["time"]))

        # apply function to dataset to calculate spatial trend
        print(f"Applying calculate_trend function for {forest_type}...")
        da_trend = xr.apply_ufunc(calculate_trend,
                                  ds["GPP"],                                  # y (GPP data for each pixel's time series)
                                  xr.DataArray(time_steps, dims="time"),      # x (time steps for each pixel's time series)
                                  input_core_dims=[["time"], ["time"]],       # function operates along 'time' for both inputs
                                  output_core_dims=[[]],                      # expecting a single scalar output (slope) per pixel
                                  vectorize=True,                             # apply function pixel-wise
                                  dask="parallelized",                        # use Dask for parallel computation
                                  output_dtypes=[float],
                                  dask_gufunc_kwargs={"allow_rechunk": True}) # allow Dask to rechunk core dimensions

        print(f"Shape of da_trend for {forest_type} after apply_ufunc: {da_trend.shape}")
        print(f"Chunking of da_trend after apply_ufunc: {da_trend.chunks}")

        # set variable name
        da_trend.name = "GPP"

        # convert to dataset (worked better for saving as netcdf)
        ds_trend = da_trend.to_dataset()

        # explicitly compute result to prevent issues while saving
        ds_trend_computed = ds_trend.compute()
        print(f"Finished explicit computation of trend for {forest_type}. Result is now in memory.")

        # define compression and chunks in encoding
        comp = dict(zlib=True, complevel=4)
        encoding = {'GPP': {**comp, 'chunksizes': (100, 100)}}

        # save dataset
        ds_trend_computed.to_netcdf(f"./GPP/GPP_NIRv/GPP_Spatial_Trend_{forest_type}.nc", encoding=encoding)
        print(f"Successfully saved trend for {forest_type}.")

  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.op

Original dataset for NL loaded with shape: (522, 2952, 3510)
Initial chunking of GPP: ((7, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 11, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12), (200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 152), (200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 110))
Applying calculate_trend function for NL...
Shape of da_trend for NL after apply_ufunc: (2952, 3510)
Chunking of da_trend after apply_ufunc: ((200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 152), (200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 110))


This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.


Finished explicit computation of trend for NL. Result is now in memory.
Successfully saved trend for NL.


  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.op

Original dataset for BL loaded with shape: (522, 2952, 3510)
Initial chunking of GPP: ((7, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 11, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12), (200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 152), (200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 110))
Applying calculate_trend function for BL...
Shape of da_trend for BL after apply_ufunc: (2952, 3510)
Chunking of da_trend after apply_ufunc: ((200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 152), (200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 110))


This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.


Finished explicit computation of trend for BL. Result is now in memory.
Successfully saved trend for BL.


In [13]:
# Close the Dask client and cluster when all processing is done.
# Both handles are checked on their own: if the cluster was started but the
# client could not be created, the cluster would otherwise be left running.
if client is not None or cluster is not None:
    print("\nClosing Dask client and cluster.")
if client is not None:
    client.close()
if cluster is not None:
    cluster.close()


Closing Dask client and cluster.


### NPP

In [None]:
# --- Dask Client Setup ---
# Both handles start out as None so that later cells can tell whether the
# start-up succeeded.
client, cluster = None, None

try:
    print("Attempting to start Dask LocalCluster...")
    # Worker layout, chosen for a machine with 8GB RAM:
    #   n_workers=3        -> three worker processes
    #   memory_limit='2GB' -> limit per worker, i.e. roughly 6GB in total
    #   processes=True     -> processes instead of threads (better memory isolation)
    cluster = LocalCluster(n_workers=3, memory_limit='2GB', processes=True)
    client = Client(cluster)
    print(f"Dask Dashboard link: {client.dashboard_link}")
    print(f"Dask Client started with {len(client.cluster.workers)} workers.")
except Exception as err:
    # Without a client, Dask falls back to its default local scheduler, which
    # might still lead to MemoryErrors for very large computations.
    print(f"Failed to start Dask client. Proceeding without explicit client. Error: {err}")

Attempting to start Dask LocalCluster...


Perhaps you already have a cluster running?
Hosting the HTTP server on port 62174 instead


Dask Dashboard link: http://127.0.0.1:62174/status
Dask Client started with 3 workers.


In [None]:
# calculate spatial trends for BL and NL separately
for forest_type in forest_types:

    # collect all monthly files of this forest type.
    # The list is sorted because the time axis below is a plain running index:
    # the files must be concatenated in a fixed order for it to be meaningful.
    # NOTE(review): this assumes that sorting the file names yields
    # chronological order - confirm against the naming scheme.
    directory = "./NPP/NPP_calc/Monthly"
    filelist = sorted(glob.glob(os.path.join(directory, f"*{forest_type}*.nc")))

    # open all files as one dataset; the with-block closes the source files
    # again once the trend has been computed and written
    with xr.open_mfdataset(
            filelist,
            concat_dim="time",
            combine="nested",
            engine="netcdf4",
            chunks={'time': -1, 'lat': 200, 'lon': 200} # explicitly chunk spatial dimensions
        ) as ds:

        # in previous tries, i had problems with the dimensions
        # adding extra checks to print dimension after each step
        print(f"Original dataset for {forest_type} loaded with shape: {ds['NPP'].shape}")
        print(f"Initial chunking of NPP: {ds['NPP'].chunks}")

        # get all time step values (in this case, months)
        # convert month + year to simple numeric sequence
        # NOTE(review): the time chunks printed by the last run (7, 12, ..., 11, 12, ...)
        # suggest that not every file holds 12 months. np.arange treats all steps
        # as equally spaced - confirm that this is acceptable, or derive the
        # month number from the time coordinate instead.
        time_steps = np.arange(len(ds["time"]))

        # apply function to dataset to calculate spatial trend
        print(f"Applying calculate_trend function for {forest_type}...")
        da_trend = xr.apply_ufunc(calculate_trend,
                                  ds["NPP"],                                  # y (NPP data for each pixel's time series)
                                  xr.DataArray(time_steps, dims="time"),      # x (time steps for each pixel's time series)
                                  input_core_dims=[["time"], ["time"]],       # function operates along 'time' for both inputs
                                  output_core_dims=[[]],                      # expecting a single scalar output (slope) per pixel
                                  vectorize=True,                             # apply function pixel-wise
                                  dask="parallelized",                        # use Dask for parallel computation
                                  output_dtypes=[float],
                                  dask_gufunc_kwargs={"allow_rechunk": True}) # allow Dask to rechunk core dimensions

        print(f"Shape of da_trend for {forest_type} after apply_ufunc: {da_trend.shape}")
        print(f"Chunking of da_trend after apply_ufunc: {da_trend.chunks}")

        # set variable name
        da_trend.name = "NPP"

        # convert to dataset (worked better for saving as netcdf)
        ds_trend = da_trend.to_dataset()

        # explicitly compute result to prevent issues while saving
        ds_trend_computed = ds_trend.compute()
        print(f"Finished explicit computation of trend for {forest_type}. Result is now in memory.")

        # define compression and chunks in encoding
        comp = dict(zlib=True, complevel=4)
        encoding = {'NPP': {**comp, 'chunksizes': (100, 100)}}

        # save dataset
        ds_trend_computed.to_netcdf(f"./NPP/NPP_calc/NPP_Spatial_Trend_{forest_type}.nc", encoding=encoding)
        print(f"Successfully saved trend for {forest_type}.")

  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.op

Original dataset for NL loaded with shape: (522, 2952, 3510)
Initial chunking of NPP: ((7, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 11, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12), (200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 152), (200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 110))
Applying calculate_trend function for NL...
Shape of da_trend for NL after apply_ufunc: (2952, 3510)
Chunking of da_trend after apply_ufunc: ((200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 152), (200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 110))


This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.


Finished explicit computation of trend for NL. Result is now in memory.
Successfully saved trend for NL.


  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.open_mfdataset(
  ds = xr.op

Original dataset for BL loaded with shape: (522, 2952, 3510)
Initial chunking of NPP: ((7, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 11, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12), (200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 152), (200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 110))
Applying calculate_trend function for BL...
Shape of da_trend for BL after apply_ufunc: (2952, 3510)
Chunking of da_trend after apply_ufunc: ((200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 152), (200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 110))


This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.


Finished explicit computation of trend for BL. Result is now in memory.
Successfully saved trend for BL.


In [20]:
# Close the Dask client and cluster when all processing is done.
# Both handles are checked on their own: if the cluster was started but the
# client could not be created, the cluster would otherwise be left running.
if client is not None or cluster is not None:
    print("\nClosing Dask client and cluster.")
if client is not None:
    client.close()
if cluster is not None:
    cluster.close()


Closing Dask client and cluster.
