Skip to content

Implement timeout and key routing for Stream.partition#376

Merged
martindurant merged 6 commits intopython-streamz:masterfrom
roveo:master
Nov 13, 2020
Merged

Implement timeout and key routing for Stream.partition#376
martindurant merged 6 commits intopython-streamz:masterfrom
roveo:master

Conversation

@roveo
Copy link
Copy Markdown
Contributor

@roveo roveo commented Nov 5, 2020

Resolves #375

@roveo
Copy link
Copy Markdown
Contributor Author

roveo commented Nov 5, 2020

Looks like flake8 check failed, but not on the files that I changed. Should I fix these?

@roveo roveo marked this pull request as draft November 6, 2020 17:09
@roveo
Copy link
Copy Markdown
Contributor Author

roveo commented Nov 6, 2020

The callback flushing mechanism is flawed, data accumulates in pending callbacks, and this breaks backpressure. Will have to implement some kind of locking mechanism to prevent this.

@roveo
Copy link
Copy Markdown
Contributor Author

roveo commented Nov 7, 2020

Nope, it isn't, I was just building my test pipeline in an incorrect way. Decribed in #378

@roveo roveo marked this pull request as ready for review November 9, 2020 14:34
@martindurant
Copy link
Copy Markdown
Member

Looks like flake8 check failed, but not on the files that I changed. Should I fix these?

Please do fix - it may be due to a new version of flake8

Comment thread streamz/core.py Outdated
Comment thread streamz/core.py Outdated
Comment thread streamz/core.py
self._retain_refs(metadata)
self._buffer.append(x)
key = self._get_key(x)
buffer = self._buffer[key]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to create new variables buffer and metadata_buffer here?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc: @jsmaupin for review because this PR is changing how metadata is being handled in partition.

Copy link
Copy Markdown
Contributor Author

@roveo roveo Nov 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to create new variables buffer and metadata_buffer here?

We really don't have to, but since they're referenced multiple times, I figured it'd me more readable than self._metadata_buffer[key].

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the cc. The metadata changes look okay to me.

assert L == [(0, 2), (1, 3), (4, 6), (5, 7)]


def test_partition_size_one():
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the goal of this test?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here:

https://github.com/roveo/streamz/blob/3747d8b37c909ff53b16f9a3723140698a0dd8a8/streamz/core.py#L1028-L1036

I didn't have self.n > 1 condition at first and self._callbacks[key].cancel() raised KeyError when n=1. So it's kind of an edge case test. If you think it's unnecessary, I'll remove it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking if we should add a check in partition (and update docstring) forcing n>1. If n=1, the user would be better off avoiding the use of partition altogether. What do you think @martindurant?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the sake of test and debug, I think it's OK to allow a redundant partition with n=1.

@martindurant
Copy link
Copy Markdown
Member

Feel free to push an empty commit here to restart the CI

@codecov
Copy link
Copy Markdown

codecov Bot commented Nov 11, 2020

Codecov Report

Merging #376 (bf2e356) into master (dffe408) will increase coverage by 0.19%.
The diff coverage is 97.05%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #376      +/-   ##
==========================================
+ Coverage   95.77%   95.96%   +0.19%     
==========================================
  Files          16       16              
  Lines        2508     2527      +19     
==========================================
+ Hits         2402     2425      +23     
+ Misses        106      102       -4     
Impacted Files Coverage Δ
streamz/dataframe/core.py 93.39% <ø> (+0.91%) ⬆️
streamz/core.py 95.85% <97.05%> (+0.10%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update e9a2545...bf2e356. Read the comment docs.

@martindurant martindurant merged commit 15cfe9c into python-streamz:master Nov 13, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Proposal: add timeout and key options to Stream.partition()

4 participants