## Distributed Load / Merge / Concatenate

In [1]:
import iris

In [2]:
import os

import iris
import iris.quickplot as qplt
import matplotlib.pyplot as plt
import matplotlib.cm as mpl_cm

import dask
import dask.bag as db
import distributed
from dask_kubernetes import KubeCluster

import numpy as np

import cloudpickle

%matplotlib inline
%config InlineBackend.figure_format = 'retina'  # Optional for retina displays



In [3]:
cluster = KubeCluster()
cluster

In [9]:
cluster.scheduler_address

'tcp://100.96.128.114:40249'

In [13]:
s4.merge??

[0;31mSignature:[0m [0ms4[0m[0;34m.[0m[0mmerge[0m[0;34m([0m[0munique[0m[0;34m=[0m[0;32mTrue[0m[0;34m)[0m[0;34m[0m[0m
[0;31mSource:[0m   
    [0;32mdef[0m [0mmerge[0m[0;34m([0m[0mself[0m[0;34m,[0m [0munique[0m[0;34m=[0m[0;32mTrue[0m[0;34m)[0m[0;34m:[0m[0;34m[0m
[0;34m[0m        [0;34m"""[0m
[0;34m        Returns the :class:`CubeList` resulting from merging this[0m
[0;34m        :class:`CubeList`.[0m
[0;34m[0m
[0;34m        Kwargs:[0m
[0;34m[0m
[0;34m        * unique:[0m
[0;34m            If True, raises `iris.exceptions.DuplicateDataError` if[0m
[0;34m            duplicate cubes are detected.[0m
[0;34m[0m
[0;34m        This combines cubes with different values of an auxiliary scalar[0m
[0;34m        coordinate, by constructing a new dimension.[0m
[0;34m[0m
[0;34m        .. testsetup::[0m
[0;34m[0m
[0;34m            import iris[0m
[0;34m            c1 = iris.cube.Cube([0,1,2], long_name='some_parameter')[0m


In [4]:
client = distributed.Client(cluster.scheduler_address)
client

Client
  Scheduler: tcp://100.96.128.114:40249
  Dashboard: http://100.96.128.114:8787/status
Cluster
  Workers: 4
  Cores: 4
  Memory: 12.00 GB


## Standard Load

In [5]:
samples = [
    '/s3/mogreps-g/prods_op_mogreps-g_20160101_00_00_003.nc',
    '/s3/mogreps-g/prods_op_mogreps-g_20160101_00_00_006.nc',
    '/s3/mogreps-g/prods_op_mogreps-g_20160101_00_00_009.nc',
    '/s3/mogreps-g/prods_op_mogreps-g_20160101_00_00_012.nc',
    '/s3/mogreps-g/prods_op_mogreps-g_20160101_00_00_015.nc',
    '/s3/mogreps-g/prods_op_mogreps-g_20160101_00_00_018.nc']

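The S3-backed file system caches data locally, so be careful when interpreting timings: a repeated load of the same file may be faster only because it is served from the local cache.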
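One way to keep cache effects visible is to time a first ("cold") call separately from repeated ("warm") calls. The helper below is a minimal, hypothetical sketch (the name `time_cold_and_warm` is not part of Iris or Dask); it assumes the first call in a session is the one that fetches from S3, which only holds if the file has not already been read into the local cache.

```python
import time
from typing import Any, Callable, List, Tuple


def time_cold_and_warm(
    fn: Callable[..., Any], *args: Any, warm_runs: int = 2
) -> Tuple[float, List[float], Any]:
    """Time one first call of fn(*args), then several repeated calls.

    Hypothetical helper: the first call may include fetching data from
    remote storage, while later calls may be served from a local cache,
    so the two kinds of timing are reported separately.
    """
    start = time.perf_counter()
    result = fn(*args)
    cold = time.perf_counter() - start

    warm = []
    for _ in range(warm_runs):
        start = time.perf_counter()
        result = fn(*args)
        warm.append(time.perf_counter() - start)
    return cold, warm, result
```

In the notebook this could be called as `cold, warm, cubes = time_cold_and_warm(iris.load, samples[0])`; a large gap between `cold` and the `warm` timings suggests the cache, not the loader, accounts for the difference.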

In [7]:
%%time
_ = iris.load(samples[0])

CPU times: user 1.8 s, sys: 100 ms, total: 1.9 s
Wall time: 4.38 s


In [29]:
%%time
s1 = iris.load_raw(samples[0])

CPU times: user 268 ms, sys: 0 ns, total: 268 ms
Wall time: 263 ms


In [18]:
%%time
# repeated so the file is served from the local cache
s2 = iris.load(samples[0])

CPU times: user 1.88 s, sys: 108 ms, total: 1.98 s
Wall time: 2.25 s


For a single file there is no difference between the results of load and load_raw: both return 28 cubes.

In [19]:
print(len(s1), len(s2))
print(s1)
print(s2)

28 28
0: wet_bulb_freezing_level_altitude / (m) (latitude: 600; longitude: 800)
1: fog_area_fraction / (1)             (latitude: 600; longitude: 800)
2: medium_type_cloud_area_fraction / (1) (latitude: 600; longitude: 800)
3: relative_humidity / (%)             (time: 2; pressure: 16; latitude: 600; longitude: 800)
4: y_wind / (m s-1)                    (latitude: 601; longitude: 800)
5: air_pressure_at_sea_level / (Pa)    (time: 2; latitude: 600; longitude: 800)
6: air_temperature / (K)               (latitude: 600; longitude: 800)
7: high_type_cloud_area_fraction / (1) (latitude: 600; longitude: 800)
8: wind_speed_of_gust / (m s-1)        (latitude: 600; longitude: 800)
9: visibility_in_air / (m)             (latitude: 600; longitude: 800)
10: y_wind / (m s-1)                    (time: 2; pressure: 16; latitude: 601; longitude: 800)
11: geopotential_height / (m)           (time: 2; pressure: 9; latitude: 600; longitude: 800)
12: y_wind / (m s-1)                    (time: 2; latitude

In [8]:
%%time
_ = iris.load(samples)

distributed.utils - ERROR - ('reshape-4f64cc2a657a1954d10939b8b6610f1b', 0, 0)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/distributed/client.py", line 1354, in _gather
    st = self.futures[key]
KeyError: "('reshape-4f64cc2a657a1954d10939b8b6610f1b', 0, 0)"

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/distributed/utils.py", line 238, in f
    result[0] = yield make_coro()
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/conda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/conda/lib/python3.6/site-packages/distributed/client.py", line 

KeyboardInterrupt: 

In [10]:
%%time
# repeated so the files are served from the local cache
s3 = iris.load(samples)

CPU times: user 10.7 s, sys: 940 ms, total: 11.6 s
Wall time: 29.3 s


In [7]:
%%time
s4 = iris.load_raw(samples)

CPU times: user 1.67 s, sys: 20 ms, total: 1.69 s
Wall time: 1.68 s


When loading multiple files, the benefit of .load() becomes clear: it merges the cubes from the individual files, returning 56 cubes rather than the 168 returned by .load_raw().

In [11]:
print(len(s3))
print(len(s4))

56
168
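To illustrate how merging reduces the number of cubes, here is a toy model, not Iris's actual merge algorithm (which compares full cube metadata and coordinates). The hypothetical `toy_merge` groups 2-D fields by name and stacks fields that differ only in a scalar time value along a new leading dimension, in the spirit of the docstring's "constructing a new dimension".

```python
from collections import defaultdict

import numpy as np


def toy_merge(fields):
    """Group 2-D fields by name and stack each group along a new leading axis.

    `fields` is a list of (name, scalar_time, array) tuples. Fields that share
    a name but differ in their scalar time value are combined into one array
    whose new first dimension is time, sorted ascending.
    """
    groups = defaultdict(list)
    for name, t, data in fields:
        groups[name].append((t, data))

    merged = {}
    for name, items in groups.items():
        items.sort(key=lambda item: item[0])
        times = [t for t, _ in items]
        if len(set(times)) != len(times):
            # Loosely analogous to merge(unique=True) rejecting duplicate cubes
            raise ValueError(f"duplicate time values for {name!r}")
        merged[name] = (times, np.stack([data for _, data in items]))
    return merged
```

For example, three "files" each holding the same two 2 x 3 fields give six raw fields but only two merged ones:

```python
raw = [(name, t, np.full((2, 3), t)) for t in (3, 6, 9)
       for name in ("air_temperature", "fog_area_fraction")]
merged = toy_merge(raw)
print(len(raw), len(merged))                   # 6 2
print(merged["air_temperature"][1].shape)      # (3, 2, 3)
```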


In [8]:
%%time
_ = s4.merge()

distributed.utils - ERROR - ('reshape-cffdd2215f8091a104f20873e31963cf', 0)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/distributed/client.py", line 1354, in _gather
    st = self.futures[key]
KeyError: "('reshape-cffdd2215f8091a104f20873e31963cf', 0)"

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/distributed/utils.py", line 238, in f
    result[0] = yield make_coro()
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/conda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/conda/lib/python3.6/site-packages/distributed/client.py", line 1360, 

KeyboardInterrupt: 

The input cubes are modified in place, as previously lazy coordinates are realised. Once the coordinates have been realised, subsequent merges are much faster.
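The in-place behaviour can be pictured with a toy coordinate whose points are read on first access and then cached on the object. `LazyPoints` is hypothetical and is not Iris's coordinate class; it only shows why a second pass over the same objects skips the expensive read.

```python
from typing import Callable, List, Optional


class LazyPoints:
    """A toy coordinate whose points are computed on first access, then cached.

    `loader` stands in for reading coordinate values from a file. It is called
    at most once, because the result is stored on the object itself.
    """

    def __init__(self, loader: Callable[[], List[float]]):
        self._loader = loader
        self._points: Optional[List[float]] = None
        self.load_count = 0

    @property
    def is_lazy(self) -> bool:
        return self._points is None

    @property
    def points(self) -> List[float]:
        if self._points is None:
            self._points = self._loader()  # the expensive read happens here, once
            self.load_count += 1
        return self._points
```

The first access to `points` pays the loading cost and mutates the object; every later access (like the second `s4.merge()`) reuses the stored values.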

In [13]:
%%time
print(len(s4))
_ = s4.merge()

168
CPU times: user 276 ms, sys: 12 ms, total: 288 ms
Wall time: 251 ms


If .load() is the usual way to load cubes, are users getting any benefit from coordinates defaulting to lazy? The current behaviour causes a roughly 20x slowdown, and the end result is realised coordinates anyway.

## Distributed Load

The problem above is made worse by loading the files serially. A map/reduce approach would let us make use of dask bags: map the load function over the files in parallel on the workers, then reduce by merging the resulting cubes.
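The map → flatten → reduce shape can be sketched without a cluster. This sketch swaps in the standard library's `ThreadPoolExecutor` for a dask bag, and uses a hypothetical `fake_load` in place of `iris.load_raw` that returns one (field name, lead time) record per field; on the cluster the map step would instead be something like `db.from_sequence(samples).map(iris.load_raw).flatten()`, with the merge done on the client.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import chain


def fake_load(path):
    """Hypothetical stand-in for iris.load_raw: one record per field in the file."""
    lead_time = int(path.rsplit("_", 1)[-1].split(".")[0])  # '..._003.nc' -> 3
    return [(name, lead_time) for name in ("air_temperature", "y_wind")]


def map_reduce_load(paths, loader, max_workers=4):
    # Map: load every file concurrently, one task per file.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        per_file = list(pool.map(loader, paths))
    # Flatten: one flat list of records, like Bag.flatten().
    records = list(chain.from_iterable(per_file))
    # Reduce: group records by field name, analogous to merging cubes.
    merged = {}
    for name, lead_time in records:
        merged.setdefault(name, []).append(lead_time)
    return {name: sorted(times) for name, times in merged.items()}
```

With the six `samples` paths, `map_reduce_load(samples, fake_load)` would group the records into two entries, each with lead times 3 to 18. Threads share one process, so this only illustrates the data flow; the distributed workers are what would parallelise the real loads.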

In [None]:
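%%time
from toolz import mapcat  # mapcat(f, seqs) == concat(map(f, seqs))

# Serial baseline: load each file, concatenate the resulting cubes, then merge
s6 = iris.cube.CubeList(mapcat(iris.load, samples)).merge()
print(len(s6))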

In [None]:
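# Assumed definition of x (not shown in the original): a dask bag of cubes,
# built by loading each file on a worker and flattening the per-file CubeLists
x = db.from_sequence(samples).map(iris.load).flatten()
print(x)
x2 = iris.cube.CubeList(x.compute()).merge()
len(x2)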

In [17]:
!lscpu

Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                8
On-line CPU(s) list:   0-7
Thread(s) per core:    2
Core(s) per socket:    4
Socket(s):             1
NUMA node(s):          1
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 79
Model name:            Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz
Stepping:              1
CPU MHz:               2300.044
BogoMIPS:              4600.08
Hypervisor vendor:     Xen
Virtualization type:   full
L1d cache:             32K
L1i cache:             32K
L2 cache:              256K
L3 cache:              46080K
NUMA node0 CPU(s):     0-7
Flags:                 fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ht syscall nx pdpe1gb rdtscp lm constant_tsc rep_good nopl xtopology eagerfpu pni pclmulqdq ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdra