In [None]:
%matplotlib notebook

# Iris/dask dataset loading investigation

## Introduction

This notebook demonstrates using dask functionality beyond the `array` module to help with Iris processing. Specifically, it compares alternative approaches for loading numerous and/or large datasets into Iris.

Three approaches will be compared:

* The standard Iris load
* ~~Wrapping Iris load calls in a **dask bag** generated from a sequence (this is slow so will not be investigated further)~~
* Wrapping Iris load calls in a **dask bag** generated from a **delayed** call

These options will be compared with two simple metrics:

- Ease of use
- Runtime

## Setup

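Below are the functions used to load the dataset. `direct_load` performs a standard Iris load. The remaining functions wrap `iris.load` in delayed calls and build a dask bag from them with `db.from_delayed`: `delay_wrapper` makes a single delayed call on a filename pattern, while `delay_wrapper_v2` and `delay_wrapper_v3` make one delayed call per file. `delay_wrapper_v3` uses two functions: the delayed `loader`, and a wrapper that maps it over the files.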

### Imports

In [11]:
import os
import time

import dask
import dask.bag as db
from dask import delayed
import iris

### Dask processing options

Define how dask computes task graphs. Run only one of the two cells below: the first uses a local pool of 8 threads, the second connects to a `distributed` scheduler.

In [None]:
from multiprocessing.pool import ThreadPool

dask.set_options(pool=ThreadPool(8))

In [12]:
from distributed import Client

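# Scheduler address: last octet of the scheduler host's IP, then port 8776.
host_octet = 55
scheduler_address = '10.154.1.{}:8776'.format(host_octet)

client = Client(scheduler_address)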

In [13]:
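# Show dask's global options, including which scheduler `get` will be used
# (private attribute, only present in older dask versions).
print(dask.context._globals)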

defaultdict(<function <lambda> at 0x7ff3ea1ec8c0>, {'callbacks': set([]), 'shuffle': 'tasks', 'get': <bound method Client.get of <Client: scheduler="10.154.1.55:8776" processes=8 cores=8>>})
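With the thread-pool option, `ThreadPool(8)` limits dask to running at most eight tasks at once. The standalone sketch below uses only the standard library (no dask) to illustrate that cap: it maps a short task over 20 inputs with an 8-thread pool and records the peak number of tasks in flight. The `task` function and its counters are hypothetical stand-ins for real loads.

```python
import threading
import time
from multiprocessing.pool import ThreadPool

lock = threading.Lock()
active = 0       # tasks currently running
peak_active = 0  # highest number of tasks seen running at once


def task(x):
    """Hypothetical stand-in for one load: track concurrency, sleep, return x squared."""
    global active, peak_active
    with lock:
        active += 1
        peak_active = max(peak_active, active)
    time.sleep(0.05)  # simulate I/O-bound work
    with lock:
        active -= 1
    return x * x


pool = ThreadPool(8)
try:
    results = pool.map(task, range(20))  # results keep input order
finally:
    pool.close()
    pool.join()

print(peak_active <= 8)  # True
print(results[:4])       # [0, 1, 4, 9]
```

However many inputs are submitted, the pool size bounds concurrency, so the number of threads is the main knob for this option.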


### Timer function

A simple function that returns the wall-clock runtime, in seconds, of a supplied function call. This is useful for storing results in variables; for quick interactive checks within this notebook, the `%timeit` magic is enough.

In [14]:
def timer(func, *funcargs):
    """Return the wall-clock time in seconds taken by ``func(*funcargs)``."""
    t0 = time.time()
    func(*funcargs)
    t1 = time.time()
    return t1 - t0
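`timer` measures a single run. A minimal variant, sketched below under the assumption of Python 3.3+ (the notebook itself uses Python 2), switches to `time.perf_counter`, a monotonic clock suited to measuring intervals, and repeats the call to collect several runtimes: the list-of-timings shape the boxplot at the end of the notebook expects. The name `repeat_timer` is hypothetical.

```python
import time


def repeat_timer(func, reps, *funcargs):
    """Call ``func(*funcargs)`` ``reps`` times; return a list of runtimes in seconds.

    Hypothetical helper: a sketch of how ``timer`` could be extended to
    gather repeated measurements.
    """
    runtimes = []
    for _ in range(reps):
        t0 = time.perf_counter()  # monotonic, high-resolution clock
        func(*funcargs)
        runtimes.append(time.perf_counter() - t0)
    return runtimes


# Usage: time a 10 ms sleep three times.
times = repeat_timer(time.sleep, 3, 0.01)
print(len(times))  # 3
```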

### Graphs

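Draw the task graph of each of the delayed-loading options. These cells use `fp` and `seq`, which are defined in the **Test!** section below, so run that cell first. Rendering the graphs requires graphviz.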

In [None]:
%matplotlib inline

dlyd = delayed(iris.load)(os.path.join(fp, '*.pp'))
cs = db.from_delayed(dlyd)
cs.visualize()

In [None]:
%matplotlib inline

dlyds = [delayed(iris.load)(os.path.join(fp, pattern)) for pattern in seq]
cs = db.from_delayed(dlyds)
cs.visualize()
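Each `visualize()` call draws a task graph. In dask, a graph is a plain dict mapping keys to values or tasks, where a task is a tuple whose first element is a callable and whose remaining elements are arguments (which may name other keys). The toy evaluator below is a simplified sketch of that idea, not dask's scheduler: it builds one load task per file plus a final concatenation task, loosely mirroring the second graph. `fake_load`, `concat` and `get` are hypothetical names, with `fake_load` standing in for `iris.load`.

```python
def fake_load(path):
    """Hypothetical stand-in for iris.load: return a list of 'cubes' for one file."""
    return ['cube from ' + path]


def concat(*parts):
    """Flatten per-file lists into one list, as computing a bag does."""
    return [item for part in parts for item in part]


def get(graph, key):
    """Recursively evaluate ``key`` in a dask-style graph dict (toy: no caching)."""
    value = graph[key]
    if isinstance(value, tuple) and value and callable(value[0]):
        func, args = value[0], value[1:]
        # Arguments that name other keys are evaluated first; others pass through.
        return func(*[get(graph, a) if isinstance(a, str) and a in graph else a
                      for a in args])
    return value


files = ['a.pp', 'b.pp', 'c.pp']
graph = {'load-%d' % i: (fake_load, f) for i, f in enumerate(files)}
graph['result'] = (concat,) + tuple('load-%d' % i for i in range(len(files)))

print(get(graph, 'result'))
# ['cube from a.pp', 'cube from b.pp', 'cube from c.pp']
```

The `load-*` tasks have no dependencies on each other, which is what lets a scheduler run them in parallel; only `result` has to wait for all of them.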

### Runner functions

In [15]:
@delayed
def loader(fn):
    """Delayed load of the single file at the full path `fn`."""
    return iris.load(fn)

def direct_load(fp, pattern):
    """Load datasets at the filepath `fp` using Iris."""
    return iris.load(os.path.join(fp, pattern))

def delay_wrapper(fp, pattern):
    """Load via a dask bag built from one delayed load of the whole `pattern`."""
    dlyd = delayed(iris.load)(os.path.join(fp, pattern))
    cs = db.from_delayed(dlyd)
    return iris.cube.CubeList(cs.compute())

def delay_wrapper_v2(fp, seq):
    """Load via a dask bag built from one delayed `iris.load` per file in `seq`."""
    dlyds = [delayed(iris.load)(os.path.join(fp, fn)) for fn in seq]
    cs = db.from_delayed(dlyds)
    return iris.cube.CubeList(cs.compute())

def delay_wrapper_v3(fp, seq):
    """As `delay_wrapper_v2`, but using the decorated `loader` function."""
    dlyds = [loader(os.path.join(fp, fn)) for fn in seq]
    cs = db.from_delayed(dlyds)
    return iris.cube.CubeList(cs.compute())
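`delay_wrapper_v2` and `delay_wrapper_v3` create one delayed load per file; computing the bag runs those loads through the configured scheduler and gathers every file's cubes into one list. For comparison, the sketch below expresses the same one-task-per-file pattern with the standard library's `concurrent.futures` instead of dask. `load_all` and `fake_load` are hypothetical names, with `fake_load` standing in for `iris.load`.

```python
import os
from concurrent.futures import ThreadPoolExecutor


def fake_load(path):
    """Hypothetical stand-in for iris.load: one 'cube' per file."""
    return ['cube from ' + os.path.basename(path)]


def load_all(fp, seq, load=fake_load, max_workers=8):
    """Load each file in `seq` concurrently, then flatten into a single list.

    ``executor.map`` yields results in input order, so the output order
    matches `seq` regardless of which load finishes first.
    """
    paths = [os.path.join(fp, fn) for fn in seq]
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        per_file = list(executor.map(load, paths))
    return [cube for cubes in per_file for cube in cubes]


print(load_all('/data', ['x.pp', 'y.pp']))
# ['cube from x.pp', 'cube from y.pp']
```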

## Test!

Run each loader on some sample data and print its runtime in seconds.

Using **sample PP data** at `/project/euro4_hindcast/WIND-ATLAS_EURO4-RERUN/2015/06/18Z`:

In [16]:
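import fnmatch

fp = '/project/euro4_hindcast/WIND-ATLAS_EURO4-RERUN/2015/06/18Z'
# fn = 'EURO4_2015060[1-3].pp'
fn = '*.pp'
# Keep only the files matching `fn`, so every loader reads the same data.
seq = sorted(fnmatch.filter(os.listdir(fp), fn))
reps = 3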

In [None]:
direct_vals = timer(direct_load, fp, fn)
print(direct_vals)

In [None]:
delay_vals = timer(delay_wrapper, fp, fn)
print(delay_vals)

In [17]:
delay_vals_v2 = timer(delay_wrapper_v2, fp, seq)
print(delay_vals_v2)

45.92821908


In [None]:
delay_vals_v3 = timer(delay_wrapper_v3, fp, seq)
print(delay_vals_v3)

In [None]:
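import matplotlib.pyplot as plt
# Repeat each timing `reps` times so that the boxplot shows a spread.
direct_load_vals_pp = [timer(direct_load, fp, fn) for _ in range(reps)]
delay_vals_v2_pp = [timer(delay_wrapper_v2, fp, seq) for _ in range(reps)]
fig = plt.figure(figsize=(9, 6))
plt.boxplot([direct_load_vals_pp, delay_vals_v2_pp],
            vert=True, labels=['direct', 'delay'])
plt.show()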
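The boxplot compares the spread of runtimes visually. A numerical summary can complement it: the hypothetical helper below reports the median runtime of each approach and the ratio of the two medians, where values above 1 mean the delayed approach is faster. The numbers in the usage line are placeholders, not measurements from this notebook.

```python
import statistics


def median_speedup(direct_times, delayed_times):
    """Return (median direct, median delayed, speedup) for two lists of runtimes."""
    direct_med = statistics.median(direct_times)
    delayed_med = statistics.median(delayed_times)
    return direct_med, delayed_med, direct_med / delayed_med


# Placeholder runtimes in seconds, for illustration only.
print(median_speedup([10.0, 12.0, 11.0], [5.0, 6.0, 4.0]))
# (11.0, 5.0, 2.2)
```

The median is used rather than the mean because a single slow run (for example, a cold filesystem cache) would otherwise dominate a handful of repetitions.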