In [1]:
import xarray as xr
from dict_nawdexsims import simdictionary
import glob, sys, time
import numpy as np
import matplotlib.pyplot as plt

### Set up the dask cluster

In [1]:
# Dask cluster setup, adapted from Aiko's GitHub repository.
from tempfile import NamedTemporaryFile, TemporaryDirectory # Creating temporary files/dirs
import dask # Distributed data library
from dask_jobqueue import SLURMCluster # Setting up distributed memory via Slurm
from distributed import Client, progress, wait # Library to orchestrate distributed resources

In [2]:
# User-specific settings consumed by the SLURMCluster call in the next cell.
account_name = 'bb1018' # Passed to SLURMCluster as `project`
partition = 'compute' # Passed to SLURMCluster as `queue`
job_name = 'cloud3d' # Job name that is submitted via sbatch
memory = '64GiB' # Max memory per node that is going to be used - this depends on the partition
cores = 48 # Max number of cores per node that are reserved - also partition dependent
walltime = '01:00:00' #'12:00:00' # Walltime - also partition dependent

In [3]:
scratch_dir = '/scratch/b/b380873/' # Define the users scratch dir
# Create a temporary directory that receives the output of the distributed
# cluster. It is removed again when the TemporaryDirectory object is cleaned up
# (e.g. when the notebook kernel shuts down).
dask_scratch_dir = TemporaryDirectory(dir=scratch_dir, prefix=job_name)
cluster = SLURMCluster(memory=memory,
                       cores=cores,
                       project=account_name,
                       walltime=walltime,
                       queue=partition,
                       name=job_name,
                       processes=8,
                       scheduler_options={'dashboard_address': ':12435'},
                       local_directory=dask_scratch_dir.name,
                       # Extra #SBATCH directives. The log file directive is given
                       # exactly once; the original listed the identical --output
                       # line twice, which only duplicated it in the job script.
                       job_extra=[f'-J {job_name}',
                                  f'-D {dask_scratch_dir.name}',
                                  '--begin=now',
                                  f'--output={dask_scratch_dir.name}/LOG_cluster.%j.o',
                                 ],
                       interface='ib0')

In [4]:
# Print the batch script generated for the cluster to check the #SBATCH directives.
print(cluster.job_script())

#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -p compute
#SBATCH -A bb1018
#SBATCH -n 1
#SBATCH --cpus-per-task=48
#SBATCH --mem=64G
#SBATCH -t 01:00:00
#SBATCH -J cloud3d
#SBATCH -D /scratch/b/b380873/cloud3d7mcu9u8d
#SBATCH --begin=now
#SBATCH --output=/scratch/b/b380873/cloud3d7mcu9u8d/LOG_cluster.%j.o
#SBATCH --output=/scratch/b/b380873/cloud3d7mcu9u8d/LOG_cluster.%j.o

JOB_ID=${SLURM_JOB_ID%;*}

/pf/b/b380459/conda-envs/Nawdex-Hackathon/bin/python3 -m distributed.cli.dask_worker tcp://10.50.40.118:34779 --nthreads 6 --nprocs 8 --memory-limit 8.59GB --name name --nanny --death-timeout 60 --local-directory /scratch/b/b380873/cloud3d7mcu9u8d --interface ib0



In [5]:
# Request one batch job for the cluster, then display the cluster object.
cluster.scale(jobs=1)
cluster

