In [None]:
import numpy as np
import pandas as pd
import pyarrow as pa
import dask
import distributed
from distributed import Client
from cytoolz import partial
import streamz
import asyncio
import time

In [None]:
def generate_data(total_size, ncols):
    """Build a random float64 DataFrame of roughly ``total_size`` bytes.

    The row count is ``total_size / ncols / 8`` (8 = float64 itemsize),
    truncated toward zero. Columns are named ``c0 .. c{ncols-1}``. Each
    column is filled with standard-normal draws from NumPy's global RNG,
    one column at a time in column order.
    """
    bytes_per_value = np.dtype("float64").itemsize
    rows = int(total_size / ncols / bytes_per_value)
    columns = {}
    for idx in range(ncols):
        columns["c" + str(idx)] = np.random.randn(rows)
    return pd.DataFrame(columns)

In [None]:
# In-process (threaded, processes=False) Dask cluster with its dashboard on port 9500.
# `dashboard_address` replaces the deprecated/removed `diagnostics_port` keyword.
# NOTE(review): binding to 0.0.0.0 exposes the dashboard on every network interface.
local_client = Client(processes=False, dashboard_address="0.0.0.0:9500")

In [None]:
# Display the client's rich repr as a quick check that the local cluster is up.
local_client

In [None]:
async def mock_stream_async(total_size, ncols, num, stream=None):
    """Asynchronously emit the same synthetic DataFrame triple ``num`` times.

    One DataFrame is generated up front with ``generate_data(total_size, ncols)``.
    The tuple ``(df, df, df)`` is then emitted repeatedly.

    Parameters
    ----------
    total_size : int
        Approximate size in bytes of the generated float64 DataFrame.
    ncols : int
        Number of columns in the generated DataFrame.
    num : int
        Number of emissions.
    stream : streamz.Stream, optional
        Target stream. Defaults to the module-level ``source``, looked up at call
        time, so it is whichever ``source`` the notebook defined most recently.
    """
    import inspect

    if stream is None:
        stream = source
    df = generate_data(total_size, ncols)
    payload = (df, df, df)
    for _ in range(num):
        result = stream.emit(payload)
        # An asynchronous stream hands back an awaitable; awaiting it applies
        # back-pressure (e.g. from rate_limit). A synchronous stream may return
        # None, and `await None` would raise TypeError.
        if inspect.isawaitable(result):
            await result


def mock_stream(total_size, ncols, num, stream=None):
    """Synchronously emit the same synthetic DataFrame triple ``num`` times.

    One DataFrame is generated up front with ``generate_data(total_size, ncols)``.
    The tuple ``(df, df, df)`` is then emitted repeatedly.

    Parameters
    ----------
    total_size : int
        Approximate size in bytes of the generated float64 DataFrame.
    ncols : int
        Number of columns in the generated DataFrame.
    num : int
        Number of emissions.
    stream : streamz.Stream, optional
        Target stream. Defaults to the module-level ``source``, looked up at call
        time, so it is whichever ``source`` the notebook defined most recently.
    """
    if stream is None:
        stream = source
    df = generate_data(total_size, ncols)
    payload = (df, df, df)
    for _ in range(num):
        stream.emit(payload)


def pandas_to_arrow(dfs, sinks, writers):
    """Append each DataFrame in ``dfs`` to its own in-memory Arrow IPC stream.

    A frame's position in ``dfs`` selects its stream: position ``i`` uses
    ``sinks[i]`` and ``writers[i]``. Both dicts are filled lazily and mutated in
    place. The first batch seen for a position creates a
    ``pa.BufferOutputStream`` and a ``pa.RecordBatchStreamWriter`` whose schema
    is taken from that batch.

    NOTE(review): writers are never closed here. Presumably later frames for a
    position must match the first frame's schema; confirm this for the writer.

    Parameters
    ----------
    dfs : iterable of pandas.DataFrame
    sinks : dict
        position -> pa.BufferOutputStream
    writers : dict
        position -> pa.RecordBatchStreamWriter
    """
    for position, frame in enumerate(dfs):
        record_batch = pa.RecordBatch.from_pandas(frame)
        if position not in writers:
            sink = pa.BufferOutputStream()
            sinks[position] = sink
            writers[position] = pa.RecordBatchStreamWriter(sink, record_batch.schema)
        writers[position].write_batch(record_batch)


# Asynchronous source: when driven from a coroutine, `emit` returns an awaitable,
# which `mock_stream_async` awaits so the rate limit below applies back-pressure.
# A synchronous source would block in `emit` and return None instead.
source = streamz.Stream(asynchronous=True)

# Position in each emitted tuple -> Arrow sink / writer, filled lazily by pandas_to_arrow.
stream_sinks = {}
stream_writers = {}

# Throttle emissions (0.004 s interval), batch them into 1 s windows, flatten the
# windows back into individual tuples, and append each DataFrame to its Arrow stream.
source.rate_limit(0.004).timed_window(1).flatten().sink(
    partial(pandas_to_arrow, sinks=stream_sinks, writers=stream_writers)
)

# Run the producer as a background task on the kernel's event loop so later cells
# can inspect and cancel it through `t`. (`asyncio.run_in_` does not exist.)
# 10**6 emissions at a 4 ms rate limit is on the order of 4000 s of streaming.
t = asyncio.get_event_loop().create_task(mock_stream_async(10**3, 10, 10**6))

In [None]:
# NOTE(review): stray scratch cell. It only displays the literal 0 and can be deleted.
0

In [None]:
# Display the background task's repr, e.g. to see whether it is still pending.
t

In [None]:
# Request cancellation of the background task. The displayed bool reports
# whether the request was accepted.
t.cancel()

In [None]:
# Inspect the per-position Arrow sinks filled by `pandas_to_arrow`.
stream_sinks

In [None]:
%%time
# Read back everything written so far for tuple position 2 (the third frame of
# each emitted tuple) as a single DataFrame.
# NOTE(review): in recent pyarrow, `BufferOutputStream.getvalue()` finalizes the
# sink, so the matching writer may fail on later batches. Cancel `t` before running
# this cell, and verify against the installed pyarrow version.
pa.open_stream(stream_sinks[2].getvalue()).read_pandas()

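# Gather

Stream Dask futures through a streamz pipeline in completion order: each finished future is gathered, optionally cancelled, and its result emitted downstream.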

In [None]:
async def gather_stream(source, futures):
    """Emit each Dask future into ``source`` as soon as it finishes.

    Futures arrive in completion order, not submission order. They are passed
    on without their results (``with_results=False``), so fetching results is
    left to the downstream pipeline.

    NOTE(review): the value returned by ``source.emit`` is discarded. For an
    asynchronous stream it is presumably an awaitable, so this loop neither
    waits for downstream processing (no back-pressure) nor observes downstream
    errors. Confirm this fire-and-forget behaviour is intended.
    """
    async for finished in distributed.as_completed(futures, with_results=False):
        source.emit(finished)


from tornado import gen
from distributed.client import default_client


@streamz.Stream.register_api()
class gather_and_cancel(streamz.Stream):
    """Stream node that gathers each incoming Dask future and optionally cancels it.

    Registered on ``streamz.Stream`` via ``register_api``, so it can be chained as
    ``stream.gather_and_cancel(...)``. For each future received, the node waits
    for its result through ``client`` and emits that result downstream. When
    ``cancel`` is true, the future is cancelled afterwards, even if gathering
    raised (e.g. because the task itself failed).

    Parameters
    ----------
    upstream : streamz.Stream
    stream_name : str, optional
    client : distributed.Client, optional
        Client used to gather and cancel. Defaults to ``default_client()``.
    cancel : bool
        Cancel each future once it has been gathered (default True).
    """

    def __init__(self, upstream, stream_name=None, client=None, cancel=True):
        if client is None:
            client = default_client()
        self.client = client
        self.cancel = cancel
        streamz.Stream.__init__(self, upstream, stream_name=stream_name)

    @gen.coroutine
    def update(self, x, who=None, metadata=None):
        # `metadata` is accepted because newer streamz passes it to `update`.
        try:
            result = yield self.client.gather(x, asynchronous=True)
        finally:
            # Cancel even when gathering raised; otherwise failed futures would
            # skip the cancel step and stay alive on the cluster.
            if self.cancel:
                self.client.cancel(x)
        # Forward metadata only when streamz supplied some, so older streamz
        # versions whose `_emit` takes no `metadata` keyword keep working.
        if metadata is None:
            result2 = yield self._emit(result)
        else:
            result2 = yield self._emit(result, metadata=metadata)
        return result2


def do_task(pause, seed):
    """Simulated Dask task: sleep for ``pause`` seconds, then return ``pause``.

    ``seed`` is not used by the body. Presumably it exists so that submitting
    the same ``pause`` twice yields distinct task keys, since Dask derives keys
    from the call arguments. Confirm before removing it.

    Raises
    ------
    ZeroDivisionError
        If ``pause == 0``. This is deliberate fault injection, so the
        downstream pipeline also sees failed futures.
    """
    # Explicit check rather than `1 / pause`: a NumPy scalar zero would produce
    # inf with a RuntimeWarning instead of raising, so the fault would not fire.
    if pause == 0:
        raise ZeroDivisionError("division by zero")
    time.sleep(pause)
    return pause


# Fresh asynchronous source for the gather demo.
# NOTE(review): this rebinds the module-level `source`, so helpers that read the
# global `source` (mock_stream / mock_stream_async) would now feed this pipeline.
source = streamz.Stream(asynchronous=True)

# Gather each completed future on the same client the futures are submitted to,
# cancel it once gathered, and print the result.
source.gather_and_cancel(client=local_client, cancel=True).sink(print)

# 100 simulated tasks with integer pauses drawn from 0..4 s. A pause of 0 makes
# do_task fail on purpose. The random second argument varies the task
# arguments, presumably so identical pauses do not collapse into one Dask key.
futures = [
    local_client.submit(do_task, np.random.randint(5), np.random.random())
    for _ in range(100)
]

# NOTE(review): rebinding `t` drops the notebook's handle on the earlier
# mock-stream task. Cancel that task first if it is still running.
t = asyncio.get_event_loop().create_task(gather_stream(source, futures))