Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ Now you might be wondering why another library when there are already a few impl
* It boasts high speed and efficiency.
* The implementation achieves 100% test coverage.
* It follows Pythonic principles, resulting in clean and readable code.
* It adds some cool innovative features like conditions and an even more declarative look
* It adds some cool innovative features such as conditions or error handling and an even more declarative look.

Let's take a look at a small example:

Expand Down Expand Up @@ -102,6 +102,31 @@ Considering the above characteristics, a stream can be defined as follows:

Conditions provide a convenient means for performing logical operations within your Stream, such as using `filter()`, `take_while()`, `drop_while()`, and more. With PyStreamAPI, you have access to a staggering 111 diverse conditions that enable you to process various data types including strings, types, numbers, and dates. Additionally, PyStreamAPI offers a powerful combiner that allows you to effortlessly combine multiple conditions, facilitating the implementation of highly intricate pipelines.

## Error handling: Work with data that you don't know
PyStreamAPI offers a powerful error handling mechanism that allows you to handle errors in a declarative manner. This is especially useful when working with data that you don't know.

PyStreamAPI offers three different error levels:
- `ErrorLevel.RAISE`: This is the default error level. It will raise an exception if an error occurs.
- `ErrorLevel.IGNORE`: This error level will ignore any errors that occur and won't inform you.
- `ErrorLevel.WARN`: This error level will warn you about any errors that occur and log them as warnings using the default logger.


This is how you can use them:

```python
from pystreamapi import Stream, ErrorLevel

Stream.of([" ", '3', None, "2", 1, ""]) \
    .error_level(ErrorLevel.IGNORE) \
    .map_to_int() \
    .sorted() \
    .for_each(print) # Output: 1 2 3
```

The code above ignores all errors that occur while mapping to int and simply skips the elements that caused them.

For more details on how to use error handling, please refer to the documentation.

## Get started: Installation

To start using PyStreamAPI just install the module with this command:
Expand Down
5 changes: 3 additions & 2 deletions pystreamapi/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from pystreamapi.__stream import Stream
from pystreamapi._streams.error.__levels import ErrorLevel

__version__ = "0.2"
__all__ = ["Stream"]
__version__ = "0.3"
__all__ = ["Stream", "ErrorLevel"]
Empty file.
53 changes: 53 additions & 0 deletions pystreamapi/_itertools/tools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# pylint: disable=protected-access
from pystreamapi._streams.error.__error import ErrorHandler, _sentinel


def dropwhile(predicate, iterable, handler: ErrorHandler=None):
    """
    Skip the leading items for which the predicate holds, then yield the rest.

    The first item whose predicate result is falsy ends the dropping phase:
    that item and every item after it are yielded without being tested again.

    :param predicate: Callable applied to each leading item
    :param iterable: The source iterable
    :param handler: Optional error handler. When given, the predicate is
        invoked through ``handler._one(mapper=predicate, item=...)`` instead
        of being called directly. A result that is ``_sentinel`` never ends
        the dropping phase (presumably it marks an item whose evaluation
        failed and was suppressed -- confirm against ErrorHandler).
    """
    remaining = iter(iterable)
    for candidate in remaining:
        if handler is None:
            outcome = predicate(candidate)
        else:
            outcome = handler._one(mapper=predicate, item=candidate)
        # Keep dropping while the predicate holds or the handler reported
        # the sentinel for this item.
        if outcome or outcome is _sentinel:
            continue
        yield candidate
        break
    # Everything left in the shared iterator is passed through untouched.
    yield from remaining


_initial_missing = object()


def reduce(function, sequence, initial=_initial_missing, handler: ErrorHandler=None):
    """
    Fold the items of a sequence or iterable into a single value, left to right.

    ``reduce(lambda x, y: x+y, [1, 2, 3, 4, 5])`` computes ((((1+2)+3)+4)+5).
    When ``initial`` is supplied it is used as the starting value (and is the
    result for an empty iterable); otherwise the first item is the start.

    :param function: Callable taking (accumulated value, next item)
    :param sequence: The iterable to reduce
    :param initial: Optional starting value
    :param handler: Optional error handler. When given, each step is executed
        through ``handler._one``; a step whose result is ``_sentinel`` leaves
        the accumulated value unchanged (presumably the step failed and the
        error was suppressed -- confirm against ErrorHandler).
    :raises TypeError: If the iterable is empty and no initial value is given
    """
    items = iter(sequence)

    if initial is not _initial_missing:
        accumulated = initial
    else:
        try:
            accumulated = next(items)
        except StopIteration:
            raise TypeError(
                "reduce() of empty iterable with no initial value") from None

    if handler is None:
        # Plain fold, no error handling involved.
        for element in items:
            accumulated = function(accumulated, element)
        return accumulated

    for element in items:
        # Bind the current accumulated value as a default argument so the
        # handler only has to supply the next item.
        outcome = handler._one(
            mapper=lambda x, val=accumulated: function(val, x), item=element
        )
        if outcome is not _sentinel:
            accumulated = outcome
    return accumulated
