## Efficient calculations with Dask

Inside cubes, Iris already supports lazy data
operations in various statistical functions and cube arithmetic.


In [None]:
# basic winds code (NOT FUNCTIONAL)
# mean direction, mean + std-dev of windspeed
# NOTE: sketch only -- `u` and `v` are not defined anywhere in this notebook
# (presumably eastward / northward wind cubes sharing a 'time' coordinate), and
# `da` (dask.array) is only imported in a later cell.
wind_speed = (u*u + v*v) ** 0.5
time_mean_windspd = wind_speed.collapsed('time', iris.analysis.MEAN)
time_stdev_windspd = wind_speed.collapsed('time', iris.analysis.STD_DEV)
wind_dirs = u.copy()
wind_dirs.rename('wind_direction')
# dask.array follows NumPy naming: the two-argument arctangent is `arctan2`.
wind_dirs.data = da.arctan2(u.lazy_data(), v.lazy_data())
wind_dirs.units = 'radians'
# NOTE(review): a plain arithmetic mean of angles is not a circular mean --
# confirm that this is acceptable for the intended use.
time_mean_wind_dirs = wind_dirs.collapsed('time', iris.analysis.MEAN)

In [None]:
# basic winds code (NOT FUNCTIONAL)

In [14]:
import iris
from iris import sample_data_path
import glob

'/home/h05/itpp/git/iris-test-data/test_data/'

In [89]:
# Collect every '*.pp' file in the 'UM' sample-data directory and pick the first
# match as the single-file example.
filepaths = glob.glob(sample_data_path('UM', '*.pp'))
filepath = filepaths[0]
# Parenthesised form prints identically on Python 2 and is valid on Python 3.
print(filepath)

/home/h05/itpp/git/iris-sample-data/iris_sample_data/sample_data/UM/northward_sea_ice_velocity.1890.01.01.00.00.pp


In [27]:
# Load the chosen file as a single cube and show its summary.
cube = iris.load_cube(filepath)
print(cube)

northward_sea_ice_velocity / (m s-1) (latitude: 215; longitude: 360)
     Dimension coordinates:
          latitude                            x               -
          longitude                           -               x
     Scalar coordinates:
          forecast_period: 262440.0 hours, bound=(262080.0, 262800.0) hours
          forecast_reference_time: 1859-09-01 00:00:00
          time: 1890-01-16 00:00:00, bound=(1890-01-01 00:00:00, 1890-02-01 00:00:00)
     Attributes:
          STASH: m02s00i149
          source: Data from Met Office Unified Model
          um_version: 6.6
     Cell methods:
          mean: time (1 hour)


In [28]:
# Check whether the freshly loaded cube still holds lazy (unrealised) data.
print(cube.has_lazy_data())

True


In [29]:
# Show what kind of array object the cube currently holds.
print(type(cube.core_data()))

<class 'dask.array.core.Array'>


In [50]:
# Define a wrapper to show us when the actual data is fetched.
import dask
import dask.array as da

class Arraylike_AccessFlagged(object):
    """Array-like wrapper that prints a message each time it is indexed.

    Exposes the ``dtype``, ``shape`` and ``ndim`` of the wrapped array and
    forwards ``__getitem__`` to it, so that every real data access made
    through the wrapper becomes visible on stdout.

    Args:
        array: the object to wrap; must provide ``dtype``, ``shape`` and
            support indexing.
        name: label used in the printed message to identify this wrapper.
    """
    def __init__(self, array, name='A'):
        self.dtype = array.dtype
        self.shape = array.shape
        # `dask.array.from_array` documents `ndim` as a required attribute of
        # its input; derive it from the shape so any shaped object can be wrapped.
        self.ndim = len(array.shape)
        self._array = array
        self._name = name

    def __getitem__(self, keys):
        """Print which keys were requested, then return the indexed data."""
        # Single-argument print(...) behaves identically on Python 2 and 3.
        print("\nArray fetch {}[{}]".format(self._name, keys))
        return self._array[keys]

def showaccess_lazy(data, chunks=None):
    """Return a dask array over ``data`` that reports each underlying fetch.

    ``data`` is wrapped in an ``Arraylike_AccessFlagged`` and handed to
    ``da.from_array``. When ``chunks`` is None the full shape of ``data`` is
    used as the chunk specification.
    """
    chunk_spec = data.shape if chunks is None else chunks
    flagged = Arraylike_AccessFlagged(data)
    return da.from_array(flagged, chunk_spec)

