Added the to_kafka stream #216
Conversation
cc @martindurant you might find this interesting

I do! I'll try to look at it in the next couple of days.

Is someone maintaining this project? The errors in the tests are not obviously caused by the new code, and there doesn't seem to have been much activity in several months.

I think if pytest is pinned to 3.10 this might be ok.

No.

Or at least, I'm not. Apparently no one else is either?

Resolved the pytest issues here: #217

I've resolved the pytest failures and restarted the tests here. It looks like there are Kafka failures on master. @martindurant I suspect that you are the most familiar with that testing infrastructure (although @jsmaupin, if you have energy to spend, it would be good to get your eyes on the Kafka testing code as well, since we'll need it to write tests for your work here too).

Sure, I'll take a look.

The kafka-threads tests are the ones that appear to error (they first fail, time out, and then don't end gracefully), so probably something is deadlocking the event loop. Do they all pass locally? Note that I don't see any new tests for the functionality in this PR.

Happy to see this fixed! You seem only to be failing on flake8 lint (lots of it, presumably not added by you).

If it's unrelated to this work then I'll take care of it later.

@martindurant thanks for reviewing this. Do you have any other concerns about the content of the implementation?

I'll try to take a closer look at this later today.
martindurant
left a comment
A couple of quick questions, now that things are working.
streamz/core.py (outdated)

```python
Stream.__init__(self, upstream, ensure_io_loop=True, **kwargs)

def update(self, x, who=None):
    self.producer.poll(0)
```
Is this necessary? If the data is not sent strictly synchronously, I think that's OK.
I'm also curious about this, and more generally about how well this approach works in an asynchronous environment. It has been a while since I looked at confluent_kafka_python, so I've forgotten some things.

If the producer is busy, then what happens with producer.poll(0)? Does it error, or does it block? It may be that neither of these outcomes is desirable.
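For reference, in confluent-kafka-python `poll(0)` is non-blocking: it serves any pending delivery callbacks and returns immediately. It is `produce()` that signals a busy producer, by raising `BufferError` when the local queue is full. A minimal sketch of the resulting retry pattern, using a hypothetical `StubProducer` in place of `confluent_kafka.Producer` so the snippet runs without a broker:

```python
import time

class StubProducer:
    """Hypothetical stand-in for confluent_kafka.Producer with a tiny local queue."""
    def __init__(self, max_queued=2):
        self.max_queued = max_queued
        self.queue = []
        self.delivered = []

    def produce(self, topic, value):
        # The real client raises BufferError when the local queue is full.
        if len(self.queue) >= self.max_queued:
            raise BufferError("local producer queue is full")
        self.queue.append((topic, value))

    def poll(self, timeout):
        # Non-blocking for timeout=0: serve "delivery callbacks" and return.
        self.delivered.extend(self.queue)
        self.queue.clear()
        return len(self.delivered)

def produce_with_backpressure(producer, topic, value, retries=10):
    """Retry produce(), draining the queue via poll() when it is full."""
    for _ in range(retries):
        try:
            producer.produce(topic, value)
            return
        except BufferError:
            producer.poll(0)  # serve callbacks, freeing queue space
            time.sleep(0.01)
    raise RuntimeError("producer queue stayed full")

p = StubProducer()
for i in range(5):
    produce_with_backpressure(p, "test-topic", str(i))
p.poll(0)  # drain whatever is still queued
```

So `poll(0)` itself neither errors nor blocks; the question is what the surrounding code does when `produce()` refuses the message.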
streamz/core.py (outdated)

```python
def flush(self, timeout=-1):
    self.producer.flush(timeout)

def cb(self, err, msg):
```
I wonder if this should be a sink, and not emit anything? Of course, I see the usefulness in testing.
I have managed to fix most of the tests. I added another test for the new .from_kafka() class. The main issue is described here by the author of the Confluent Python library: confluentinc/confluent-kafka-python#156 (comment). Basically, the .subscribe() method does not immediately subscribe; there is an asynchronous operation it must complete first. Subscribing and then immediately producing messages from the same client is not a normal use-case. Most of the time, the subscriber and producer are at different locations on a network. I have added a _blocking_subscribe method that resolves this issue. We did not see this in the non-threading tests because they run first and the tests use Kafka's auto-create-topic functionality. There is a slight delay when producing the first message while Kafka creates the topic, which gave the .subscribe() call enough time to completely subscribe. Once the topic was created, the rest of the tests failed. I have not been able to get the test_kafka_dask_batch test to pass.
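A blocking subscribe along these lines can be sketched by waiting for the `on_assign` callback that `Consumer.subscribe` accepts; in the real client that callback is served from inside `poll()`, once the group rebalance completes, so the helper must keep polling while it waits. The names `blocking_subscribe` and `StubConsumer` below are illustrative (the stub stands in for `confluent_kafka.Consumer` so the snippet runs without a broker), not the PR's actual code:

```python
import time

class StubConsumer:
    """Hypothetical stand-in: assignment completes only after a few polls."""
    def __init__(self):
        self._pending = None
        self._polls = 0

    def subscribe(self, topics, on_assign=None):
        # The real subscribe() returns immediately; assignment is asynchronous.
        self._pending = on_assign

    def poll(self, timeout):
        # The real client serves rebalance callbacks from inside poll().
        self._polls += 1
        if self._polls >= 3 and self._pending is not None:
            cb, self._pending = self._pending, None
            cb(self, ["partition-0"])
        return None

def blocking_subscribe(consumer, topics, timeout=5.0):
    """Subscribe, then poll until partitions are actually assigned."""
    parts_box = []
    consumer.subscribe(topics, on_assign=lambda c, parts: parts_box.extend(parts))
    deadline = time.monotonic() + timeout
    while not parts_box:
        if time.monotonic() > deadline:
            raise TimeoutError("no partition assignment within %.1fs" % timeout)
        consumer.poll(0.1)
    return parts_box

parts = blocking_subscribe(StubConsumer(), ["my-topic"])
```

Once this returns, messages produced to the topic will be seen by the consumer, which is the property the threaded tests were missing.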
On second thought, if this is not a normal use-case, perhaps I should move the blocking-subscribe functionality into the tests?

It does sound like a test helper only, but I don't mind where it appears.
Here are my test results for various implementations. For each I used 10,000 records. Implementations compared: a plain producer outside the streamz library, to_kafka, and to_kafka_batched. (The results table itself did not survive in this copy.)
@jsmaupin, any objection if I push flake8 fixes here? Everything else seems to be in order, and other PRs are failing while this is unmerged.

No objections here.
Ah, sorry, I can't - I'm not a

```diff
--- a/streamz/tests/test_core.py
+++ b/streamz/tests/test_core.py
@@ -942,7 +942,7 @@ def dont_test_stream_kwargs(clean):
 @pytest.fixture
-def thread(loop):
+def thread(loop):  # noqa: E811
     from threading import Thread, Event
     thread = Thread(target=loop.start)
     thread.daemon = True
diff --git a/streamz/tests/test_dask.py b/streamz/tests/test_dask.py
index 5c007a3..9a415fd 100644
--- a/streamz/tests/test_dask.py
+++ b/streamz/tests/test_dask.py
@@ -74,7 +74,7 @@ def test_zip(c, s, a, b):
 @pytest.mark.slow
-def test_sync(loop):
+def test_sync(loop):  # noqa: E811
     with cluster() as (s, [a, b]):
         with Client(s['address'], loop=loop) as client:  # flake8: noqa
             source = Stream()
@@ -91,7 +91,7 @@ def test_sync(loop):
 @pytest.mark.slow
-def test_sync_2(loop):
+def test_sync_2(loop):  # noqa: E811
     with cluster() as (s, [a, b]):
         with Client(s['address'], loop=loop):  # flake8: noqa
             source = Stream()
@@ -132,7 +132,7 @@ def test_buffer(c, s, a, b):
 @pytest.mark.slow
-def test_buffer_sync(loop):
+def test_buffer_sync(loop):  # noqa: E811
     with cluster() as (s, [a, b]):
         with Client(s['address'], loop=loop) as c:  # flake8: noqa
             source = Stream()
@@ -158,7 +158,7 @@ def test_buffer_sync(loop):
 @pytest.mark.xfail(reason='')
 @pytest.mark.slow
-def test_stream_shares_client_loop(loop):
+def test_stream_shares_client_loop(loop):  # noqa: E811
     with cluster() as (s, [a, b]):
         with Client(s['address'], loop=loop) as client:  # flake8: noqa
             source = Stream()
```
(If you don't want to deal with this, @jsmaupin, I'm happy to push to my fork instead.)

Sorry, apparently it takes a little more work!

Yeah, np. It looks like one version had the noqa comment on the decorators and yours was on the function itself.

It seems the other one was right! Oops... (plus some whitespace has crept in)

Some E811 codes should have been F811. I tested locally this time.
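For reference, F811 is pyflakes' "redefinition of unused name" check (there is no E811 in pycodestyle, which is why the typo silenced nothing), and the `# noqa` comment only works on the exact physical line flake8 attributes the warning to, which is why decorator-versus-def placement mattered above. A minimal illustration, with a deliberately shadowed import:

```python
from time import sleep  # imported, then shadowed below

def sleep(seconds):  # noqa: F811 - deliberate redefinition for this example
    """Shadowing definition; without the noqa comment flake8 reports F811 here."""
    return seconds

result = sleep(2)  # calls the local definition, not time.sleep
```

In the tests above the redefinition is intentional (pytest fixtures shadowing imported names), so suppressing the warning is the right call.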
Strangely, I'm seeing a syntax error now at https://github.com/mrocklin/streamz/pull/216/files#diff-a56707adbd2d2392ea8625521ed6edd0R156 - should that be the tornado gen/yield pattern? Otherwise it should be in py3_test_core. But this was passing all tests before, so what happened?

There was some Python-3-specific code in there. This should be a working commit.
skmatti
left a comment
A possible reason why you were not able to make the test_kafka_dask_batch test pass:
```python
stream = Stream.from_kafka_batched(TOPIC, ARGS)
out = stream.sink_to_list()
stream.start()
sleep(2)
```
This may cause the test to fail: the default poll_interval is 1 second, so the test would have polled two batches. The second batch would be an empty list, and out[-1][-1] would always be null.
I'm not sure about that; I think the time accounts for the initial setup of the consumer, and no data has been sent yet at this point.
Yes, if I had another way to do this without a sleep, I think that would be preferable, but the delay for the topic creation is why it is there.
Yeah, you are right! Data hasn't been sent yet.
Codecov Report

```
@@        Coverage Diff         @@
##        master    #216  +/-  ##
=================================
  Coverage     ?   93.12%
=================================
  Files        ?       13
  Lines        ?     1483
  Branches     ?        0
=================================
  Hits         ?     1381
  Misses       ?      102
  Partials     ?        0
```

Continue to review the full report at Codecov.
Green! Let's merge this!

(^ @mrocklin)

@mrocklin, let me know if there's anything I can do to help get this closed.
mrocklin
left a comment
A few questions about async behavior
```python
        except Exception as e:
            future.set_exception(e)

self.loop.add_callback(_)
```
Is the function above likely to take a non-trivial amount of time? If so, then we probably can't place it onto the event loop.
```python
stream = Stream.from_kafka([TOPIC], ARGS, asynchronous=True)
out = stream.sink_to_list()
stream.start()
sleep(5)
```
Why this sleep?
This can probably be removed. I'll take a closer look.
```python
stream = Stream.from_kafka([TOPIC], ARGS)
out = stream.sink_to_list()
stream.start()
sleep(2)
```
Why this sleep? In general sleeps can be troublesome because:
- They make tests take a long time if they are overly conservative
- If the system running the tests is abnormally slow then they can fail (Travis is often abnormally slow)
I'm also concerned about using time.sleep here in an async test. This is usually an anti-pattern.
Yes, I totally agree. I'll take some time to come up with a better solution.
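A common replacement for a fixed sleep in tests like these is to poll for the expected condition with a generous timeout, so the test passes as soon as data arrives and only fails if it truly never does (streamz's test utilities include a helper in this spirit; the version below is an illustrative sketch, not the project's actual function):

```python
import time

def wait_for(predicate, timeout, period=0.05):
    """Poll predicate() until it returns true or timeout seconds elapse."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(period)
    raise TimeoutError("condition not met within %.1fs" % timeout)

out = []
# In a real test, `out` would be filled by the stream; simulate that here.
out.append(b"value")
ok = wait_for(lambda: len(out) == 1, timeout=10)  # returns as soon as data is seen
```

This addresses both complaints: a conservative timeout no longer slows the test down when things are fast, and a slow CI machine just uses more of the budget instead of failing.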
```python
def _():
    while True:
        self.producer.poll(0)
```
What happens if we poll and the producer is still busy? How do we handle back pressure?
The idea is that this while loop will not exit if the producer queue is full. Am I breaking something here with regard to how async systems work? Will this block other parts of the program?
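Yes: a callback scheduled onto the event loop runs on the loop's own thread, so while this `while True` spins, no other callback can run and the whole pipeline stalls. The usual workaround is to run the blocking work on a worker thread and let the loop await its result. A sketch of that pattern (using asyncio and a hypothetical `blocking_flush` standing in for `producer.flush()`; the heartbeat task exists only to demonstrate that the loop keeps running):

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_flush():
    """Stand-in for producer.flush(): blocks its own thread, not the event loop."""
    time.sleep(0.1)
    return "flushed"

async def main():
    loop = asyncio.get_running_loop()
    ticks = []

    async def heartbeat():
        # Keeps ticking while the flush happens, proving the loop isn't blocked.
        for _ in range(5):
            ticks.append(time.monotonic())
            await asyncio.sleep(0.02)

    with ThreadPoolExecutor(max_workers=1) as pool:
        flushed, _ = await asyncio.gather(
            loop.run_in_executor(pool, blocking_flush),  # off-loop blocking call
            heartbeat(),                                 # concurrent loop work
        )
    return flushed, len(ticks)

result, n_ticks = asyncio.run(main())
```

Tornado's IOLoop offers the equivalent `run_in_executor`, so the same shape would apply on the streamz event loop.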
@mrocklin thank you very much for the feedback.

So, I can close this PR?

This PR should now be closed.

Thanks @jsmaupin for your work here.
I have added the to_kafka class.
The options were to either:
.flush()method after each.produce(...)call. This will seriously hurt performance as messages will be sent serially.atexitto call flush at the end. This requires the use ofatexit, of which I'm unsure of any impact. It also means that the result of the.emit(...)call by the client does not yield an immediate call to the downstream. Note that the Confluent library will flush when the batch size reaches a threshold. This is the method that is implemented here.