In [None]:
import numba
import numpy as np
import cupy

def gather_from_detector():
    """Return one synthetic detector frame.

    Stands in for a real acquisition step: the frame is a 1000 x 1000 CuPy
    array filled by ``cupy.random.random``.
    """
    frame_shape = (1000, 1000)
    return cupy.random.random(frame_shape)


@numba.cuda.jit
def smooth_gpu(x, out):
    """CUDA kernel: 3x3 mean filter over the interior of a 2-D array.

    Each thread handles the single element at its 2-D grid position and
    writes the mean of that element and its eight neighbours from ``x`` into
    ``out``.

    Parameters
    ----------
    x : 2-D device array
        Input image (unpacked as ``n, m = x.shape``).
    out : 2-D device array
        Output image, indexed with the same ``[i, j]`` as ``x``. Only interior
        elements are written; the outermost rows and columns are left as the
        caller provided them.

    Must be launched with an explicit 2-D configuration, e.g.
    ``smooth_gpu[blocks_per_grid, threads_per_block](x, out)``.
    """
    i, j = numba.cuda.grid(2)
    n, m = x.shape
    # The guard skips the border (no full 3x3 neighbourhood there) and any
    # surplus threads that fall outside the array when the grid overshoots.
    if 1 <= i < n - 1 and 1 <= j < m - 1:
        # True division: floor division (`//`) would floor the mean, which
        # turns every value into 0.0 for float images in [0, 1).
        # NOTE(review): with integer arrays the quotient is converted when
        # stored into `out` -- confirm that is acceptable for such callers.
        out[i, j] = (x[i - 1, j - 1] + x[i - 1, j] + x[i - 1, j + 1] +
                     x[i    , j - 1] + x[i    , j] + x[i    , j + 1] +
                     x[i + 1, j - 1] + x[i + 1, j] + x[i + 1, j + 1]) / 9


def smooth(x, threads_per_block=(16, 16)):
    """Apply the ``smooth_gpu`` 3x3 mean filter to a 2-D CuPy array.

    Parameters
    ----------
    x : 2-D cupy array
        Input image.
    threads_per_block : tuple of two ints, optional
        CUDA block shape used for the kernel launch.

    Returns
    -------
    cupy array
        New array with the same shape and dtype as ``x``. Interior elements
        hold the filtered values; border elements, which the kernel does not
        write, keep the corresponding values of ``x``.
    """
    # Start from a copy rather than uninitialised memory so that the border
    # elements the kernel skips have well-defined values.
    out = x.copy()

    n, m = x.shape
    if n == 0 or m == 0:
        # Nothing to filter, and a zero-sized grid is not a valid launch.
        return out

    # Ceiling division: enough blocks along each axis to cover the whole
    # array; the kernel's bounds check discards the surplus threads.
    tx, ty = threads_per_block
    blocks_per_grid = ((n + tx - 1) // tx, (m + ty - 1) // ty)

    # A Numba CUDA kernel has to be given its launch configuration in square
    # brackets; calling it bare does not launch a grid over the array.
    smooth_gpu[blocks_per_grid, (tx, ty)](x, out)
    return out


def save(x, filename):
    """Placeholder for persisting ``x`` under ``filename``.

    Currently a no-op: both arguments are ignored and ``None`` is returned.
    """
    return None

In [None]:
# The module is `dask.distributed`; the previous spelling (`distributded`)
# raised ModuleNotFoundError before anything else in this cell could run.
from dask.distributed import Client, fire_and_forget
from dask_cuda import LocalCUDACluster

# Start a local CUDA cluster with default settings and connect a client to
# it; the functions defined above are submitted through this client.
cluster = LocalCUDACluster()
client = Client(cluster)
client

In [None]:
# Submit 100 acquire -> smooth -> FFT -> save pipelines to the cluster.
# Each stage is given the previous stage's future as its argument, so `img`
# is rebound to the newest future at every step.
for i in range(100):
    # pure=False: per the dask.distributed docs this marks the call as impure
    # so repeated submissions are not collapsed into a single task.
    img = client.submit(gather_from_detector, pure=False)
    img = client.submit(smooth, img)
    img = client.submit(np.fft.fft2, img)
    future = client.submit(save, img, "file-" + str(i) + "-.dat")
    # The result is never gathered here; fire_and_forget is used so the task
    # is not tied to this `future` reference, which is overwritten on the
    # next iteration.
    fire_and_forget(future)

In [None]:
import asyncio

async def gather(start=0):
    """Continuously submit acquire -> smooth -> FFT -> save pipelines.

    Runs until the enclosing task is cancelled. Each iteration submits one
    pipeline through the module-level ``client`` and then yields to the event
    loop for 10 ms.

    Parameters
    ----------
    start : int, optional
        Index used in the first output file name; it is incremented by one
        for every submitted frame.
    """
    # Local frame counter. The loop previously read `i` without defining it,
    # so it depended on a global left over from another cell (NameError on a
    # fresh kernel, or the same file name for every frame).
    i = start
    while True:
        img = client.submit(gather_from_detector, pure=False)
        img = client.submit(smooth, img)
        img = client.submit(np.fft.fft2, img)
        future = client.submit(save, img, "file-" + str(i) + "-.dat")
        fire_and_forget(future)
        i += 1
        # Hand control back to the event loop between submissions.
        await asyncio.sleep(0.01)
        
# Schedule gather() as a background task instead of awaiting it, so this cell
# returns while the coroutine keeps submitting work.
# NOTE(review): relies on an event loop already running in the kernel -- confirm
# for the Jupyter/IPython version in use.
asyncio.ensure_future(gather())

In [None]:
# Ask the cluster to scale to 6 workers.
# NOTE(review): presumably requires at least 6 GPUs visible to
# LocalCUDACluster -- confirm on the target machine.
cluster.scale(6)