## To Do

- what is a good chunk size?
- how can I verify a better performance?
- check accuracy of results
- check computation time on cluster

In [None]:
from toad import TOAD
import numpy as np
import xarray as xr
from toad.shifts_detection.methods import ASDETECT as ASDETECT
#import dask

fp = "tutorials/test_data/garbe_2020_antarctica.nc"
#fp = "tutorials/test_data/global_mean_summer_tas.nc"
var = "thk"

data = xr.open_dataset(fp)
spatial_dims = list(data[var].dims)
spatial_dims.remove("time")

c = 5
c_dict = {dim: c for dim in spatial_dims}
c_dict["time"] = 3
data = data.coarsen(**c_dict,
                    boundary="trim").reduce(np.mean)

print(f"Dimensions after coarsening:\n{data.sizes}")

cs = None
print(data[var].chunk({'x': cs, 'y': cs}).data.nbytes / 1e6, "MB")


Dimensions after coarsening:
Frozen({'time': 116, 'y': 38, 'x': 38})
0.670016 MB


In [2]:
from dask.distributed import Client

client = Client(n_workers=5)
client


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 5
Total threads: 10,Total memory: 15.29 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:43491,Workers: 0
Dashboard: http://127.0.0.1:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B

0,1
Comm: tcp://127.0.0.1:34241,Total threads: 2
Dashboard: http://127.0.0.1:42651/status,Memory: 3.06 GiB
Nanny: tcp://127.0.0.1:45665,
Local directory: /tmp/dask-scratch-space/worker-1jdsnjjx,Local directory: /tmp/dask-scratch-space/worker-1jdsnjjx

0,1
Comm: tcp://127.0.0.1:38363,Total threads: 2
Dashboard: http://127.0.0.1:44027/status,Memory: 3.06 GiB
Nanny: tcp://127.0.0.1:45927,
Local directory: /tmp/dask-scratch-space/worker-5fkng8da,Local directory: /tmp/dask-scratch-space/worker-5fkng8da

0,1
Comm: tcp://127.0.0.1:38025,Total threads: 2
Dashboard: http://127.0.0.1:42727/status,Memory: 3.06 GiB
Nanny: tcp://127.0.0.1:45021,
Local directory: /tmp/dask-scratch-space/worker-8fw2huuy,Local directory: /tmp/dask-scratch-space/worker-8fw2huuy

0,1
Comm: tcp://127.0.0.1:43559,Total threads: 2
Dashboard: http://127.0.0.1:35665/status,Memory: 3.06 GiB
Nanny: tcp://127.0.0.1:41247,
Local directory: /tmp/dask-scratch-space/worker-l34ofvwv,Local directory: /tmp/dask-scratch-space/worker-l34ofvwv

0,1
Comm: tcp://127.0.0.1:41587,Total threads: 2
Dashboard: http://127.0.0.1:44701/status,Memory: 3.06 GiB
Nanny: tcp://127.0.0.1:33167,
Local directory: /tmp/dask-scratch-space/worker-_kv60a03,Local directory: /tmp/dask-scratch-space/worker-_kv60a03


In [5]:
#dask.config.set(scheduler='threads')

td_new = TOAD(data)
td_new.compute_shifts(var,
                  method=ASDETECT(),
                  overwrite=True,
                  chunk_size=10,
                  dask_compute=True)

[                                        ] | 0% Completed | 169.48 us

