In [70]:
def test(one, two):
    print(hex(id(one)))
    print(hex(id(two)))
    return (one, two)

args = ('one', 'two', )
print(hex(id(args[0])))
print(hex(id(args[1])))

_dsk = {
    ('out', 'out2'): (test, ) + args
}

_test = dask.threaded.get(_dsk, ('out', 'out2'))
print(hex(id(_test[0][0])))
print(hex(id(_test[0][1])))

0x7f749e92bfd0
0x7f749e959b48
0x7f749e92bfd0
0x7f749e959b48
0x7f750aa71d78
0x7f750aa8b788


In [194]:
import os

import dask
import numpy as np
import pandas as pd
import patsy
import six
import tables
from toolz import curry
import xarray as xr

from yatsm.config import validate_and_parse_configfile
from yatsm.io._api import get_reader, read_and_preprocess

os.environ['ROOT'] = '/home/ceholden/Documents/landsat_stack/p008r056_subset/images/'
config = validate_and_parse_configfile('/home/ceholden/Documents/yatsm/examples/topic_xarray.yaml')

readers = dict({name: get_reader(**cfg['reader']) for name, cfg in six.iteritems(config['data']['datasets'])})

if not os.path.exists('cache.nc'):
    dat = read_and_preprocess(config['data']['datasets'], readers, window=((0, 100), (0, 100)))
#     arr.to_netcdf('cache.nc')
else:
    print('Loading from cache')
    dat = xr.open_dataset('cache.nc').load()

bands = ['blue', 'green', 'red', 'nir', 'swir1', 'swir2', 'ndvi']

## Tasks

In [204]:
from time import sleep

@curry
def slowadd(x, y):
    """ Slow version of ``add`` to simulate work """
    sleep(1)
    print(x, y)
    return x + y

def slowsum(L):
    """ Slow version of ``sum`` to simulate work """
    sleep(0.5)
    return sum(L)

slowaddfive = slowadd(y=5)

data = [1, 2, 3]
A = [dask.delayed(slowadd)(a, 10) for a in data]
B = [dask.delayed(slowaddfive)(a) for a in A]

dask.delayed(slowsum)(B).compute()

(2, 10)
(1, 10)
(3, 10)
(12, 5)
(11, 5)
(13, 5)


51

In [470]:
TASKS = {
    'norm_diff': norm_diff,
    'CCDCesque': _CCDCesque,
}
exc_config = {
    'norm_diff': {
        'requires': {
            'data': ['nir', 'red'],
        },
        'provides': {
            'data': 'ndvi'
        }
    },
    'CCDCesque': {
        'requires': {
            'data': bands,
        },
        'provides': {
            'record': 'ccdc'
        },
        'config': {
            'fit': {
                'design': '1 + ordinal + np.cos(ordinal) + np.sin(ordinal)'
            }
        }
    }
}
exc = ['norm_diff', 'CCDCesque']


def pipe(exc, cfg):
    _pipe = []
    for _exc in exc:
        # Function
        f = TASKS[_exc]
        
        f_curried = curry(f)(
            provides=cfg[_exc]['provides'],
            requires=cfg[_exc]['requires'],
            config=cfg[_exc].get('config', {})
        )
                
        _pipe.append(f_curried)
    return _pipe

# Dask

In [469]:
def norm_diff(wrk, provides, requires, **config):
    one, two = requires['data']
    name = provides['data']
    
    arr = wrk['data']
    arr[name] = ((arr[one] - arr[two]) / (arr[one] + arr[two]))
    
    return wrk


# def _CCDCesque(data, cfg, design='1 + ordinal'):
def _CCDCesque(wrk, provides, requires, **config):
    bands = requires['data']
    arr = wrk['data'][bands].dropna('time', how='any')
    
    model = CCDCesque(**config.get('init', {}))
    model.py, model.px = arr.y, arr.x
    
    ordinal = arr.indexes['time'].map(lambda x: x.toordinal())
    design = config.get('fit', {}).get('design', '1 + ordinal')
    X = patsy.dmatrix(design, data=arr, eval_env=patsy.EvalEnvironment.capture())
    
    wrk['record'][provides['record']] = model.fit(X, arr.to_array('band'), ordinal)
    return wrk

In [548]:
def process(y, x):
    print('Working on: {}/{}'.format(y.values, x.values))
    dsk = {
        'wrk': {'record': {}, 'data': dat.sel(x=x, y=y)}
    }

    for name, task, dep in zip(exc, pipe(exc, exc_config), ['wrk'] + exc):
        dsk[name] = (task, dep)
    return dask.get(dsk, exc[-1])

out = [dask.delayed(process)(y, x) for x in dat.x for y in dat.y]

In [552]:
answer = dask.compute(*out, get=dask.async.get_sync)

