💡 Implement automatic stream implementation selection#102
Conversation
…elism recommendation
Reviewer's GuideImplements dynamic switching between sequential and parallel stream implementations by adding automatic selection logic in StreamConverter, extending BaseStream with recommendation and explicit-control methods, integrating this into terminal execution, and strengthening generator support and test coverage. File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Hey @garlontas - I've reviewed your changes and they look great!
Here's what I looked at during the review
- 🟡 General issues: 2 issues found
- 🟡 Testing: 1 issue found
- 🟢 Documentation: all looks good
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| if isinstance(self._source, Sized): | ||
| for item in self._queue.get_queue(): | ||
| if item.has_name(self._filter) and len(self._source) > 3000: | ||
| return True |
There was a problem hiding this comment.
suggestion: Extract the hardcoded 3000 threshold into a constant or config
Defining a named constant or making this value configurable will improve code clarity and maintainability.
| if isinstance(self._source, Sized): | |
| for item in self._queue.get_queue(): | |
| if item.has_name(self._filter) and len(self._source) > 3000: | |
| return True | |
| PARALLELISM_RECOMMENDATION_THRESHOLD = 3000 | |
| if isinstance(self._source, Sized): | |
| for item in self._queue.get_queue(): | |
| if item.has_name(self._filter) and len(self._source) > PARALLELISM_RECOMMENDATION_THRESHOLD: | |
| return True |
| def test_parallelization_recommended(self): | ||
| stream = Stream.of(range(4000)).filter(lambda x: x % 2 == 0) | ||
| self.assertTrue(stream._is_parallelism_recommended()) |
There was a problem hiding this comment.
suggestion (testing): Test _is_parallelism_recommended behavior with a non-Sized stream source.
Please add a test using a generator as the source to confirm that _is_parallelism_recommended returns False when the source is not Sized.
| def test_parallelization_recommended(self): | |
| stream = Stream.of(range(4000)).filter(lambda x: x % 2 == 0) | |
| self.assertTrue(stream._is_parallelism_recommended()) | |
| def test_parallelization_recommended(self): | |
| stream = Stream.of(range(4000)).filter(lambda x: x % 2 == 0) | |
| self.assertTrue(stream._is_parallelism_recommended()) | |
| def test_parallelization_not_recommended_with_generator(self): | |
| def gen(): | |
| for i in range(4000): | |
| yield i | |
| stream = Stream.of(gen()).filter(lambda x: x % 2 == 0) | |
| self.assertFalse(stream._is_parallelism_recommended()) |
pystreamapi/__stream_converter.py
Outdated
| if not stream._implementation_explicit: | ||
| if stream._is_parallelism_recommended(): | ||
| return StreamConverter.to_parallel_stream(stream) |
There was a problem hiding this comment.
suggestion (code-quality): Merge nested if conditions (merge-nested-ifs)
| if not stream._implementation_explicit: | |
| if stream._is_parallelism_recommended(): | |
| return StreamConverter.to_parallel_stream(stream) | |
| if not stream._implementation_explicit and stream._is_parallelism_recommended(): | |
| return StreamConverter.to_parallel_stream(stream) | |
Explanation
Too much nesting can make code difficult to understand, and this is especiallytrue in Python, where there are no brackets to help out with the delineation of
different nesting levels.
Reading deeply nested code is confusing, since you have to keep track of which
conditions relate to which levels. We therefore strive to reduce nesting where
possible, and the situation where two if conditions can be combined using
and is an easy win.
|



This pull request introduces enhancements to the
pystreamapilibrary, focusing on stream implementation flexibility, parallelization recommendations, and improved testing coverage. The key changes include adding functionality for automatic stream implementation selection, refining parallelism recommendations, and ensuring robust handling of generators.Stream Implementation Enhancements:
choose_implementationmethod toStreamConverterfor automatic selection between sequential and parallel stream implementations based on parallelism recommendations. (pystreamapi/__stream_converter.py)_is_parallelism_recommendedand_set_implementation_explicitmethods inBaseStreamto determine and control stream implementation switching. (pystreamapi/_streams/__base_stream.py)Parallelization Logic:
terminaldecorator to integrate automatic implementation selection during stream operations. (pystreamapi/_streams/__base_stream.py)tests/_streams/test_base_stream.py)Generator Handling:
tests/_streams/test_stream_implementation.py) [1] [2]Testing Enhancements:
choose_implementationmethod, including integration tests with stream operations and numeric streams. (tests/_streams/test_stream_converter.py)tests/_streams/test_base_stream.py,tests/_streams/test_stream_converter.py) [1] [2]Summary by Sourcery
Introduce automatic selection between sequential and parallel stream implementations based on runtime recommendations, refine parallelism logic, improve generator compatibility, and expand test coverage.
New Features:
Enhancements:
Tests: