Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 63 additions & 5 deletions pystreamapi/_streams/__base_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,19 @@ def __drop_while(self, predicate: Callable[[Any], bool]):
"""Drops elements from the stream while the predicate is true."""
self._source = list(itertools.dropwhile(predicate, self._source))

@abstractmethod
def filter(self, predicate: Callable[[K], bool]) -> 'BaseStream[_V]':
def filter(self, predicate: Callable[[K], bool]) -> 'BaseStream[K]':
    """
    Returns a stream consisting of the elements of this stream that match the given predicate.

    Lazy intermediate operation: the work is queued as a Process and only
    runs once a terminal operation triggers execution.

    :param predicate: Callable returning True for elements to keep.
    """
    self._queue.append(Process(self._filter, predicate))
    return self

@abstractmethod
def _filter(self, predicate: Callable[[K], bool]):
    """Implementation hook for filter(): replace self._source with only the
    elements matching *predicate*. Must be implemented by subclasses."""

def flat_map(self, predicate: Callable[[K], Iterable[_V]]) -> 'BaseStream[_V]':
"""
Returns a stream consisting of the results of replacing each element of this stream with
Expand All @@ -88,6 +92,31 @@ def flat_map(self, predicate: Callable[[K], Iterable[_V]]) -> 'BaseStream[_V]':

:param predicate:
"""
self._queue.append(Process(self._flat_map, predicate))
return self

@abstractmethod
def _flat_map(self, predicate: Callable[[K], Iterable[_V]]):
    """Implementation hook for flat_map(): replace self._source with the
    concatenation of the streams *predicate* maps each element to.
    Must be implemented by subclasses."""

def group_by(self, key_mapper: Callable[[K], Any]) -> 'BaseStream[K]':
    """
    Returns a Stream consisting of the results of grouping the elements of this stream
    by the given classifier and extracting the key/value pairs.

    Lazy intermediate operation: after execution the stream's elements are
    (key, group-list) pairs.

    :param key_mapper: Callable computing the grouping key for each element.
    """
    self._queue.append(Process(self.__group_by, key_mapper))
    return self

def __group_by(self, key_mapper: Callable[[Any], Any]):
    """
    Groups the stream by the given key mapper via the subclass's
    _group_to_dict implementation and makes the (key, group) pairs
    the new stream source.

    The dict's items view is materialized into a list: downstream
    operations such as find_any() index into self._source with
    self._source[0], which a lazy dict_items view does not support.
    """
    groups = self._group_to_dict(key_mapper)
    self._source = list(groups.items())

@abstractmethod
def _group_to_dict(self, key_mapper: Callable[[K], Any]) -> dict[K, list]:
    """Implementation hook for group_by()/to_dict(): group self._source into
    a dict mapping each key to the list of elements sharing it.
    Must be implemented by subclasses."""

def limit(self, max_size: int) -> 'BaseStream[_V]':
"""
Expand All @@ -103,37 +132,57 @@ def __limit(self, max_size: int):
"""Limits the stream to the first n elements."""
self._source = itertools.islice(self._source, max_size)

@abstractmethod
def map(self, mapper: Callable[[K], _V]) -> 'BaseStream[_V]':
    """
    Returns a stream consisting of the results of applying the given function to the elements
    of this stream.

    Lazy intermediate operation: queued as a Process.

    :param mapper: Callable transforming each element.
    """
    self._queue.append(Process(self._map, mapper))
    return self

@abstractmethod
def _map(self, mapper: Callable[[K], _V]):
    """Implementation hook for map(): replace self._source with *mapper*
    applied to each element. Must be implemented by subclasses."""

def map_to_int(self) -> 'BaseStream[_V]':
    """
    Returns a stream consisting of the results of converting the elements of this stream to
    integers.

    Lazy intermediate operation: queued as a Process.
    """
    self._queue.append(Process(self.__map_to_int))
    return self

def __map_to_int(self):
    """Converts the stream to integers by mapping each element through int()."""
    self._map(int)

@abstractmethod
def map_to_str(self) -> 'BaseStream[_V]':
    """
    Returns a stream consisting of the results of converting the elements of this stream to
    strings.

    Lazy intermediate operation: queued as a Process.
    """
    self._queue.append(Process(self.__map_to_str))
    return self

def __map_to_str(self):
    """Converts the stream to strings by mapping each element through str()."""
    self._map(str)

@abstractmethod
def peek(self, action: Callable) -> 'BaseStream[_V]':
    """
    Returns a stream consisting of the elements of this stream, additionally performing the
    provided action on each element as elements are consumed from the resulting stream.

    Lazy intermediate operation: queued as a Process; the elements
    themselves are not changed.

    :param action: Callable invoked on each element for its side effect.
    """
    self._queue.append(Process(self._peek, action))
    return self

@abstractmethod
def _peek(self, action: Callable):
    """Implementation hook for peek(): call *action* on every element of
    self._source without modifying it. Must be implemented by subclasses."""

def reversed(self) -> 'BaseStream[_V]':
"""
Expand Down Expand Up @@ -296,6 +345,15 @@ def to_set(self):
self._trigger_exec()
return set(self._source)

