In [1]:
import glob
import os
import sys
from datetime import datetime
from pathlib import Path, WindowsPath

import pandas as pd
import pymannkendall as mk
import xarray as xr
from dask.distributed import Client, progress
import numpy as np
from pprint import pprint

In [2]:
""" 
Calculate Mann-Kendall trend (binary), p-value, Theil Sen slope and % change computed as (Yfit_fin - Yfit_in) / np.abs(Yfit_in) * 100
on time period 2000-2023 (24 years)
and at 1km resolution

Parameters:
base_path: path object
"""

# pymannkendall ``trend`` labels -> integer codes written to the trend raster.
_MK_TREND_CODES = {"no trend": 0, "increasing": 1, "decreasing": -1}
# NoData value used for every output raster.
_NODATA = -999


def mk_trend_slope(vec):
    """Compute Mann-Kendall trend, p-value, Theil-Sen slope and % change for one cell.

    The relative change is the change of the Theil-Sen fit between the first and
    the last time step, as a percentage of the fitted first value:
    ``(Yfit_fin - Yfit_in) / |Yfit_in| * 100``. A series of ``n`` values spans
    ``n - 1`` steps, so ``Yfit_fin - Yfit_in = slope * (n - 1)`` (for the 24
    yearly values 2000-2023 that is 23 years).

    Args:
        vec (1D array): cell time series, one value per time step. NaNs are
            ignored by the constant-series check (``np.nanstd``).

    Returns:
        tuple (int, float, float, float): Mann-Kendall trend code (1 increasing,
        -1 decreasing, 0 no trend), p-value, Theil-Sen slope (per time step)
        and relative change in %.
        NoData value: -999 for all four when the series is constant or all-NaN;
        -999 for the relative change only when the fitted initial value is 0.
    """
    std = np.nanstd(vec)
    # Constant (std == 0) or all-NaN (std is NaN) series: no testable trend.
    if std == 0 or np.isnan(std):
        return (_NODATA, _NODATA, _NODATA, _NODATA)

    mk_test = mk.original_test(vec)
    trend = _MK_TREND_CODES[mk_test.trend]
    slope = mk_test.slope
    pvalue = mk_test.p

    # The intercept is taken as the fitted value at the first time step
    # (index 0), as in pymannkendall's Sen-slope intercept -- verify if the
    # library version changes.
    yfit_in = mk_test.intercept
    if yfit_in == 0:
        relative_change = _NODATA
    else:
        # Fitted change from first to last value: n values -> n - 1 steps.
        yfit_change = slope * (len(vec) - 1)
        relative_change = yfit_change / np.abs(yfit_in) * 100

    return (trend, slope, pvalue, relative_change)

In [3]:
def mk_wrapper(ds_chunked):
    """Apply ``mk_trend_slope`` to every cell time series and stack the outputs.

    Args:
        ds_chunked (xarray.Dataset or xarray.DataArray): input with a ``time``
            dimension, optionally dask-backed. When dask-backed, ``time`` must be
            a single chunk, because it is the core dimension passed to
            ``apply_ufunc(dask="parallelized")``.

    Returns:
        Same xarray type as ``ds_chunked``, without ``time`` and with a new
        ``statistics`` dimension labelled
        ``["trend", "slope", "pvalue", "relative_change"]`` (in that order).
        NoData cells hold -999 (see ``mk_trend_slope``).
    """
    statistics = ["trend", "slope", "pvalue", "relative_change"]
    outputs = xr.apply_ufunc(
        mk_trend_slope,
        ds_chunked,
        input_core_dims=[["time"]],
        # mk_trend_slope returns one scalar per cell for each statistic
        output_core_dims=[[] for _ in statistics],
        vectorize=True,  # call mk_trend_slope once per cell time series
        dask="parallelized",
        output_dtypes=[float] * len(statistics),
    )
    stacked = xr.concat(list(outputs), dim="statistics")
    # Label the new dimension so the result is self-describing.
    return stacked.assign_coords(statistics=statistics)

