In [1]:
from getpass import getuser # Libaray to copy things
from pathlib import Path # Object oriented libary to deal with paths
import os
from tempfile import NamedTemporaryFile, TemporaryDirectory # Creating temporary Files/Dirs
from subprocess import run, PIPE
import sys
 
import dask # Distributed data libary
from dask_jobqueue import SLURMCluster # Setting up distributed memories via slurm
from distributed import Client, progress, wait # Libaray to orchestrate distributed resources

import xarray as xr # Libary to work with labeled n-dimensional data and dask
import numpy as np
import skimage.util as sutil
import matplotlib.pyplot as plt

# sys.path.insert(0, os.path.abspath('/home/mpim/m300414/phd/'))
from org_metrics import Pairs, gen_regionprops_objects_all, gen_shapely_objects_all, gen_tuplelist, radar_organisation_metric

In [2]:
import warnings
warnings.filterwarnings(action='ignore')

In [3]:
# Set some user specific variables
scratch_dir = Path('/scratch') / getuser()[0] / getuser() # Define the users scratch dir

# Create a temp directory where the output of distributed cluster will be written to, after this notebook
# is closed the temp directory will be closed
dask_tmp_dir = TemporaryDirectory(dir=scratch_dir, prefix='rome_')
cluster = SLURMCluster(memory='500GiB',
                       cores=72,
                       project='mh0731',
                       walltime='00:25:00',
                       queue='gpu',
                       name='rome',
                       scheduler_options={'dashboard_address': ':12435'},
                       local_directory='/home/mpim/m300414/phd/Notebooks/',
                       job_extra=[f'-J rome', 
                                  f'-D /home/mpim/m300414/phd/Notebooks/',
                                  f'--begin=now',
                                  f'--output={dask_tmp_dir.name}/LOG_cluster.%j.o',
                                  f'--output={dask_tmp_dir.name}/LOG_cluster.%j.o'
                                 ],
                       interface='ib0')

cluster.scale(jobs=1) # requests whole nodes
dask_client = Client(cluster)
dask_client.wait_for_workers(9) # gpu-partition has 9 workers per node

In [4]:
data_path = Path('/work/mh0731/m300414/DyWinter_b10/Fake_Steiner/')
glob_pattern_2d = 'bool_*[0-9]_14mmhour.nc'
 
# Collect all file names with pathlib's rglob and list compressions 
# dont take first ten days, they are spin-up
file_names = sorted([str(f) for f in data_path.rglob(f'{glob_pattern_2d}')])[10:] #[1:]
file_names