@abstractmethod
def to_dict(self, key_mapper: Callable[[K], Any]) -> dict:
    """
    Returns a dictionary consisting of the results of grouping the elements of this stream
    by the given classifier.

    Terminal operation: implementations trigger execution of the queued
    processes before grouping.

    :param key_mapper: Callable computing the grouping key for each element.
    """

def _trigger_exec(self):
    """Triggers execution of the stream: runs every queued lazy Process in order."""
    self._queue.execute_all()
60 changes: 23 additions & 37 deletions pystreamapi/_streams/__parallel_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from optional import Optional

import pystreamapi._streams.__base_stream as stream
from pystreamapi._lazy.process import Process
from pystreamapi._parallel.fork_and_join import Parallelizer

_identity_missing = object()
Expand All @@ -23,11 +22,7 @@ def all_match(self, predicate: Callable[[Any], bool]):
return all(Parallel(n_jobs=-1, prefer="threads")(delayed(predicate)(element)
for element in self._source))

def filter(self, predicate: Callable[[Any], bool]):
self._queue.append(Process(self.__filter, predicate))
return self

def __filter(self, predicate: Callable[[Any], bool]):
def _filter(self, predicate: Callable[[Any], bool]):
    """Parallel filter implementation: delegates to the fork/join
    Parallelizer after pointing it at the current source.
    NOTE(review): assumes Parallelizer.filter returns the kept elements as a
    sequence — confirm against pystreamapi._parallel.fork_and_join."""
    self._set_parallelizer_src()
    self._source = self.parallelizer.filter(predicate)

Expand All @@ -37,50 +32,37 @@ def find_any(self):
return Optional.of(self._source[0])
return Optional.empty()

def flat_map(self, predicate: Callable[[Any], stream.BaseStream]):
self._queue.append(Process(self.__flat_map, predicate))
return self

def __flat_map(self, predicate: Callable[[Any], stream.BaseStream]):
def _flat_map(self, predicate: Callable[[Any], stream.BaseStream]):
    """Parallel flat_map implementation: map every element to a sub-stream
    concurrently, then concatenate all sub-stream elements in order."""
    substreams = Parallel(n_jobs=-1, prefer="threads")(
        delayed(predicate)(element) for element in self._source)
    self._source = [item for sub in substreams for item in sub.to_list()]

def _group_to_dict(self, key_mapper: Callable[[Any], Any]):
    """
    Groups the source elements into a dict mapping each key to the list of
    elements sharing that key.

    Only the key computation runs in parallel; the dict itself is built
    sequentially. The previous version mutated a shared dict from worker
    threads with a check-then-act ("if key not in groups"), which can race
    and drop elements; building the dict on the calling thread is both
    thread-safe and deterministic (stable key and element order).

    :param key_mapper: Callable computing the grouping key for each element.
    :return: dict of key -> list of elements.
    """
    # Snapshot the source so the parallel pass and the grouping pass
    # iterate the exact same elements in the same order.
    elements = list(self._source)
    keys = Parallel(n_jobs=-1, prefer="threads")(
        delayed(key_mapper)(element) for element in elements)
    groups = {}
    for key, element in zip(keys, elements):
        groups.setdefault(key, []).append(element)
    return groups

def for_each(self, predicate: Callable):
    """Terminal operation: triggers execution, then applies *predicate* to
    every element in parallel (threads) for its side effect."""
    self._trigger_exec()
    Parallel(n_jobs=-1, prefer="threads")(delayed(predicate)(element)
                                          for element in self._source)