In [4]:
base_path = r'L:\f02_data\wildfires\spatial_data\output'
in_path = os.path.join(base_path, 'BA_1km_v23')

resolution = '1km'
out_path = os.path.join(base_path, 'BA_1km_trends')
os.makedirs(out_path, exist_ok=True)

# Analysis window (inclusive years).
START_YEAR, END_YEAR = 2000, 2023
PERIOD = f'{START_YEAR % 100:02d}_{END_YEAR % 100:02d}'  # "00_23", used in output file names
# Spatial tile size (pixels) for dask. ``time`` is kept in a single chunk because
# it is the core dimension consumed by ``mk_wrapper``.
CHUNK_SIZE = 512


def _year_of(fname):
    """Return the year encoded at ``fname[-12:-8]``, or None if those characters are not digits."""
    token = fname[-12:-8]
    return int(token) if token.isdigit() else None


# Select one GeoTIFF per year of the analysis window, in chronological order.
# glob() gives no ordering guarantee, and both the time axis and the
# Mann-Kendall series follow the list order, so sort explicitly by year.
all_tifs = [os.path.basename(f) for f in glob.glob(os.path.join(in_path, '*.tif'))]
tif_years = {fname: _year_of(fname) for fname in all_tifs}
tif_list = sorted(
    (fname for fname, year in tif_years.items()
     if year is not None and START_YEAR <= year <= END_YEAR),
    key=tif_years.get,
)
assert len(tif_list) == END_YEAR - START_YEAR + 1, (
    f'expected one GeoTIFF per year {START_YEAR}-{END_YEAR} in {in_path}, found {len(tif_list)}'
)

# Create variable used for time axis
time_var = xr.Variable('time', pd.to_datetime([f'{fname[-12:-8]}-01-01' for fname in tif_list]))
# Open every yearly GeoTIFF lazily (dask-backed, tiled in x/y) and stack along time.
ds = xr.concat(
    [xr.open_dataset(os.path.join(in_path, fname), engine='rasterio',
                     chunks={'x': CHUNK_SIZE, 'y': CHUNK_SIZE})
     for fname in tif_list],
    dim=time_var,
)

# Rename the variable to a more useful name, and merge ``time`` into one chunk
# as apply_ufunc(dask="parallelized") requires for the core dimension.
ds = ds.rename({'band_data': 'BA'}).chunk({'time': -1})

# Build the per-cell computation lazily and let the distributed scheduler
# process the spatial tiles in parallel (scattering the whole dataset and
# submitting one task would run everything on a single worker). The context
# manager shuts the local cluster down, so re-running this cell does not start
# a second cluster.
with Client() as client:
    result = mk_wrapper(ds).compute()

# assign coordinates (layer names) to statistics dimension
# prepare DataArrays for saving to disk
result = result.assign_coords(statistics=["trend", "slope", "pvalue", "relative_change"])
result = result.squeeze(dim="band")
result['BA'].rio.write_nodata(-999, inplace=True)
result.rio.write_crs("epsg:3035", inplace=True)
ds = ds.squeeze(dim="band")

# extract single layers
trend = result.sel(statistics="trend")
slope = result.sel(statistics="slope")
pvalue = result.sel(statistics="pvalue")
relative_change = result.sel(statistics="relative_change")

# rename layers
trend = trend.rename({"BA": "BA_trend"})
slope = slope.rename({"BA": "BA_slope"})
pvalue = pvalue.rename({"BA": "BA_pvalue"})
relative_change = relative_change.rename({"BA": "BA_rel_ch"})

# save to disk: file-name suffix -> layer
layers_to_save = {
    'trend': trend['BA_trend'],
    'slope': slope['BA_slope'],
    'pvalue': pvalue['BA_pvalue'],
    'rel_change': relative_change['BA_rel_ch'],
}
for suffix, layer in layers_to_save.items():
    layer.rio.to_raster(os.path.join(out_path, f'BA_{resolution}_{PERIOD}_{suffix}.tif'), compress='LZW')

Perhaps you already have a cluster running?
Hosting the HTTP server on port 62201 instead