['/work/mh0731/m300414/DyWinter_b10/Fake_Steiner/bool_20200210T0000_14mmhour.nc',
 '/work/mh0731/m300414/DyWinter_b10/Fake_Steiner/bool_20200211T0000_14mmhour.nc',
 '/work/mh0731/m300414/DyWinter_b10/Fake_Steiner/bool_20200212T0000_14mmhour.nc',
 '/work/mh0731/m300414/DyWinter_b10/Fake_Steiner/bool_20200213T0000_14mmhour.nc',
 '/work/mh0731/m300414/DyWinter_b10/Fake_Steiner/bool_20200214T0000_14mmhour.nc',
 '/work/mh0731/m300414/DyWinter_b10/Fake_Steiner/bool_20200215T0000_14mmhour.nc',
 '/work/mh0731/m300414/DyWinter_b10/Fake_Steiner/bool_20200216T0000_14mmhour.nc',
 '/work/mh0731/m300414/DyWinter_b10/Fake_Steiner/bool_20200217T0000_14mmhour.nc',
 '/work/mh0731/m300414/DyWinter_b10/Fake_Steiner/bool_20200218T0000_14mmhour.nc',
 '/work/mh0731/m300414/DyWinter_b10/Fake_Steiner/bool_20200219T0000_14mmhour.nc',
 '/work/mh0731/m300414/DyWinter_b10/Fake_Steiner/bool_20200220T0000_14mmhour.nc',
 '/work/mh0731/m300414/DyWinter_b10/Fake_Steiner/bool_20200221T0000_14mmhour.nc',
 '/work/mh0731/m

In [5]:
# fakesteiner = xr.open_dataarray(file_names[-1])

In [6]:
# classifier = fakesteiner.isel(time=0)

In [7]:
# domain_size = (117, 117)
# assert domain_size[0]     == domain_size[1] # domain is quadratic
# assert domain_size[0] % 2 == 1              # number of pixels is not even
# stride_between_domains = domain_size[0] // 2 + 1
 
# radar_domains = sutil.view_as_windows(
#     np.array(classifier),
#     window_shape=domain_size,
#     step=stride_between_domains
# )
    
# # define the array to contain ROME-values
# rome_map_shape = radar_domains.shape[:2]
# mid_point = domain_size[0] // 2
# rome_latitude  = fakesteiner['lat'][mid_point :: stride_between_domains][:rome_map_shape[0]]
# rome_longitude = fakesteiner['lon'][mid_point :: stride_between_domains][:rome_map_shape[1]]
# rome_map = xr.DataArray(
#     np.zeros(shape=rome_map_shape),
#     coords={'lat': rome_latitude, 'lon': rome_longitude},
#     dims=('lat', 'lon')
# )

In [8]:
# domain = radar_domains[-1, 0, :, :]

In [9]:
#     objects_as_regionprop = list(gen_regionprops_objects_all([domain]))[0]
#     objects_as_shapely    = list(gen_shapely_objects_all    ([domain]))[0]
    
    
#     pairs_regionprop = Pairs(
#         pairlist=list(gen_tuplelist(objects_as_regionprop))
#     )
    
#     pairs_shapely    = Pairs(
#         pairlist=list(gen_tuplelist(objects_as_shapely))
#     )

In [10]:
# radar_organisation_metric(s_pairs=pairs_shapely, r_pairs=pairs_regionprop)

In [53]:
@dask.delayed
def rome_per_domain(domain):
    
    # the generators expect time-dimension to loop over. We don't have it, so put list around domain.
    # the generators return a list each time, so only take [0]-element to avoid single list in list.
    objects_as_regionprop = list(gen_regionprops_objects_all([domain]))[0]
    objects_as_shapely    = list(gen_shapely_objects_all    ([domain]))[0]
    
    
    pairs_regionprop = Pairs(
        pairlist=list(gen_tuplelist(objects_as_regionprop))
    )
    
    pairs_shapely    = Pairs(
        pairlist=list(gen_tuplelist(objects_as_shapely))
    )

    return radar_organisation_metric(s_pairs=pairs_shapely, r_pairs=pairs_regionprop)

In [54]:
@dask.delayed
def rome(classifier):
    

    domain_size = (117, 117)
    assert domain_size[0]     == domain_size[1] # domain is quadratic
    assert domain_size[0] % 2 == 1              # number of pixels is not even
    stride_between_domains = domain_size[0] // 2 + 1
    
    radar_domains = sutil.view_as_windows(
        np.array(classifier),
        window_shape=domain_size,
        step=stride_between_domains
    )
    
    # define the array to contain ROME-values
    rome_map_shape = radar_domains.shape[:2]
    mid_point = domain_size[0] // 2
    rome_latitude  = classifier['lat'][mid_point :: stride_between_domains][:rome_map_shape[0]]
    rome_longitude = classifier['lon'][mid_point :: stride_between_domains][:rome_map_shape[1]]
    rome_map = xr.DataArray(
        np.zeros(shape=rome_map_shape),
        coords={'lat': rome_latitude, 'lon': rome_longitude},
        dims=('lat', 'lon')
    )
    
    # third parallelisation on multiple domains in large array
    for i in range(rome_map_shape[0]):
        for j in range(rome_map_shape[1]):
            rome_map[i, j] = rome_per_domain( radar_domains[i, j, :, :] )
             
    return rome_map

In [55]:
# @dask.delayed
def apply_rome(in_file):
    
    fakesteiner = xr.open_dataarray(in_file)
    
    # second parallelisation on time level
    rome_map_singletime = []
    for t in fakesteiner.time:
        rome_map_singletime.append( rome(fakesteiner.sel(time=str(t.values)) ))
    
    return rome_map_singletime 

In [56]:
# First parallelisation is on file level
rome_futures = []
for file in [file_names[-1]]:
    rome_futures.append( apply_rome(file) )

In [57]:
run_jobs = dask.persist(rome_futures)
progress(run_jobs, notebook=False)

[                                        ] | 0% Completed |  0.3s

In [58]:
dask.compute(*rome_futures)

ValueError: setting an array element with a sequence.

In [None]:
print('done')#rome_futures[0].plot()

In [None]:
list(dask.compute(rome_futures))