def map(self, mapper: Callable[[Any], Any]):
self._queue.append(Process(self.__map, mapper))
return self

def __map(self, predicate: Callable[[Any], Any]):
self._source = Parallel(n_jobs=-1, prefer="threads")(delayed(predicate)(element)
def _map(self, mapper: Callable[[Any], Any]):
    """Parallel map implementation: apply *mapper* to every element using a
    thread-based joblib pool; joblib preserves input order in the result."""
    self._source = Parallel(n_jobs=-1, prefer="threads")(delayed(mapper)(element)
                                                         for element in self._source)

def map_to_int(self):
self._queue.append(Process(self.__map_to_int))
return self

def __map_to_int(self):
self.__map(int)

def map_to_str(self):
self._queue.append(Process(self.__map_to_str))
return self

def __map_to_str(self):
self.__map(str)

def peek(self, action: Callable):
self._queue.append(Process(self.__peek, action))
return self

def __peek(self, predicate: Callable):
Parallel(n_jobs=-1, prefer="threads")(delayed(predicate)(element)
def _peek(self, action: Callable):
    """Parallel peek implementation: call *action* on every element in
    parallel for its side effect; the source is left unchanged."""
    Parallel(n_jobs=-1, prefer="threads")(delayed(action)(element)
                                          for element in self._source)

def reduce(self, predicate: Callable[[Any, Any], Any], identity=_identity_missing,
Expand All @@ -97,5 +79,9 @@ def reduce(self, predicate: Callable[[Any, Any], Any], identity=_identity_missin
def __reduce(self, pred, _):
    """Reduce implementation: delegates to the fork/join Parallelizer.
    The second parameter is ignored — presumably kept so the signature
    matches the caller in reduce(); verify against that call site."""
    return self.parallelizer.reduce(pred)

def to_dict(self, key_mapper: Callable[[Any], Any]) -> dict:
    """Terminal operation: triggers execution, then groups the elements by
    *key_mapper*. dict() normalizes the result to a plain dict."""
    self._trigger_exec()
    return dict(self._group_to_dict(key_mapper))

def _set_parallelizer_src(self):
    """Points the fork/join Parallelizer at the stream's current source."""
    self.parallelizer.set_source(self._source)
56 changes: 19 additions & 37 deletions pystreamapi/_streams/__sequential_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from optional import Optional

import pystreamapi._streams.__base_stream as stream
from pystreamapi._lazy.process import Process

_identity_missing = object()

Expand All @@ -16,11 +15,7 @@ def all_match(self, predicate: Callable[[Any], bool]):
self._trigger_exec()
return all(predicate(element) for element in self._source)

def filter(self, predicate: Callable[[Any], bool]):
self._queue.append(Process(self.__filter, predicate))
return self

def __filter(self, predicate: Callable[[Any], bool]):
def _filter(self, predicate: Callable[[Any], bool]):
    """Sequential filter implementation: keep only the elements for which
    *predicate* returns a truthy value."""
    self._source = list(filter(predicate, self._source))

def find_any(self):
Expand All @@ -29,49 +24,32 @@ def find_any(self):
return Optional.of(self._source[0])
return Optional.empty()

def flat_map(self, predicate: Callable[[Any], stream.BaseStream]):
self._queue.append(Process(self.__flat_map, predicate))
return self

def __flat_map(self, predicate: Callable[[Any], stream.BaseStream]):
def _flat_map(self, predicate: Callable[[Any], stream.BaseStream]):
    """Sequential flat_map implementation: replace every element by the
    elements of the sub-stream *predicate* maps it to, preserving order."""
    self._source = [
        item
        for substream in (predicate(element) for element in self._source)
        for item in substream.to_list()
    ]

def _group_to_dict(self, key_mapper: Callable[[Any], Any]):
    """Sequential grouping implementation: build a dict mapping each key
    (as computed by *key_mapper*) to the list of elements sharing it,
    preserving encounter order within each group."""
    groups = {}
    for element in self._source:
        groups.setdefault(key_mapper(element), []).append(element)
    return groups

def for_each(self, predicate: Callable):
    """Terminal operation: triggers execution, then applies *predicate* to
    every element in order for its side effect."""
    self._trigger_exec()
    for element in self._source:
        predicate(element)

def map(self, mapper: Callable[[Any], Any]):
self._queue.append(Process(self.__map, mapper))
return self

def __map(self, predicate: Callable[[Any], Any]):
self._source = [predicate(element) for element in self._source]

def map_to_int(self):
self._queue.append(Process(self.__map_to_int))
return self

def __map_to_int(self):
self.__map(int)
def _map(self, mapper: Callable[[Any], Any]):
    """Sequential map implementation: apply *mapper* to every element,
    preserving order."""
    self._source = list(map(mapper, self._source))

def map_to_str(self):
self._queue.append(Process(self.__map_to_str))
return self

def __map_to_str(self):
self.__map(str)

def peek(self, action: Callable):
self._queue.append(Process(self.__peek, action))
return self

def __peek(self, predicate: Callable):
def _peek(self, action: Callable):
    """Sequential peek implementation: call *action* on every element for
    its side effect; the source is left unchanged."""
    for item in self._source:
        action(item)

def reduce(self, predicate: Callable, identity=_identity_missing, depends_on_state=False):
self._trigger_exec()
Expand All @@ -80,3 +58,7 @@ def reduce(self, predicate: Callable, identity=_identity_missing, depends_on_sta
return reduce(predicate, self._source)
return Optional.of(reduce(predicate, self._source))
return identity if identity is not _identity_missing else Optional.empty()

def to_dict(self, key_mapper: Callable[[Any], Any]) -> dict:
    """Terminal operation: triggers execution, then groups the elements by
    *key_mapper* into a dict of key -> list of elements."""
    self._trigger_exec()
    return self._group_to_dict(key_mapper)
2 changes: 1 addition & 1 deletion tests/date_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from datetime import datetime, timedelta, timezone
from unittest import TestCase


# skipcq: PTC-W0046
class DateTest(TestCase):

def setUp(self):
Expand Down
34 changes: 33 additions & 1 deletion tests/test_stream_implementation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
from optional import Optional
from optional.something import Something
from parameterized import parameterized_class

from pystreamapi._streams.__base_stream import BaseStream
from pystreamapi._streams.__parallel_stream import ParallelStream
from pystreamapi._streams.__sequential_stream import SequentialStream
from pystreamapi._streams.__base_stream import BaseStream
from pystreamapi._streams.numeric.__sequential_numeric_stream import SequentialNumericStream


Expand Down Expand Up @@ -126,6 +127,37 @@ def test_reduce_empty_stream_with_identity(self):
result = self.stream([]).reduce(lambda x, y: x + y, identity=0)
self.assertEqual(result, 0)

def test_group_by(self):
    """group_by turns the stream into (key, group-list) pairs."""
    class Point:
        # Minimal value object: only `x` is used as the grouping key.
        def __init__(self, x, y):
            self.x = x
            self.y = y

    pt1, pt2, pt3, pt4 = Point(1, 2), Point(1, 3), Point(2, 3), Point(2, 4)
    result = self.stream([pt1, pt2, pt3, pt4]) \
        .group_by(lambda p: p.x) \
        .to_list()
    # NOTE(review): asserts exact pair and element order — assumes grouping
    # preserves encounter order for all stream implementations; confirm for
    # the parallel stream.
    self.assertListEqual(result, [(1, [pt1, pt2]), (2, [pt3, pt4])])

def test_group_by_empty(self):
    """group_by on an empty stream yields an empty stream."""
    result = self.stream([]).group_by(lambda x: x).to_list()
    self.assertListEqual(result, [])

def test_to_dict(self):
    """to_dict groups the elements into a key -> list-of-elements dict."""
    class Point:
        # Minimal value object: only `x` is used as the grouping key.
        def __init__(self, x, y):
            self.x = x
            self.y = y

    pt1, pt2, pt3, pt4 = Point(1, 2), Point(1, 3), Point(2, 3), Point(2, 4)
    result = self.stream([pt1, pt2, pt3, pt4]) \
        .to_dict(lambda p: p.x)
    # NOTE(review): assumes elements within each group keep encounter
    # order for all stream implementations; confirm for the parallel stream.
    self.assertDictEqual(result, {1: [pt1, pt2], 2: [pt3, pt4]})

def test_to_dict_empty(self):
    """to_dict on an empty stream yields an empty dict."""
    result = self.stream([]).to_dict(lambda x: x)
    self.assertDictEqual(result, {})


if __name__ == '__main__':
unittest.main()