In [1]:
import xarray as xr 
import numpy as np
from dask.distributed import Client
from dask_jobqueue import SLURMCluster
import pandas as pd
import dask.array as da
import glob 
import os

In [2]:
# Resources requested for each SLURM job. dask_jobqueue treats `cores` and
# `memory` as per-job totals and splits them across `processes` workers, so
# 16 cores / 16 processes yields single-threaded workers that each receive
# one sixteenth of the 100GB.
slurm_job_options = dict(
    job_name="climt1",      # --job-name
    queue="med",            # --partition
    walltime="01:00:00",    # --time
    cores=16,               # total cores per job
    processes=16,           # worker processes each job is cut into
    memory="100GB",         # --mem (total per job)
    log_directory=".",      # job logs are written to the current directory
)
cluster = SLURMCluster(**slurm_job_options)

In [3]:
# Request 4 workers from the cluster, then connect a client to it.
# NOTE(review): scale() counts dask workers, not SLURM jobs; if 4 jobs were
# intended, dask_jobqueue offers cluster.scale(jobs=4) — confirm.
cluster.scale(n=4)
client = Client(cluster)

In [4]:
# Display the client summary (dashboard link, worker / thread / memory totals).
# The totals are a snapshot taken when the cell runs.
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.SLURMCluster
Dashboard: http://10.42.239.61:8787/status,

0,1
Dashboard: http://10.42.239.61:8787/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.42.239.61:42545,Workers: 0
Dashboard: http://10.42.239.61:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [5]:
def create_dft(in_dir, arm_data=True):
    """Build a DataFrame that indexes the NetCDF files found in a directory.

    ``*.nc`` files are looked for first; ``*.cdf`` files are used only when no
    ``*.nc`` file is present.

    Parameters
    ----------
    in_dir : str
        Directory to search (sub-directories are not searched).
    arm_data : bool, default True
        When True, add a ``datetime`` column parsed from the third and fourth
        dot-separated fields of each file name, read together with the format
        ``%Y%m%d%H%M%S``.

    Returns
    -------
    pandas.DataFrame
        One row per file, ordered by path, with columns ``filepath`` and
        ``filename`` (plus ``datetime`` when ``arm_data`` is True).

    Raises
    ------
    FileNotFoundError
        If the directory contains neither ``*.nc`` nor ``*.cdf`` files.
    """
    # glob returns matches in arbitrary order; sorting keeps the row order
    # reproducible from one run to the next.
    files = sorted(glob.glob(os.path.join(in_dir, '*.nc')))
    if not files:
        files = sorted(glob.glob(os.path.join(in_dir, '*.cdf')))
    if not files:
        raise FileNotFoundError('No files Found')

    dft = pd.DataFrame(files, columns=['filepath'])
    dft['filename'] = dft['filepath'].map(os.path.basename)

    if arm_data:
        # Split each name once; fields 2 and 3 hold the date and the time.
        fields = dft['filename'].str.split('.', expand=True)
        dft['datetime'] = pd.to_datetime(fields[2] + fields[3],
                                         format='%Y%m%d%H%M%S')
    return dft

In [6]:
in_era = '/home1/nalex2023/Datasets/era5_manus'

# Index the files in in_era without the ARM-style name parsing. The month
# stamp is taken from the first six characters (YYYYMM) of the last
# '_'-separated field of each file name.
dft = create_dft(in_era, arm_data=False)
yyyymm = dft['filename'].str.split('_').str[-1].str[:6]

# Add the parsed timestamp and its month / year parts in one step.
dft = dft.assign(
    datetime=pd.to_datetime(yyyymm, format='%Y%m'),
    month=lambda frame: frame['datetime'].dt.month,
    year=lambda frame: frame['datetime'].dt.year,
)

In [7]:


def sub_dset(dset):
    """Return the part of ``dset`` picked out by fixed coordinate slices.

    The selection is label-based via ``dset.sel``: ``latitude`` 0 to -5,
    ``longitude`` 141 to 154 and ``pressure_level`` 1000 to 800.
    """
    # NOTE(review): the latitude and pressure_level bounds run high-to-low,
    # which presumably matches descending coordinates in the input files —
    # confirm against the data.
    window = {
        'latitude': slice(0, -5),
        'longitude': slice(141, 154),
        'pressure_level': slice(1000, 800),
    }
    return dset.sel(**window)

# Open every indexed file as one dask-backed dataset, cropping each file with
# sub_dset before the files are combined.
# The time chunk key is 'valid_time' because that is the time dimension this
# notebook works with (see the 'valid_time.hour' groupby); -1 asks for the
# whole axis of each file in a single chunk.
# The paths are passed as a plain list rather than a pandas Series.
dset_new = xr.open_mfdataset(
    dft['filepath'].tolist(),
    preprocess=sub_dset,
    parallel=True,
    chunks={'latitude': 10, 'longitude': 10, 'valid_time': -1},
)

If you’re using a Dask cluster, you can also use Dataset.persist() for quickly accessing intermediate outputs. This is most helpful after expensive operations like rechunking or setting an index. It’s a way of telling the cluster that it should start executing the computations that you have defined so far, and that it should try to keep those results in memory. You will get back a new Dask array that is semantically equivalent to your old array, but now points to running data.

```python

ds = ds.persist()

``` 

In [8]:
# Group the timestamps by the hour component of 'valid_time', then average
# each group; the result is evaluated later when .compute() is called.
hour_of_day_groups = dset_new.groupby('valid_time.hour')
dset_hourly = hour_of_day_groups.mean()


In [9]:
# Evaluate the hourly means and return the result as an in-memory dataset.
# NOTE(review): the saved output of this cell ends in "ValueError: Array chunk
# size or shape is unknown"; the cause is not evident from the notebook alone.
# TODO: investigate before relying on this result.
dset_hourly_computed = dset_hourly.compute()

This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.


ValueError: Array chunk size or shape is unknown. Possible solution with x.compute_chunk_sizes()