50 changes: 33 additions & 17 deletions pystreamapi/_parallel/fork_and_join.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
# pylint: disable=protected-access
import os

from functools import reduce
from typing import Callable, Any
from typing import Callable, Any, Optional

from joblib import Parallel, delayed
from pystreamapi._parallel.parallelizer import Parallel, delayed
from pystreamapi._streams.error.__error import ErrorHandler
from pystreamapi._streams.error.__levels import ErrorLevel
from pystreamapi._itertools.tools import reduce


class Parallelizer:
Expand All @@ -21,31 +24,44 @@ class Parallelizer:

def __init__(self):
self.__src = None
self.__handler: Optional[ErrorHandler] = None

def set_source(self, src: list):
"""Set the source list
def set_source(self, src: list, handler: ErrorHandler=None):
"""
Set the source list
:param handler: The error handler to be used
:param src: The source list
"""
self.__src = src
self.__handler = handler

def filter(self, function):
"""Parallel filter function"""
parts = self.fork()
result = self.__run_job_in_parallel(parts, self.__filter, function)
if self.__handler is not None and self.__handler._get_error_level() != ErrorLevel.RAISE:
result = self.__run_job_in_parallel(parts, self._filter_ignore_errors, function)
else:
result = self.__run_job_in_parallel(parts, self.__filter, function)
return [item for sublist in result for item in sublist]

@staticmethod
def __filter(function, src):
"""Filter function used in the fork-and-join technology"""
return [element for element in src if function(element)]

def _filter_ignore_errors(self, function, src):
"""Filter function used in the fork-and-join technology using an error handler"""
return [self.__handler._one(condition=function, item=element) for element in src]

def reduce(self, function: Callable[[Any, Any], Any]):
"""Parallel reduce function using functools.reduce behind"""
if len(self.__src) < 2:
return self.__src
parts = self.fork(min_nr_items=2)
result = self.__run_job_in_parallel(parts, reduce, function)
return reduce(function, result)

@staticmethod
def __filter(function, src):
"""Filter function used in the fork-and-join technology"""
return [element for element in src if function(element)]
result = self.__run_job_in_parallel(
parts, lambda x, y: reduce(function=x, sequence=y, handler=self.__handler), function
)
return reduce(function, result, handler=self.__handler)

def fork(self, min_nr_items=1):
"""
Expand Down Expand Up @@ -77,8 +93,8 @@ def __calculate_number_of_parts(self, min_nr_items=1):
return round(len(self.__src) / min_nr_items)
return os.cpu_count() - 2 if os.cpu_count() > 2 else os.cpu_count()

@staticmethod
def __run_job_in_parallel(src, operation, op_function):
def __run_job_in_parallel(self, src, operation, op_function):
"""Run the operation in parallel"""
return Parallel(n_jobs=-1, prefer="processes")(delayed(operation)(op_function, part)
for part in src)
return Parallel(n_jobs=-1, prefer="processes", handler=self.__handler)(
delayed(operation)(op_function, part) for part in src
)
20 changes: 20 additions & 0 deletions pystreamapi/_parallel/parallelizer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from joblib import Parallel as _JoblibParallel, delayed # pylint: disable=unused-import

from pystreamapi._streams.error.__error import ErrorHandler
from pystreamapi._streams.error.__levels import ErrorLevel


class Parallel:
    """Thin wrapper around joblib.Parallel that post-processes results for error handling"""

    def __init__(self, n_jobs=-1, prefer="processes", handler: ErrorHandler=None):
        """
        Store the joblib settings and the optional error handler.

        :param n_jobs: Forwarded to joblib.Parallel as ``n_jobs``
        :param prefer: Forwarded to joblib.Parallel as ``prefer``
        :param handler: Optional error handler consulted after the jobs ran
        """
        self.handler = handler
        self.prefer = prefer
        self.n_jobs = n_jobs

    def __call__(self, iterable):
        """
        Run the given jobs through joblib.Parallel and return their results.

        If a handler is set and its error level is not ``ErrorLevel.RAISE``,
        the results are passed through ``ErrorHandler._remove_sentinel``
        before being returned; otherwise they are returned as joblib
        produced them.

        :param iterable: The delayed jobs to execute
        """
        runner = _JoblibParallel(n_jobs=self.n_jobs, prefer=self.prefer)
        results = runner(iterable)
        if not self.handler:
            return results
        if self.handler._get_error_level() != ErrorLevel.RAISE:
            return ErrorHandler._remove_sentinel(results)
        return results
Loading