In [51]:
import numpy as np
# Build a small (2, 4) real array and wrap it lazily with one row per chunk.
real_test_data = np.arange(8.).reshape((2, 4))
test_lazy_data = showaccess_lazy(real_test_data, chunks=(1, 4))
print(test_lazy_data)

dask.array<array, shape=(2, 4), dtype=float64, chunksize=(1, 4)>


In [52]:
# This just shows us when data fetches occur.
# Computing the lazy array indexes the wrapper once per chunk; with (1, 4)
# chunks over a (2, 4) array that is two fetches, each announced by the wrapper.
test_lazy_data.compute()


Array fetch A[(slice(0, 1, None), slice(0, 4, None))]

Array fetch A[(slice(1, 2, None), slice(0, 4, None))]


array([[ 0.,  1.,  2.,  3.],
       [ 4.,  5.,  6.,  7.]])

In [61]:
# Now make a cube, derive a statistical calculation, and show what happens when we compute results.
# The copy's data is the access-flagging lazy array, chunked as one latitude row
# (1 x 360) per chunk.
# NOTE(review): `cube.data` presumably realises the source cube's data before it is
# wrapped -- confirm this is intended; a later cell passes `lazy_data()` instead.
test_cube = cube.copy(data=showaccess_lazy(cube.data, chunks=(1, 360)))


In [62]:
# Display the cube's one-line representation; the output shows no fetch messages.
test_cube

<iris 'Cube' of northward_sea_ice_velocity / (m s-1) (latitude: 215; longitude: 360)>

In [63]:
# Print the full cube summary.
print(test_cube)

northward_sea_ice_velocity / (m s-1) (latitude: 215; longitude: 360)
     Dimension coordinates:
          latitude                            x               -
          longitude                           -               x
     Scalar coordinates:
          forecast_period: 262440.0 hours, bound=(262080.0, 262800.0) hours
          forecast_reference_time: 1859-09-01 00:00:00
          time: 1890-01-16 00:00:00, bound=(1890-01-01 00:00:00, 1890-02-01 00:00:00)
     Attributes:
          STASH: m02s00i149
          source: Data from Met Office Unified Model
          um_version: 6.6
     Cell methods:
          mean: time (1 hour)


In [64]:
# `.data` is evaluated on the copied cube before the [:2, :4] index is applied;
# the output shows fetches for rows well beyond the two that are kept.
# Presumably the copy is used so that `test_cube` itself stays lazy -- TODO confirm.
test_cube.copy().data[:2, :4]


Array fetch A[(slice(0, 1, None), slice(0, 360, None))]
Array fetch A[(slice(1, 2, None), slice(0, 360, None))]


Array fetch A[(slice(2, 3, None), slice(0, 360, None))]
Array fetch A[(slice(3, 4, None), slice(0, 360, None))]


Array fetch A[(slice(4, 5, None), slice(0, 360, None))]
Array fetch A[(slice(5, 6, None), slice(0, 360, None))]
Array fetch A[(slice(6, 7, None), slice(0, 360, None))]

 

Array fetch A[(slice(7, 8, None), slice(0, 360, None))]
 
Array fetch A[(slice(8, 9, None), slice(0, 360, None))]

Array fetch A[(slice(9, 10, None), slice(0, 360, None))]

Array fetch A[(slice(10, 11, None), slice(0, 360, None))]
Array fetch A[(slice(11, 12, None), slice(0, 360, None))]


Array fetch A[(slice(12, 13, None), slice(0, 360, None))]

Array fetch A[(slice(13, 14, None), slice(0, 360, None))]
Array fetch A[(slice(14, 15, None), slice(0, 360, None))]

 
Array fetch A[(slice(15, 16, None), slice(0, 360, None))]
 
Array fetch A[(slice(16, 17, None), slice(0, 360, None))]
 
Array fetc

array([[ 0.,  0.,  0.,  0.],
       [ 0.,  0.,  0.,  0.]], dtype=float32)

In [74]:
# Define the mean over longitude; this cell shows no fetch output.
test_zonal_average = test_cube.collapsed(['longitude'], iris.analysis.MEAN)

