
Conversation

@martindurant
Member

Attempts to extend #216 following feedback

Still contains a gen.sleep in the dask test. This could be considered acceptable usage, since that's not how people will use it in the wild. There is a good chance these tests will fail again on Travis; we'll see.

@codecov-io

codecov-io commented Mar 17, 2019

Codecov Report

❗ No coverage uploaded for pull request base (master@1486318).
The diff coverage is 100%.


@@            Coverage Diff            @@
##             master     #230   +/-   ##
=========================================
  Coverage          ?   93.38%           
=========================================
  Files             ?       13           
  Lines             ?     1481           
  Branches          ?        0           
=========================================
  Hits              ?     1383           
  Misses            ?       98           
  Partials          ?        0
Impacted Files        Coverage Δ
streamz/sources.py    94.76% <100%> (ø)


Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data

@martindurant
Member Author

@mrocklin , I am thinking that the use pattern of the Future in to_kafka could be wrong. The Future is created when data is provided (in .update()), and set immediately when the producer is called. That happens on the event loop, but does not mean the data has been delivered.

Should the Future only get set in .cb(), after successful delivery? If so, there can be more than one future in flight? They would need to be stored in a queue (order matters, content does not).

How would one test that backpressure was working as it should?
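For concreteness, a minimal sketch of the queue-of-futures idea being discussed (not the code in this PR): update() appends a fresh future to a deque and cb() resolves the oldest one only once delivery is confirmed. The class and attribute names below are illustrative.

from collections import deque

from tornado.concurrent import Future


class ToKafkaSketch(object):
    def __init__(self, producer, topic):
        self.producer = producer
        self.topic = topic
        self._pending = deque()  # futures awaiting delivery, oldest first

    def update(self, x, who=None):
        future = Future()
        self._pending.append(future)  # order matters, content does not
        self.producer.produce(self.topic, x, callback=self.cb)
        return future  # emit() waits on this upstream

    def cb(self, err, msg):
        # runs when poll() fires the delivery callback
        future = self._pending.popleft()
        if err is None:
            future.set_result(None)  # delivery confirmed, release backpressure
        else:
            future.set_exception(Exception(err))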

Collaborator

@mrocklin mrocklin left a comment

I am thinking that the use pattern of the Future in to_kafka could be wrong. The Future is created when data is provided (in .update()), and set immediately when the producer is called. That happens on the event loop, but does not mean the data has been delivered.

Should the Future only get set in .cb(), after successful delivery?

Yes, that seems more sensible to me

If so, there can be more than one future in flight? They would need to be stored in a queue (order matters, content does not).

Yes, and I think it's ok for there to be more than one future in flight. This is, for example, the purpose of buffer. Consider two cases where we just want to dump a bunch of data we have in a list into kafka

source = Stream()
k = source.to_kafka(...)

for message in list_of_data:
    await source.emit(message)

The emit function here returns the future provided by update. So we submit one message, get back the future, wait until C-K gives us the OK, then submit another message. No buffering of data in memory at all. Very conservative and safe

source = Stream()
k = source.buffer(10).to_kafka(...)

for message in list_of_data:
    await source.emit(message)

Here we throw a buffer in the middle. This adds a queue much like you suggest. It holds onto ten futures provided by to_kafka in a queue and sends back its own futures (the first ten of which are already set). We now have up to ten messages in flight in the system without Kafka having finished them. This is nice because Kafka has a bit of a backlog, and so is likely to remain busy, but it can get troublesome if those ten messages are larger than memory. We think it's likely that we can hold ten messages in the memory of our Streamz process, so we're comfortable with this decision.

You may want to go look at the buffer implementation to get a sense for how it does this. I think it's nice that the queuing approach you suggested can be implemented orthogonally here, and possibly shared with other sources and sinks.

while True:
    try:
        # this runs asynchronously, in C-K's thread
        self.producer.produce(self.topic, x, callback=self.cb)
Collaborator

Can you verify that this doesn't block if the producer is busy?

Collaborator

Also, does the callback get run on the event loop's thread, or C-K's thread?

Collaborator

If it gets run in C-K's thread then you may have to use self.loop.add_callback, which is the only threadsafe mechanism to create coroutines from other threads.

Member Author

I should have been more verbose here. The doc says that the method call is always async (returns immediately), and I have found no case where it isn't.

The actual marshalling of the message to Kafka happens in C-K's own thread, at the C level. But the Python callbacks for delivery are triggered upon poll(), so that call and the coroutines they refer to must happen on the same event loop - hence poll() is also called in a coroutine.
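As a rough illustration of that arrangement (a sketch, not the PR's exact code; the _poll_loop name, the stopped flag and the poll_time attribute are assumptions):

from tornado import gen

@gen.coroutine
def _poll_loop(self):
    # runs on the event loop; producer.poll() fires any pending delivery
    # callbacks in this same thread, so they can safely touch tornado futures
    while not self.stopped:
        self.producer.poll(0)
        yield gen.sleep(self.poll_time)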

streamz/core.py Outdated


@Stream.register_api()
class to_kafka_batched(Stream):
Collaborator

read_kafka_batched was oriented around supporting Dask. Is this the same?

Member Author

I don't know. I can see if it works with Dask. I'm trying to fix the existing code rather than design its purpose :| I gather better performance was expected, in terms of messages/s, with batches.

@martindurant
Member Author

I'll have a look at to_kafka_batched yet, and maybe remove it if I can't figure out what to do with it; it doesn't appear to be tested. It doesn't use futures, so it doesn't currently make a good sink.

@mrocklin
Collaborator

We could also drop that for a first pass and just get a solid to_kafka in?

@martindurant
Member Author

OK, removed it. May come back to that - the goal here from my point of view was to get general streamz PRs green again.

streamz/core.py Outdated
        self.producer.produce(self.topic, x, callback=self.cb)
        return
    except BufferError:
        yield gen.sleep(0.1)
Collaborator

Should this be self.poll_time?

Member Author

right
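
For reference, the agreed change would presumably make the retry loop look like this (a sketch, just substituting self.poll_time for the hard-coded 0.1):

while True:
    try:
        # this runs asynchronously, in C-K's thread
        self.producer.produce(self.topic, x, callback=self.cb)
        return
    except BufferError:
        # local produce queue is full; back off before retrying
        yield gen.sleep(self.poll_time)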

return future

@gen.coroutine
def cb(self, err, msg):
Collaborator

Does confluent_kafka run the callback in a separate thread? If so then we can't make this a coroutine.

That might be OK. to_kafka is likely to be used as a sink, and so we don't expect to get a future from downstream that we're supposed to wait on. It might be enough here to just set the result and be done.

If cb is called in the thread where update is called, then it might be enough to just set the result and be done

def cb(self, err, msg):
    if good:
        future.set_result(...)
    else:
        future.set_exception(...)

However, if it's going to be called within a separate thread, then we'll need to use loop.add_callback, which is the only threadsafe method in tornado

def cb(...):
    def _():
        future.set_result(...)
        ...
    self.loop.add_callback(_)

Member Author

Does confluent_kafka run the callback in a separate thread?

No, it runs in the thread which calls producer.poll(), which is here being called in a coroutine on the current loop. Only the actual low-level delivery runs in another thread.

it might be enough to just set the result and be done

There is no Sink type, though, right? I mean, it may be reasonable to expect events to be passed down a chain still.

Collaborator

@mrocklin mrocklin left a comment

Some additional comments. Thank you for your patience with this. I appreciate you slogging through here. I'm liking the progress, and happy to see you taking on things here :)

dask=True)
out = stream.gather().sink_to_list()
stream.start()
yield gen.sleep(5) # this frees the loop while dask workers report in
Collaborator

Would an await_for statement make more sense here? Do we know what keeps the dask workers so long?

Member Author

I do not know what's happening during this time. I tried yield await_for(lambda: c.ncores()), but I get an AssertionError at distributed/distributed/utils_test.py:945 (the absolute timeout in gen_cluster) and warnings like

distributed.core - INFO - Event loop was unresponsive in Worker for 10.98s.

i.e., a single gen.sleep seems to release the loop, whereas querying the cluster actually blocks.

Collaborator

Note that c.ncores will be a coroutine in async mode, so you would need to yield the result.

However, I also don't understand the need here. Any workers that will arrive have arrived at the start of the function. gen_cluster waits until everyone is here.

Member Author

Yes, I yielded for ncores - it didn't work :|
Sorry, I don't know more; I can't think how to investigate this further. The following also fails, with the same timeout in gen_cluster (not in the test itself):

        t = time.time()
        while True:
            yield gen.sleep(0.2)
            ncore = yield c.ncores()
            if ncore:
                break
            assert time.time() - t < 10, "Timeout"

Collaborator

I'm still confused about what this sleep is doing. I would be very surprised if the workers weren't already up and connected at this point.

In [1]: from distributed.utils_test import gen_cluster

In [2]: @gen_cluster(client=True)
   ...: def test_foo(c, s, a, b):
   ...:     assert len(s.workers) == 2
   ...:

In [3]: test_foo()
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:     tcp://127.0.0.1:52172
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:52173
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:52173
distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:52172
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                   17.18 GB
distributed.worker - INFO -       Local Directory: /Users/mrocklin/workspace/distributed/dask-worker-space/worker-c59n7r0g
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:52174
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:52174
distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:52172
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          2
distributed.worker - INFO -                Memory:                   17.18 GB
distributed.worker - INFO -       Local Directory: /Users/mrocklin/workspace/distributed/dask-worker-space/worker-btnrzcte
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register tcp://127.0.0.1:52173
distributed.scheduler - INFO - Register tcp://127.0.0.1:52174
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:52173
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:52174
distributed.core - INFO - Starting established connection
distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:52172
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:52172
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-3da15092-4b26-11e9-9923-acde48001122
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Remove client Client-3da15092-4b26-11e9-9923-acde48001122
distributed.scheduler - INFO - Remove client Client-3da15092-4b26-11e9-9923-acde48001122
distributed.scheduler - INFO - Close client connection: Client-3da15092-4b26-11e9-9923-acde48001122
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:52173
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:52174
distributed.scheduler - INFO - Scheduler closing...
distributed.scheduler - INFO - Scheduler closing all comms
distributed.scheduler - INFO - Remove worker tcp://127.0.0.1:52173
distributed.core - INFO - Removing comms to tcp://127.0.0.1:52173
distributed.batched - INFO - Batched Comm Closed: in <closed TCP>: Stream is closed
distributed.scheduler - INFO - Remove worker tcp://127.0.0.1:52174
distributed.core - INFO - Removing comms to tcp://127.0.0.1:52174
distributed.scheduler - INFO - Lost all workers
distributed.worker - INFO - Connection to scheduler broken.  Reconnecting...
distributed.worker - INFO - Connection to scheduler broken.  Reconnecting...
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - -------------------------------------------------

We may still need a sleep here, but it would be good to know why.

Member Author

Agreed, but I don't know how to investigate.

@martindurant
Member Author

Note that the finalize thing fails here, because it doesn't exist in py2. Distributed has an implementation, but I'm wondering whether it would be better to move to a __del__ implementation, or just revert.

Again, I am wanting to clean this up so that the other lingering PRs can get moving.

@mrocklin
Collaborator

Note that the finalize thing fails here, because it doesn't exist in py2. Distributed has an implementation, but I'm wondering whether it would be better to move to a __del__ implementation, or just revert.

FWIW I'd be quite happy to drop Python 2 in this project.

Again, I am wanting to clean this up so that the other lingering PRs can get moving.

OK. If you want to revert the previous commit I'd be happy to merge.

@martindurant
Member Author

Done.

I'd be quite happy to drop Python 2

Great! That would be a separate PR.
Does tornado gen-type syntax and async/await mix OK, or would all of that stuff need to be rewritten (or just use gen forever)?

@mrocklin
Collaborator

Does tornado gen-type syntax and async/await mix OK, or would all of that stuff need to be rewritten (or just use gen forever)?

They mostly mix, but there are some small semantic differences. Coroutines created with @gen.coroutine start once they're called. async def functions have to be awaited before they start.
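
A small illustrative example of that difference (hypothetical functions, not streamz code):

from tornado import gen

@gen.coroutine
def old_style():
    print("old_style started")  # runs as soon as old_style() is called
    yield gen.sleep(1)

async def new_style():
    print("new_style started")  # runs only once the coroutine is awaited
    await gen.sleep(1)

old_style()  # body starts immediately, even if the returned future is never yielded
new_style()  # returns a coroutine object; nothing runs until it is awaited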

@mrocklin mrocklin merged commit 090b9ba into python-streamz:master Mar 20, 2019
@martindurant
Member Author

Hurray!

@martindurant martindurant deleted the kafka-fixes branch March 20, 2019 15:50