# Example on how to use time averaging with dask

In [1]:
# Jupyter Notebook with widget matplotlib plots
# %matplotlib notebook
# Jupyter Lab with widget matplotlib plots
%matplotlib widget 
# with Jupyter and Jupyter Lab but without widget matplotlib plots
# %matplotlib inline
%load_ext autoreload
%autoreload 2

In [2]:
%%time
import sys
import numpy as np
import matplotlib.pyplot as plt
from netCDF4 import Dataset, num2date
import pyicon as pyic
import cartopy.crs as ccrs
import glob, os
import xarray as xr
import pandas as pd
import cartopy
# import seawater as sw
import datetime

import multiprocessing
from dask_jobqueue import SLURMCluster # Runs dask workers as Slurm batch jobs
from dask.utils import format_bytes
from dask.distributed import Client, LocalCluster, progress # Library to orchestrate distributed resources

from tempfile import NamedTemporaryFile, TemporaryDirectory # Creating temporary files/directories
from getpass import getuser # Login name of the current user (used to build the scratch path)
from pathlib import Path # Object-oriented library to deal with paths

-----calc
sys glob os
numpy
netcdf
Done modules calc.
-----calc_xr
sys glob os
numpy
netcdf
xarray
Done modules calc.
-----tb
sys
json
numpy
scipy
netcdf datetime
matplotlib
mybreak
pnadas
xarray
done xarray
-----IconData
-----plotting
-----view
-----calc
-----calc_xr
-----tb
-----IconData
-----plotting
-----view
-----quickplots
-----quickplots
CPU times: user 1.95 s, sys: 1.72 s, total: 3.66 s
Wall time: 32.4 s


## Simulation details

In [3]:
# Name of the experiment; it is part of the data directory and of the file names.
run = 'exp.ocean_ncep6h_r2b8_hel20160-NCP'
# Directory with the output files of the experiment (trailing slash is kept on purpose,
# the file pattern is appended directly to it).
path_data = (
    '/mnt/lustre01/work/mh0033/m211054/projects/icon/icon-oes-1.3.01/'
    f'experiments/{run}/outdata/'
)

In [4]:
# First and last time stamp of the averaging interval; the pair is unpacked
# into a label-based `slice` when the data set is subset after loading.
tave_int = [
    '2015-01-02',
    '2016-01-01',
]

In [5]:
# Output directory for the averaged NetCDF files.
# It is built from the login name of the current user, following the same
# /scratch/<first letter>/<login> layout that is used for the dask temp
# directory, instead of hard-coding one particular account.
path_scratch = f'/scratch/{getuser()[0]}/{getuser()}/dask_tmp/{run}/'
# The averaging cells write straight into this directory, so make sure it exists.
os.makedirs(path_scratch, exist_ok=True)

In [6]:
# Resolution tag that is part of the ckdtree file name.
res = 'res0.30'
# Grid-related files of the r2b9_oce_r0004 grid.
# NOTE(review): the experiment name contains "r2b8" while these files belong to
# r2b9 -- confirm that they match the run before using them.
fpath_ckdtree = (
    '/home/mpim/m300602/work/icon/grids/r2b9_oce_r0004/ckdtree/rectgrids/'
    f'r2b9_oce_r0004_{res}_180W-180E_90S-90N.nc'
)
fpath_fx = (
    '/pool/data/ICON/oes/input/r0004/'
    'OceanOnly_IcosSymmetric_4932m_rotatedZ37d_modified_srtm30_1min/ZSTAR/R2B9L128_fx.nc'
)
fpath_tgrid = '/mnt/lustre01/work/mh0033/m300602/icon/grids/r2b9_oce_r0004/r2b9_oce_r0004_tgrid.nc'

In [7]:
# Keyword arguments passed on to `xr.open_mfdataset` when the output files are opened.
mfdset_kwargs = {
    # join the files along `time` in the order in which they are listed
    'combine': 'nested',
    'concat_dim': 'time',
    # skip the comparison of variables/coordinates between the files
    'data_vars': 'minimal',
    'coords': 'minimal',
    'compat': 'override',
    'join': 'override',
    # open the individual files in parallel
    'parallel': True,
}