VBox(children=(HTML(value='<h2>cloud3d</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n    .data…

In [6]:
# Create a client attached to the cluster; the bare name displays its summary.
dask_client = Client(cluster)
dask_client

0,1
Client  Scheduler: tcp://10.50.40.118:34779  Dashboard: http://10.50.40.118:8787/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B


In [7]:
# remove first day from dataset
def drop_first_day(ds):
    """Return `ds` without the time steps that fall on the first time step's day.

    The day-of-month of every time step is compared against the day-of-month of
    the first time step in a single vectorized operation, instead of calling
    `ds.isel` once per time step.

    Parameters
    ----------
    ds : object with a `time` coordinate exposing `.dt.day` and an
        `isel(time=...)` method (an xarray Dataset in this notebook).

    Returns
    -------
    The selection of `ds` whose day-of-month differs from that of the first
    time step. If the time axis is empty, `ds` is returned unchanged.
    """
    days = np.asarray(ds.time.dt.day.values)  # day-of-month of every time step
    if days.size == 0:
        # Nothing to drop; the original loop would have failed on isel(time=0).
        return ds
    # Positions of the time steps that do not belong to the first day.
    keep = np.flatnonzero(days != days[0])
    return ds.isel(time=keep)

In [8]:
# For each simulation: compute the mean profile of qi/qc/qv per cloud class over
# the open-ocean points and write one netCDF file per class.
basedir = '/work/bb1018/b380459/NAWDEX/ICON_OUTPUT_NWP/'
simdict = simdictionary()

# Directories to be used below
classdir = '/pf/b/b380796/scratch/hackathon/george/'
openoceandir = '/work/bb1018/nawdex-hackathon_pp/openoceanmask/'
cloud3ddir = '/work/bb1018/b380459/NAWDEX/ICON_OUTPUT_NWP/'
griddir = '/work/bb1018/icon_4_hackathon/grids/'
griddict = {'80km':'R80000m','40km':'R40000m','20km':'R20000m','10km':'R10000m','5km':'R5000m','2km':'R2500m'}

# NOTE: the [:1] slice restricts the run to the first simulation only;
# remove it to iterate through all simulations.
for s in list(simdict.keys())[:1]:
    print(s)

    # Load the cloud classifications for this simulation.
    classfi = classdir + 'nawdexnwp_' + simdict[s]['res'] + '_cloudclass_mis_' + s[-4:] + '.nc'

    # Take every second value from the classes as 3dcloud variables are only hourly.
    print('Read cloud classification file')
    classes = xr.open_dataset(classfi).drop({'lev','lev_2','lev_3','time'}).clch[::2]
    print('Dimensions of classifications: ' + str(classes.shape))

    # Load the open ocean mask for this simulation and extract open ocean points.
    print('Read ocean mask file')
    oceanmask = xr.open_dataset(openoceandir + 'nawdexnwp-' + simdict[s]['res'] + '-mis-' +
                                s[-4:] + '_openoceanmask.nc').mask_openocean.values
    oo_idx = np.argwhere(oceanmask == 1)[:,0]
    del oceanmask

    # Load the cloud3d values for this simulation.
    print('Load cloud3d values')
    cloud3d = xr.open_mfdataset(cloud3ddir + s + '/' + '*3dcloud*.nc',combine='by_coords',
                               parallel=True, engine='h5netcdf', chunks={'time': 1})

    # Remove the first day from the values.
    cloud3d = drop_first_day(cloud3d)

    # Extract the three cloud variables you need at the open ocean points.
    qi = cloud3d.tot_qi_dia.isel(ncells=oo_idx)
    qc = cloud3d.tot_qc_dia.isel(ncells=oo_idx)
    qv = cloud3d.tot_qv_dia.isel(ncells=oo_idx)

    # Read in the grid file to area weight
    print('Read grid file')
    cell_area = xr.open_dataset(griddir + 'icon-grid_nawdex_78w40e23n80n_' + griddict[simdict[s]['res']] + '.nc')
    cell_area = cell_area['cell_area'].rename({'cell': 'ncells'})
    weights = cell_area / (cell_area).sum(dim=['ncells'])
    weights = weights.isel(ncells=oo_idx)
    # TODO: `weights` is not applied yet; the spatial mean below is unweighted.

    # Extract where the fields equal a certain cloud class and take a temporal mean.
    # NOTE(review): assumes `classes` lines up with qi/qc/qv along 'time' and
    # 'ncells' -- confirm against the classification files.
    print('Perform classification extraction and temporal mean')

    for i in np.arange(1,9):
        print(i)
        in_class = classes == i
        # Use separate names for the per-class means. Assigning back to
        # qi/qc/qv would replace the full fields with the reduced profile of
        # the first class and break every later iteration.
        qi_mean = qi.where(in_class).mean('time').mean('ncells')
        qc_mean = qc.where(in_class).mean('time').mean('ncells')
        qv_mean = qv.where(in_class).mean('time').mean('ncells')

        # Pass the underlying arrays (.data) so the remaining dimension is
        # stored under the name "lev" in the output file.
        ds = xr.Dataset( data_vars=dict( qi=("lev", qi_mean.data), qc=("lev", qc_mean.data),
            qv=("lev", qv_mean.data)))
        ds.to_netcdf('q_mean-' + s + '_class' + str(i) + '.nc')

    print('~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')
        

NameError: name 'simdictionary' is not defined