# Dask Futures

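Dask provides a real-time task framework that extends Python's `concurrent.futures` interface.

This interface is eager rather than lazy: work starts as soon as it is submitted, which gives more flexibility when the computation may evolve over time.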
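As a minimal sketch of what "eager" means, the example below uses only the standard-library `concurrent.futures` interface that Dask extends, not Dask itself. The function `slow_square` and the threshold `5` are hypothetical names chosen for illustration. Work starts at `submit`, and the program decides what to submit next based on an earlier result.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_square(x):
    # Hypothetical stand-in for an expensive task.
    time.sleep(0.2)
    return x * x

with ThreadPoolExecutor(max_workers=2) as pool:
    # submit() starts the work immediately and returns a future right away.
    first = pool.submit(slow_square, 3)

    # result() blocks until the task has finished.
    first_value = first.result()

    # Because execution is eager, the next step can depend on what came back.
    if first_value > 5:
        second = pool.submit(slow_square, first_value)
        final_value = second.result()
    else:
        final_value = first_value

print(first_value, final_value)
```

A lazy interface would require the whole graph to be described before anything runs; here each decision is made while earlier results are already available.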

In [1]:
from dask.distributed import Client
client = Client(n_workers=4, threads_per_worker=4)
client

Client: Scheduler: tcp://127.0.0.1:60400, Dashboard: http://127.0.0.1:8787/status
Cluster: Workers: 4, Cores: 16, Memory: 16.00 GiB


## Basic usage

In [2]:
import time
import random

def inc(x):
    time.sleep(random.random())
    return x + 1

def double(x):
    time.sleep(random.random())
    return 2 * x

def add(x, y):
    time.sleep(random.random())
    return x + y

In [3]:
# Run locally
inc(1)
double(2)

4

In [4]:
# Run remotely on the Dask cluster
future = client.submit(int, 1)
future

In [5]:
future

In [6]:
future.result()

1

## Chaining dependent computations

In [7]:
# add depends on the results of inc and double; their futures are passed in directly as arguments
x = client.submit(inc, 1)
y = client.submit(double, 2)
z = client.submit(add, x, y)
z

In [8]:
z.result()

6
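For contrast, here is a rough sketch of the same chain written against the plain standard-library executor. A `ThreadPoolExecutor` passes arguments through untouched, so the futures have to be resolved by hand before `add` is submitted. With Dask, `client.submit` accepts futures as arguments and runs `add` once both inputs are ready. The `time.sleep` calls are left out here to keep the sketch short.

```python
from concurrent.futures import ThreadPoolExecutor

def inc(x):
    return x + 1

def double(x):
    return 2 * x

def add(x, y):
    return x + y

with ThreadPoolExecutor(max_workers=4) as pool:
    x = pool.submit(inc, 1)
    y = pool.submit(double, 2)

    # The standard-library executor does not unwrap futures passed as
    # arguments, so the caller blocks on each input before submitting add.
    z = pool.submit(add, x.result(), y.result())
    chained = z.result()

print(chained)
```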

## Submitting many tasks at once

In [9]:
%%time

zs = []

for i in range(256):
    x = client.submit(inc, i)     # x = inc(i)
    y = client.submit(double, x)  # y = double(x)
    z = client.submit(add, x, y)  # z = add(x, y)
    zs.append(z)

CPU times: user 329 ms, sys: 36.9 ms, total: 366 ms
Wall time: 349 ms


In [10]:
total = client.submit(sum, zs)

In [11]:
from dask.distributed import wait, progress

In [12]:
# Show a non-blocking progress bar
progress(total)

VBox()

In [14]:
total.result()

98688
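The total can be checked without a cluster. Each iteration computes `inc(i) + double(inc(i))`, which is `3 * (i + 1)`, so the sum over `i = 0..255` is `3 * (1 + 2 + ... + 256)`. The sketch below recomputes it serially with sleep-free copies of the three functions.

```python
def inc(x):
    return x + 1

def double(x):
    return 2 * x

def add(x, y):
    return x + y

# Serial version of the loop: z = add(inc(i), double(inc(i))) for each i.
expected = sum(add(inc(i), double(inc(i))) for i in range(256))

# Closed form: 3 * (1 + 2 + ... + 256) = 3 * 256 * 257 / 2.
closed_form = 3 * 256 * 257 // 2

print(expected, closed_form)
```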

## Tree summation

In [16]:
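# Pairwise (tree) reduction: each pass adds neighboring futures, halving the list.
# Pairing L[i] with L[i + 1] assumes an even length at every level; len(zs) == 256 is a power of two.
L = zs
while len(L) > 1:
    new_L = []
    for i in range(0, len(L), 2):
        future = client.submit(add, L[i], L[i + 1])  # add neighbors
        new_L.append(future)
    L = new_L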
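The loop above relies on the list length being a power of two. A possible generalization, sketched below, carries an unpaired last element over to the next level. The helper `tree_reduce` and the stand-in `call_now` are hypothetical names, not part of Dask; `submit` can be any callable with the shape `submit(fn, *args)`.

```python
import operator

def tree_reduce(submit, combine, items):
    """Combine items pairwise, level by level, until one value remains."""
    level = list(items)
    if not level:
        raise ValueError("tree_reduce needs at least one item")
    while len(level) > 1:
        # Pair up neighbors: (0, 1), (2, 3), ...
        next_level = [
            submit(combine, level[i], level[i + 1])
            for i in range(0, len(level) - 1, 2)
        ]
        # An odd element out is carried to the next level unchanged.
        if len(level) % 2 == 1:
            next_level.append(level[-1])
        level = next_level
    return level[0]

def call_now(fn, *args):
    # Local stand-in for a submit function: runs the call immediately.
    return fn(*args)

# Seven items, so the odd-length branch is exercised.
result = tree_reduce(call_now, operator.add, range(1, 8))
print(result)
```

With a running client, the same helper should work as `tree_reduce(client.submit, add, zs).result()`, since `client.submit` accepts futures as arguments.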

In [17]:
del zs, L, future, new_L, x, y

In [18]:
client.shutdown()

distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
asyncio.exceptions.CancelledError