## Start cluster

In [8]:
# Print the name of the host on which this notebook is running.
!echo $HOSTNAME

mlogin104


In [9]:
# User-specific Slurm settings for the dask worker jobs.
# The admissible values for memory, cores and walltime depend on the partition.
job_name = 'PostProc'     # job name that is submitted via sbatch
account_name = 'bm1102'   # account that is charged for the computation
queue = 'gpu'             # name of the partition to use
cores = 24                # maximum number of cores reserved per task
memory = "100GiB"         # maximum memory per node that is going to be used
walltime = '8:00:00'      # wall-clock limit of the jobs

In [10]:
# Scratch directory of the current user: /scratch/<first letter of login>/<login>.
scratch_dir = Path('/scratch') / getuser()[0] / getuser()
# Temporary directory below the scratch dir; it is used as working directory of
# the Slurm jobs, as local directory of the dask workers and for the job logs.
# Keep the `dask_tmp_dir` object alive while the cluster is in use: a
# TemporaryDirectory is removed again when the object is cleaned up.
dask_tmp_dir = TemporaryDirectory(dir=scratch_dir, prefix=job_name)
# NOTE(review): newer dask_jobqueue releases are understood to rename `project`
# to `account` and `job_extra` to `job_extra_directives` -- confirm before upgrading.
cluster = SLURMCluster(memory=memory,
                       cores=cores,
                       project=account_name,
                       walltime=walltime,
                       queue=queue,
                       name=job_name,
                       scheduler_options={'dashboard_address': ':8787'},
                       local_directory=dask_tmp_dir.name,
                       # Additional #SBATCH directives. Only --output is set (once):
                       # without a separate --error option Slurm writes stderr
                       # into the same log file as stdout.
                       job_extra=[f'-J {job_name}',
                                  f'-D {dask_tmp_dir.name}',
                                  '--begin=now',
                                  f'--output={dask_tmp_dir.name}/LOG_cluster.%j.o',
                                 ],
                       interface='ib0')

Perhaps you already have a cluster running?
Hosting the HTTP server on port 36191 instead


In [11]:
# Print the batch script that the cluster object generates for its worker jobs,
# to check the Slurm directives and the worker command line.
print(cluster.job_script())

#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -p gpu
#SBATCH -A bm1102
#SBATCH -n 1
#SBATCH --cpus-per-task=24
#SBATCH --mem=100G
#SBATCH -t 8:00:00
#SBATCH -J PostProc
#SBATCH -D /scratch/m/m300602/PostProcde4wgz43
#SBATCH --begin=now
#SBATCH --output=/scratch/m/m300602/PostProcde4wgz43/LOG_cluster.%j.o
#SBATCH --output=/scratch/m/m300602/PostProcde4wgz43/LOG_cluster.%j.o

/work/mh0033/m300602/miniconda3/envs/pyicon_py39/bin/python -m distributed.cli.dask_worker tcp://10.50.32.33:35556 --nthreads 4 --nprocs 6 --memory-limit 16.67GiB --name dummy-name --nanny --death-timeout 60 --local-directory /scratch/m/m300602/PostProcde4wgz43 --interface ib0 --protocol tcp://



In [12]:
# Request two Slurm jobs for the cluster.
cluster.scale(jobs=2)
# Last expression of the cell: display the cluster object.
cluster

