In [1]:
import xarray as xr
import psutil
import os
from dask.distributed import Client

## Checking total memory with psutil

In [2]:
# Get total memory in bytes and convert to GB
total_memory = psutil.virtual_memory().total / (1024**3)
print(f"Total memory: {total_memory:.2f} GB")

Total memory: 63.25 GB


## Chunk Calculator

In [3]:
# Dimensions and data type
time_chunk = 365
latitude_chunk = 281
longitude_chunk = 441
data_size = 4  # float32 takes 4 bytes

# Calculate chunk size in bytes
chunk_size = time_chunk * latitude_chunk * longitude_chunk * data_size
chunk_size_GB = chunk_size / (1024**3)  # Convert to GB
print(f"Each chunk is approximately {chunk_size_GB:.6f} GB")

Each chunk is approximately 0.168499 GB


In [4]:
# Assuming you know your total memory (e.g., 16GB)
memory_limit = '8GB'  # Limit memory usage to 8GB per worker
client = Client(n_workers=4,memory_limit=memory_limit)  # Starts a local cluster with memory limits
print(client)

<Client: 'tcp://127.0.0.1:50694' processes=4 threads=24, memory=29.80 GiB>


In [10]:
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 0
Total threads: 0,Total memory: 0 B
Status: closed,Using processes: True

0,1
Comm: tcp://127.0.0.1:50694,Workers: 0
Dashboard: http://127.0.0.1:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [5]:
# Open the NetCDF file
fname = "era5_t2min_1970_2000.nc"
ds = xr.open_dataset("data/{}".format(fname),chunks={"latitude": "auto", "longitude": 25,"time": -1}) #can set up chunks, for example xr.open_dataset("data/{}".format(fname),chunks={'time': 365})
#if more than one file, then use xr.open_mfdataset

## Calculate seasonal anomalies

In [6]:
da = ds["daily_t2_min"]
season_mean = da.groupby("time.season").mean(dim='time')
season_stdev = da.groupby("time.season").std(dim='time')
ds['anomaly'] = (da - season_mean)
ds['anomaly_scaled'] = ds['anomaly']/season_stdev

In [7]:
ds

Unnamed: 0,Array,Chunk
Bytes,14.65 MiB,850.68 kiB
Shape,"(31, 281, 441)","(31, 281, 25)"
Dask graph,18 chunks in 2 graph layers,18 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 14.65 MiB 850.68 kiB Shape (31, 281, 441) (31, 281, 25) Dask graph 18 chunks in 2 graph layers Data type float32 numpy.ndarray",441  281  31,

Unnamed: 0,Array,Chunk
Bytes,14.65 MiB,850.68 kiB
Shape,"(31, 281, 441)","(31, 281, 25)"
Dask graph,18 chunks in 2 graph layers,18 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,14.65 MiB,850.68 kiB
Shape,"(31, 281, 441, 1)","(31, 281, 25, 1)"
Dask graph,18 chunks in 11 graph layers,18 chunks in 11 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 14.65 MiB 850.68 kiB Shape (31, 281, 441, 1) (31, 281, 25, 1) Dask graph 18 chunks in 11 graph layers Data type float32 numpy.ndarray",31  1  1  441  281,

Unnamed: 0,Array,Chunk
Bytes,14.65 MiB,850.68 kiB
Shape,"(31, 281, 441, 1)","(31, 281, 25, 1)"
Dask graph,18 chunks in 11 graph layers,18 chunks in 11 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,14.65 MiB,850.68 kiB
Shape,"(31, 281, 441, 1)","(31, 281, 25, 1)"
Dask graph,18 chunks in 18 graph layers,18 chunks in 18 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 14.65 MiB 850.68 kiB Shape (31, 281, 441, 1) (31, 281, 25, 1) Dask graph 18 chunks in 18 graph layers Data type float32 numpy.ndarray",31  1  1  441  281,

Unnamed: 0,Array,Chunk
Bytes,14.65 MiB,850.68 kiB
Shape,"(31, 281, 441, 1)","(31, 281, 25, 1)"
Dask graph,18 chunks in 18 graph layers,18 chunks in 18 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [8]:
ds.to_netcdf(os.path.join("temp/", 'era5_t2min_scaled_anomalies.nc'),compute=True,mode="w")

In [9]:
#Remember to close the dask client
client.close()