[########################################] | 100% Completed | 1.67 sms


## Chunk Size

In [6]:
import time

# Define the chunk sizes to test
chunk_sizes = [None, 5, 20, 30, 40, 50]
sample_size = 5

# Run tests
results = [0] * len(chunk_sizes)

print("Benchmarking chunk sizes...\n")
for i in range(len(chunk_sizes)):
    size = chunk_sizes[i]
    for j in range(sample_size):
        # get test data
        td = TOAD(data)

        # Time the execution
        lazy_shifts = td_new.compute_shifts(var,
                                            method=ASDETECT(),
                                            overwrite=True,
                                            return_results_directly=True,
                                            chunk_size=size,
                                            dask_compute=False)
        
        start_time = time.time()
        _ = lazy_shifts.compute()
        elapsed = time.time() - start_time

        results[i] += elapsed
    results[i] /= sample_size
    print(f"Chunk size {size}x{size}: {results[i]:.2f} seconds")

Benchmarking chunk sizes...

Chunk size NonexNone: 1.29 seconds
Chunk size 5x5: 1.42 seconds
Chunk size 20x20: 1.41 seconds
Chunk size 30x30: 1.33 seconds
Chunk size 40x40: 1.43 seconds
Chunk size 50x50: 1.33 seconds


## Artificial Dataset

- as an example we aim for 2 chunks, each with 50 MB of data (recommendation: 10MB-1GB, source: https://docs.dask.org/en/latest/array-chunks.html)
- `float64` uses 8 bytes to store one value (8 bits/byte * 8 bytes = 64 bits)
- each stored value is 1 byte
- 50 MB = 50.000.000 bytes per chunk
- we assume two spatial dimensions of the same chunk size and a temporal dimension of 100 data points (close to real world examples); time should not be chunked!
- question: what is the number of datapoints $c$ per spatial dimension for a given chunk-memory-size $m$?
- answer:

$$ m = 100 \cdot c \cdot c \cdot 8\text{ bytes} = 800 \cdot c^2 \text{ bytes}$$ 
$$ c = \sqrt{\frac{m}{800}} $$

- for m = 50.000.000 bytes: $ \enspace \enspace c = 250 $



In [12]:
import dask.array as da
import xarray as xr
import numpy as np

# Parameters
size = 500
shape = (100, 1000, 1000)
chunks = (-1, size, size)

# Create Dask array lazily
data_dask = da.random.random(shape, chunks=chunks)

# Wrap in xarray
time = np.arange(shape[0])
lat = np.linspace(-90, 90, shape[1])
lon = np.linspace(-180, 180, shape[2])

data = xr.DataArray(
    data_dask,
    dims=["time", "x", "y"],
    coords={"time": time, "x": lat, "y": lon}
)

print(data.dtype)
sizes = [np.prod(c) * data.dtype.itemsize for c in [data.chunks[0] + item for item in list(zip(*data.chunks[1:]))]]
print("Chunk sizes (MB):", [float(s) / 1e6 for s in sizes[:]])

print(f"Expected number of chunks: {shape[1] // size}")
print(f"Actual number of chunks: {len(sizes)}")


float64
Chunk sizes (MB): [200.0, 200.0]
Expected number of chunks: 2
Actual number of chunks: 2


In [None]:
import sys

sys.getsizeof(data)

104

In [None]:

from toad.shifts_detection.methods import ASDETECT as ASDETECT
from toad import TOAD

import matplotlib.pyplot as plt
import dask.array as da
import pandas as pd
import xarray as xr
import numpy as np
import time

# Parameters
shape = (100,1000,1000)
tim = np.arange(shape[0])
lat = np.linspace(-90, 90, shape[1])
lon = np.linspace(-180, 180, shape[2])
var = "var"

chunk_sizes = [None, 250]#[None, 250, 2500, 25000]   # chunk sizes for spatial dimensions
sample_size = 1

print("> Running test...")
results = [0] * len(chunk_sizes)                    # create space for results
memory_chunk = [0] * len(chunk_sizes)               # create space for memory usage
for i in range(len(chunk_sizes)):
    print(f">> Chunk size: {chunk_sizes[i]}")
    size = chunk_sizes[i]
    chunks = (-1, size, size)
    # Create Dask array - lazy
    data_dask = da.random.random(shape, chunks=chunks)
    data = xr.Dataset(
        {var: (["time", "lat", "lon"], data_dask)},
        coords={"time": tim, "lat": lat, "lon": lon}
    )
    td = TOAD(data)
    # call compute_shift once to let dask create the lazy dataframe object
    print("> Creating lazy dataframe object...")
    td.compute_shifts(var,
                    method=ASDETECT(),
                    overwrite=True,
                    chunk_size=None,)
    print("DONE.")

    # get memory size of one chunk
    memory_chunk[i] = float(np.prod(list(zip(*data[var].chunks))[0]) * data[var].dtype.itemsize ) / 1e6      # in MB

    for j in range(sample_size):
        print(f">>> Sample {j+1}/{sample_size}...",end="\n")
        # Time the execution
        start_time = time.time()
        td.compute_shifts(var,
                          method=ASDETECT(),
                          overwrite=True,
                          chunk_size=size,)
        elapsed = time.time() - start_time

        results[i] += elapsed
    results[i] /= sample_size

# store results in pandas dataframe
print("Storing results in pandas dataframe...",end="")
chunk_str = [str(i) for i in chunk_sizes]
df = pd.DataFrame(results, columns=["Time (s)"])
df["Chunk Size"] = chunk_str
df["Memory Size (MB)"] = memory_chunk
print(df)

df.to_pickle("chunk_size_performance.pkl")
print("DONE.")

# plot results
print("Plotting results...",end="")
plt.figure(figsize=(10, 5))
plt.bar(df["Chunk Size"], df["Time (s)"], color="lightblue")
plt.xlabel("Memory Size (MB)")
plt.ylabel("Time (s)")
plt.title(f"Chunk Size Performance\nData Size: {shape}\n Time Sample Size: {sample_size}")
plt.xticks(df["Chunk Size"].unique(),df["Memory Size (MB)"].unique(),rotation=45)
plt.tight_layout()
plt.savefig("chunk_size_performance.png")
print("DONE.")



> Running test...
>> Chunk size: None
> Creating lazy dataframe object...
[                                        ] | 0% Completed | 180.44 ss


In [4]:
import xarray as xr
import numpy as np
from toad import TOAD

# Define large shape (e.g., 500 time steps, 100x100 spatial grid)
shape = (100, 1000, 1000)  # (time, lat, lon)

# Create coordinate values
time = np.arange(shape[0])
lat = np.linspace(-90, 90, shape[1])
lon = np.linspace(-180, 180, shape[2])

# Create synthetic data
data = np.random.rand(*shape)

# Create a Dataset with a named variable
dataset = xr.Dataset(
    {"temperature": (["time", "lat", "lon"], data)},
    coords={"time": time, "lat": lat, "lon": lon}
)

# If TOAD expects a DataArray, extract the named variable
td_art = TOAD(dataset["temperature"])  # 'temperature' is the variable name

print(td_art.data)


<xarray.DataArray 'temperature' (time: 100, lat: 1000, lon: 1000)> Size: 800MB
array([[[0.45244427, 0.48663247, 0.17611345, ..., 0.02277043,
         0.87496755, 0.14338072],
        [0.25982414, 0.26346329, 0.69602031, ..., 0.30828042,
         0.41753337, 0.42609539],
        [0.91167078, 0.42303265, 0.40895033, ..., 0.56137883,
         0.22764649, 0.66449479],
        ...,
        [0.04539018, 0.56158138, 0.24804021, ..., 0.33382633,
         0.38129622, 0.0201217 ],
        [0.71306811, 0.59268477, 0.90389439, ..., 0.08648879,
         0.2066183 , 0.08494006],
        [0.11945693, 0.98706842, 0.31252884, ..., 0.26919736,
         0.43294081, 0.13581645]],

       [[0.8772039 , 0.77607513, 0.35198362, ..., 0.67935323,
         0.78032563, 0.33370483],
        [0.8318307 , 0.3525827 , 0.43400293, ..., 0.20513175,
         0.02413925, 0.36144613],
        [0.34961881, 0.46877138, 0.00803813, ..., 0.17253645,
         0.69510436, 0.359389  ],
...
        [0.44588618, 0.60849406, 0.378

In [None]:
import sys

sys.getsizeof(data) / 1e6, "MB"

(800.000144, 'MB')

In [None]:
td_art.compute_shifts(td_art.data,
                  method=ASDETECT(),
                  overwrite=True,)

ValueError: The truth value of a Array is ambiguous. Use a.any() or a.all().

In [None]:
import time

# Define the chunk sizes to test
chunk_sizes = [50, 100, 500]

# Run tests
results = []

print("Benchmarking chunk sizes...\n")
for size in chunk_sizes:
    # get test data
    td = TOAD(data)

    # Time the execution
    start_time = time.time()
    td_art.compute_shifts(var,
                    method=ASDETECT(),
                    overwrite=True,
                    chunk_size=size,)
    elapsed = time.time() - start_time

    results.append((size, elapsed))
    print(f"Chunk size {size}x{size}: {elapsed:.2f} seconds")

# Summary
print("\nSummary:")
for size, elapsed in results:
    print(f"Chunk size {size}x{size}: {elapsed:.2f} s")


Benchmarking chunk sizes...