Tab(children=(HTML(value='\n            <div class="jp-RenderedHTMLCommon jp-RenderedHTML jp-mod-trusted jp-Ou…

In [13]:
# List the Slurm jobs of the current user to check the state of the worker jobs.
! squeue -u $USER

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
          33556463       gpu PostProc  m300602  R       0:01      1 mg102
          33556464       gpu PostProc  m300602  R       0:01      1 mg102


In [14]:
# Connect this notebook to the scheduler of the cluster.
client = Client(cluster)
# Display the client summary; it may still report 0 workers shortly after the
# jobs were requested.
client

0,1
Connection method: Cluster object,Cluster type: SLURMCluster
Dashboard: http://10.50.32.33:36191/status,

0,1
Dashboard: http://10.50.32.33:36191/status,Workers: 0
Total threads:  0,Total memory:  0 B

0,1
Comm: tcp://10.50.32.33:35556,Workers: 0
Dashboard: http://10.50.32.33:36191/status,Total threads:  0
Started:  Just now,Total memory:  0 B


## Load data

In [15]:
%%time
ds_3d = xr.open_mfdataset(f'{path_data}{run}_P1D_201[5-6]*.nc', **mfdset_kwargs, chunks=dict(time=1, depth=1, depth_2=1))
ds_3d = ds_3d.sel(time=slice(*tave_int))

CPU times: user 1.06 s, sys: 168 ms, total: 1.23 s
Wall time: 29.2 s


In [16]:
# Check the selected period: first time stamp, last time stamp, number of time steps.
ds_3d.time.data[0], ds_3d.time.data[-1], ds_3d.time.data.size

(numpy.datetime64('2015-01-02T00:00:00.000000000'),
 numpy.datetime64('2016-01-01T00:00:00.000000000'),
 365)

Correct the time axis by shifting it back by half the length of the averaging period. This facilitates finding the correct averaging boundaries, and it is also necessary for averaging with the `groupby` method.

In [18]:
# Shift the time stamps back by half the averaging period (12 h).
# The shift must be applied exactly once per loaded data set: without the guard
# every re-run of this cell would subtract another 12 h. A marker attribute on
# the shifted time coordinate records that the correction has been made; it is
# absent again as soon as `ds_3d` is re-loaded from the files.
if 'time_shift_applied' not in ds_3d['time'].attrs:
    time_centered = ds_3d.time - np.timedelta64(12, 'h')
    time_centered.attrs['time_shift_applied'] = '-12h'
    ds_3d['time'] = time_centered

### Check which time steps are taken by groupby method later on

In [19]:
# Report for every calendar month how many time steps `groupby` assigns to it
# and list the corresponding time stamps.
ds = ds_3d
mgroup = ds.groupby('time.month').groups
parts = []
for mm, indices in mgroup.items():
    month_times = ds.time[indices]
    parts.append(f'\n month: {mm}: size: {month_times.size}\n')
    for el in month_times.data:
        parts.append(str(el) + '\n')
txt = ''.join(parts)
print(txt)


 month: 1: size: 31
2015-01-01T00:00:00.000000000
2015-01-02T00:00:00.000000000
2015-01-03T00:00:00.000000000
2015-01-04T00:00:00.000000000
2015-01-05T00:00:00.000000000
2015-01-06T00:00:00.000000000
2015-01-07T00:00:00.000000000
2015-01-08T00:00:00.000000000
2015-01-09T00:00:00.000000000
2015-01-10T00:00:00.000000000
2015-01-11T00:00:00.000000000
2015-01-12T00:00:00.000000000
2015-01-13T00:00:00.000000000
2015-01-14T00:00:00.000000000
2015-01-15T00:00:00.000000000
2015-01-16T00:00:00.000000000
2015-01-17T00:00:00.000000000
2015-01-18T00:00:00.000000000
2015-01-19T00:00:00.000000000
2015-01-20T00:00:00.000000000
2015-01-21T00:00:00.000000000
2015-01-22T00:00:00.000000000
2015-01-23T00:00:00.000000000
2015-01-24T00:00:00.000000000
2015-01-25T00:00:00.000000000
2015-01-26T00:00:00.000000000
2015-01-27T00:00:00.000000000
2015-01-28T00:00:00.000000000
2015-01-29T00:00:00.000000000
2015-01-30T00:00:00.000000000
2015-01-31T00:00:00.000000000

 month: 2: size: 28
2015-02-01T00:00:00.00000000

## Time averaging

In the following, different averaging methods are considered in which the data set is split in different ways before averaging. Depending on the size of the data, one or the other method might be beneficial.

### Average month by month and split variables

In [20]:
%%time 
# 
# fpatho = '/scratch/m/m300602/tmp/test_pp_dymwin_time_ave-Copy1.nc'
# tslice = slice(0,2)   # for trials
# dslice = slice(0,20)  # for trials
tslice = slice(None, None)
dslice = slice(None,None)
# vars_ave = ['kin']
vars_ave = ['to', 'kin', 'vort', 'zos']

print('select vars')
# ds = xr.Dataset()
ds = ds_3d[vars_ave]
print('sel')
ds = ds.isel(time=tslice, depth=dslice)

print('groupby')
mgroup = ds.groupby('time.month').groups

print('Time ave')
for mm in mgroup:
    for var in vars_ave:
        print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        ds_tave = ds[var].isel(time=mgroup[mm]).mean(dim='time')
        fpath = f'{path_scratch}/examp_time_averaging_with_dask_{var}_{mm:02d}.nc'
        print(f'Saving file {fpath}')
        ds_tave.to_netcdf(fpath)

print('All done!')

select vars
sel
groupby
Time ave
2021-12-01 10:24:15
Saving file /scratch/m/m300602/dask_tmp/exp.ocean_ncep6h_r2b8_hel20160-NCP//examp_time_averaging_with_dask_to_01.nc
2021-12-01 10:24:19
Saving file /scratch/m/m300602/dask_tmp/exp.ocean_ncep6h_r2b8_hel20160-NCP//examp_time_averaging_with_dask_kin_01.nc
2021-12-01 10:24:22
Saving file /scratch/m/m300602/dask_tmp/exp.ocean_ncep6h_r2b8_hel20160-NCP//examp_time_averaging_with_dask_vort_01.nc
2021-12-01 10:24:25
Saving file /scratch/m/m300602/dask_tmp/exp.ocean_ncep6h_r2b8_hel20160-NCP//examp_time_averaging_with_dask_zos_01.nc
2021-12-01 10:24:27
Saving file /scratch/m/m300602/dask_tmp/exp.ocean_ncep6h_r2b8_hel20160-NCP//examp_time_averaging_with_dask_to_02.nc
2021-12-01 10:24:31
Saving file /scratch/m/m300602/dask_tmp/exp.ocean_ncep6h_r2b8_hel20160-NCP//examp_time_averaging_with_dask_kin_02.nc
2021-12-01 10:24:35
Saving file /scratch/m/m300602/dask_tmp/exp.ocean_ncep6h_r2b8_hel20160-NCP//examp_time_averaging_with_dask_vort_02.nc
2021-12-

### Average one whole year but split by variable

In [21]:
%%time
vars_ave = ['to', 'kin', 'vort']
# vars_ave = ['kin']
for var in vars_ave:
    print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    ds_tave = ds_3d[var].mean(dim='time')
    fpath = f'{path_scratch}/examp_time_averaging_with_dask_{var}.nc'
    print(f'Saving file {fpath}')
    ds_tave.to_netcdf(fpath)

2021-12-01 10:26:43
Saving file /scratch/m/m300602/dask_tmp/exp.ocean_ncep6h_r2b8_hel20160-NCP//examp_time_averaging_with_dask_to.nc
2021-12-01 10:27:00
Saving file /scratch/m/m300602/dask_tmp/exp.ocean_ncep6h_r2b8_hel20160-NCP//examp_time_averaging_with_dask_kin.nc
2021-12-01 10:27:17
Saving file /scratch/m/m300602/dask_tmp/exp.ocean_ncep6h_r2b8_hel20160-NCP//examp_time_averaging_with_dask_vort.nc
CPU times: user 25.7 s, sys: 1.82 s, total: 27.5 s
Wall time: 42.8 s


### Average one whole year for all variables

In [22]:
%%time
print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
ds_tave = ds_3d.mean(dim='time')
fpath = f'{path_scratch}/examp_time_averaging_with_dask.nc'
print(f'Saving file {fpath}')
ds_tave.to_netcdf(fpath)

2021-12-01 10:27:31
Saving file /scratch/m/m300602/dask_tmp/exp.ocean_ncep6h_r2b8_hel20160-NCP//examp_time_averaging_with_dask.nc
CPU times: user 25.2 s, sys: 1.73 s, total: 26.9 s
Wall time: 40.1 s