Working on: 531285.0/704865.0
Working on: 530025.0/704445.0
Working on: 532215.0/704745.0
Working on: 531705.0/703875.0
Working on: 529695.0/704925.0
Working on: 530835.0/705255.0
Working on: 532335.0/706635.0
Working on: 531885.0/704385.0
Working on: 532125.0/704505.0
Working on: 531735.0/706605.0
Working on: 531195.0/706635.0
Working on: 531225.0/706305.0
Working on: 531735.0/704265.0
Working on: 532635.0/706515.0
Working on: 530505.0/706455.0
Working on: 531495.0/704865.0
Working on: 531345.0/706155.0
Working on: 530265.0/704505.0
Working on: 532605.0/703875.0
Working on: 529935.0/704235.0
Working on: 529815.0/706305.0
Working on: 531135.0/705765.0
Working on: 531915.0/704985.0
Working on: 530745.0/704385.0
Working on: 530685.0/704415.0
Working on: 529965.0/703935.0
Working on: 529905.0/705405.0
Working on: 532335.0/705285.0
Working on: 531795.0/706575.0
Working on: 531915.0/705885.0
Working on: 530235.0/704355.0
Working on: 531975.0/704595.0
Working on: 529695.0/704565.0
Working on

In [164]:
def pick(arr, y, x):
    return arr.isel(y=y, x=x)


def accumulate(rec):
    out = []
    for r in rec:
        out.extend(r)
    return np.asarray(out)

In [583]:
def start(dsk):
    print('start')
    import pdb; pdb.set_trace()
    
def start_state(dsk, state):
    print('start_state')
    import pdb; pdb.set_trace()
    
def pretask(key, dsk, state):
    print('pretask')
    import pdb; pdb.set_trace()

def posttask(key, result, dsk, state, worker_id):
    print('posttask')
    import pdb; pdb.set_trace()
    

cb = dask.callbacks.Callback(start=start, start_state=start_state, pretask=pretask, posttask=posttask)

In [585]:
def f(x, y):
    return x + y

d = {'hi': dat}

dsk = {
    'x': 5,
    'y': 10,
    'arr': dat,
    'd': d,
    ('z', 'w'): (f, 'd', 'x'),
    'q': (f, ('z', 'w'), 1)
}

# out = dask.threaded.get(dsk, 'q')

def printkeys(key, dask, state):
    print("Computing: {0}!".format(repr(key)))

# with dask.callbacks.Callback(pretask=printkeys):
with cb:
    out = dask.threaded.get(dsk, 'q')

start
--Return--
> <ipython-input-583-662ed486ffb9>(3)start()->None
-> import pdb; pdb.set_trace()
(Pdb) c
start_state
--Return--
> <ipython-input-583-662ed486ffb9>(7)start_state()->None
-> import pdb; pdb.set_trace()
(Pdb) state
{'released': set([]), 'waiting': {'q': set([('z', 'w')])}, 'dependencies': {'q': set([('z', 'w')]), 'x': set([]), 'd': set([]), ('z', 'w'): set(['x', 'd'])}, 'waiting_data': {'x': set([('z', 'w')]), 'd': set([('z', 'w')]), ('z', 'w'): set(['q'])}, 'ready': [('z', 'w')], 'dependents': {'q': set([]), 'x': set([('z', 'w')]), 'd': set([('z', 'w')]), ('z', 'w'): set(['q'])}, 'cache': {'x': 5, 'd': {'hi': <xarray.Dataset>
Dimensions:      (time: 225, x: 100, y: 100)
Coordinates:
  * time         (time) datetime64[ns] 1997-08-30 1997-11-18 1997-12-20 ...
  * y            (y) float64 5.297e+05 5.297e+05 5.298e+05 5.298e+05 ...
  * x            (x) float64 7.038e+05 7.039e+05 7.039e+05 7.039e+05 ...
Data variables:
    hh           (time, y, x) float64 nan nan nan nan 

TypeError: unsupported operand type(s) for +: 'dict' and 'int'

Traceback
---------
  File "/games/conda/conda2/envs/yatsm/lib/python2.7/site-packages/dask/async.py", line 267, in execute_task
    result = _execute_task(task, data)
  File "/games/conda/conda2/envs/yatsm/lib/python2.7/site-packages/dask/async.py", line 249, in _execute_task
    return func(*args2)
  File "<ipython-input-585-cf2b022482da>", line 2, in f
    return x + y


In [175]:
dsk = {
    'x': 5,
    'y': 10,
    'd': d,
    ('z', 'w'): (f, 'd', 'x', 'y'),
    'q': (f, ('z', 'w'), 1, 1)
}

10 loops, best of 3: 71.9 ms per loop


In [None]:
%timeit dask.threaded.get(dsk, 'record-ccdc')

In [176]:
%%timeit

_d = norm_diff(dat, 'ndvi', ('nir', 'red'))
_d = pick(_d, 0, 0)
_CCDCesque(select(_d, bands), {})

10 loops, best of 3: 74.4 ms per loop


In [None]:
for y in range(5):
    for x in range(5):
        