In [75]:
# Slice the collapsed cube first, then take `.data` of a copy of that slice:
# the output shows only the three corresponding source rows being fetched.
test_zonal_average[:3].copy().data


Array fetch A[(slice(0, 1, None), slice(0, 360, None))]

Array fetch A[(slice(1, 2, None), slice(0, 360, None))]

Array fetch A[(slice(2, 3, None), slice(0, 360, None))]


array([ 0.,  0.,  0.], dtype=float32)

In [76]:
# Define the standard deviation over longitude; this cell shows no fetch output.
test_zonal_stdev = test_cube.collapsed(['longitude'], iris.analysis.STD_DEV)

In [77]:
# Same pattern for the standard deviation: slice, copy, then take `.data`.
# The output shows only the two corresponding source rows being fetched.
test_zonal_stdev[:2].copy().data


Array fetch A[(slice(0, 1, None), slice(0, 360, None))]
 
Array fetch A[(slice(1, 2, None), slice(0, 360, None))]


array([ 0.,  0.], dtype=float32)

In [78]:
# Check whether the wrapped source cube is still lazy after the accesses above.
test_cube.has_lazy_data()

True

In [90]:
# Rebind `test_cube` to a cube loaded from the whole list of sample files (no
# access-flagging wrapper this time) and redefine both zonal statistics on it.
test_cube = iris.load_cube(filepaths)
test_zonal_average = test_cube.collapsed(['longitude'], iris.analysis.MEAN)
test_zonal_stdev = test_cube.collapsed(['longitude'], iris.analysis.STD_DEV)

In [91]:
%%timeit
# Time fetching the zonal mean's data on its own, via a copy of the cube
# (presumably so `test_zonal_average` itself is left unrealised -- TODO confirm).
av = test_zonal_average.copy().data

1 loop, best of 3: 375 ms per loop


In [92]:
%%timeit
# Time fetching the zonal standard deviation's data on its own, via a copy of the cube.
stdev = test_zonal_stdev.copy().data

1 loop, best of 3: 445 ms per loop


In [93]:
%%timeit
# Time computing both statistics together in a single `da.compute` call on their lazy arrays.
av, stdev = da.compute(test_zonal_average.lazy_data(), test_zonal_stdev.lazy_data())

1 loop, best of 3: 601 ms per loop


In [133]:
# BECAUSE: rebuild the multi-file cube on top of an access-flagging array held as
# one whole-array chunk, so the cells below print whenever the source is fetched.
loaded_cube = iris.load_cube(filepaths)
flagged_data = showaccess_lazy(loaded_cube.lazy_data(), chunks=loaded_cube.shape)
test_cube = loaded_cube.copy(data=flagged_data)
collapse_coords = ['longitude']
test_zonal_average = test_cube.collapsed(collapse_coords, iris.analysis.MEAN)
test_zonal_stdev = test_cube.collapsed(collapse_coords, iris.analysis.STD_DEV)

In [134]:
%%timeit -n 1 -r 1
av = test_zonal_average.copy().data
print av[100, 20:23]


Array fetch A[(slice(0, 120, None), slice(0, 215, None), slice(0, 360, None))]
[ 0.01491087  0.01489484  0.01378156]
1 loop, best of 1: 307 ms per loop


In [135]:
%%timeit -n 1 -r 1
stdev = test_zonal_stdev.copy().data
print stdev[100, 20:23]


Array fetch A[(slice(0, 120, None), slice(0, 215, None), slice(0, 360, None))]
[ 0.03805856  0.04456573  0.04579239]
1 loop, best of 1: 449 ms per loop


In [136]:
%%timeit -n 1 -r 1
av, stdev = da.compute(test_zonal_average.lazy_data(), test_zonal_stdev.lazy_data())
print av[100, 20:23]
print stdev[100, 20:23]


Array fetch A[(slice(0, 120, None), slice(0, 215, None), slice(0, 360, None))]
[ 0.01491087  0.01489484  0.01378156]
[ 0.03805856  0.04456573  0.04579239]
1 loop, best of 1: 468 ms per loop



Array fetch A[(slice(0, 120, None), slice(0, 215, None), slice(0, 360, None))]
[0 0 0 0]
[12 13 14 15]


array([ 0.01491087,  0.01489484,  0.01378156], dtype=float32)