`blllib` is a small library module that defines a lazily evaluated, multiprocessing pipeline, `Pipeline`.

A `Pipeline` is defined as a sequence of callable objects. For example:

```python
from blllib import Pipeline

operations = ...  # a sequence of callables
inputs = ...      # an iterable of inputs

with Pipeline(operations) as pipeline:
    for output in pipeline.apply(inputs):
        print(output)
```

`pipeline.apply` can be called only once.
`blllib` also provides a sequential version of `Pipeline`, called `SequentialPipeline`. Unlike `Pipeline` (for now), it can be applied multiple times:

```python
from blllib import SequentialPipeline

operations = ...  # a sequence of callables
inputs = ...      # an iterable of inputs
inputs2 = ...     # another iterable of inputs

with SequentialPipeline(operations) as pipeline:
    for output in pipeline.apply(inputs):
        print(output)
    for output in pipeline.apply(inputs2):
        print(output)
```
Install via `pip`:

```shell
pip install blllib
```

Requires Python 3.5 or above.
Each callable object may be:

- stateful: must be run sequentially
- conditionally stateless: can be run in parallel if the inputs are organized into batches
- stateless: can be run in parallel

For example, an accumulator, as defined below, is stateful. The second-order difference operator is conditionally stateless given batches of three (compare `Difference2_stateful` and `Difference2_stateless`). A function that negates its input (e.g. converting `1` to `-1`) is stateless.
```python
import collections


class Accumulator(object):
    def __init__(self):
        self.acc = 0

    def __call__(self, x):
        self.acc += x
        return self.acc


class Difference2_stateful(object):
    def __init__(self):
        self.cache = collections.deque(maxlen=3)

    def __call__(self, x):
        self.cache.append(x)
        if len(self.cache) == 3:
            return self.cache[-1] - self.cache[0]


class Difference2_stateless(object):
    """Expecting batched inputs."""

    def __call__(self, batch):
        x, _, z = batch
        return z - x
```
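The two formulations agree once the stream is batched into overlapping windows of three. The following self-contained check (it does not use `blllib`; the sliding-window construction is just for illustration) feeds the same stream through both operators:

```python
import collections


class Difference2_stateful(object):
    def __init__(self):
        self.cache = collections.deque(maxlen=3)

    def __call__(self, x):
        self.cache.append(x)
        if len(self.cache) == 3:
            return self.cache[-1] - self.cache[0]


class Difference2_stateless(object):
    def __call__(self, batch):
        x, _, z = batch
        return z - x


stream = [1, 4, 9, 16, 25]

# stateful: feed one element at a time, dropping the warm-up Nones
op = Difference2_stateful()
stateful_out = [y for y in map(op, stream) if y is not None]

# conditionally stateless: feed overlapping windows of three
windows = [stream[i:i + 3] for i in range(len(stream) - 2)]
stateless_out = list(map(Difference2_stateless(), windows))

print(stateful_out)   # [8, 12, 16]
print(stateless_out)  # [8, 12, 16]
```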
To run in parallel, a callable object must be pickleable. A notable example of a callable that is not pickleable is shown below:

```python
import pickle


def g():
    yield from range(10)


class Function(object):
    def __init__(self):
        self.nums = g()  # a generator object, which cannot be pickled

    def __call__(self, *args, **kwargs):
        pass


pickle.dumps(Function())
# Traceback (most recent call last):
#   File "<stdin>", line 1, in <module>
# TypeError: can't pickle generator objects
```
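A common workaround — a sketch, not part of `blllib` itself, and the class name `LazyFunction` is made up for illustration — is to create the non-pickleable resource lazily on first call rather than in `__init__`, so the instance sent to a worker process carries only pickleable attributes:

```python
import pickle


def g():
    yield from range(10)


class LazyFunction(object):
    """Hypothetical fix: defer creating the generator until first use."""

    def __init__(self):
        self.nums = None  # plain, pickleable placeholder

    def __call__(self, *args, **kwargs):
        if self.nums is None:
            self.nums = g()  # created inside the worker, never pickled
        return next(self.nums)


payload = pickle.dumps(LazyFunction())  # succeeds: no generator inside yet
restored = pickle.loads(payload)
print(restored())  # 0, the first value from the lazily created generator
```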
To assist in scheduling the processes, `Pipeline` optionally accepts one global metric at instantiation and reads two callable-specific metrics when parsing each callable object.

The global metric:

- `n_cpu`: the total number of cores to be allocated to all non-stateful callables. Defaults to `max(1, N - K)`, where `N` is the total number of CPU cores available as returned by `multiprocessing.cpu_count()` and `K` is the number of non-stateful callables.

The callable-specific metrics:

- `stateful`: if `True`, the callable is stateful and should be of type `Callable[[S], T]`, mapping `Iterable[S]` to `Iterable[T]`. If `False`, the callable is stateless and should be of type `Callable[[S], T]`, mapping `Iterable[S]` to `Iterable[T]`. If a positive integer, the callable is conditionally stateless and should be of type `Callable[[Sequence[S]], T]`, mapping batched `Iterable[S]` to `Iterable[T]`. Otherwise, an error is raised at `Pipeline` instantiation. This metric defaults to `True` if not found.
- `batch_size`: the number of inputs or batches of inputs fed at once. For each stateful callable, at most `batch_size` inputs are fed to the callable (in a separate process); later inputs have to wait for the production of the output induced by the earliest input. For each non-stateful callable, at most `batch_size` inputs are fed to the pool; likewise, later inputs have to wait for the production of the output induced by the earliest input. When `batch_size = 1`, the underlying process/pool essentially runs jobs sequentially. This metric defaults to `1` if not found.
- `run_in_master`: whenever specified (whatever its value), it makes the callable object run sequentially in the master process, in which case `batch_size` is ignored. This metric is by default not specified.
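Since the metrics are plain attributes, the lookup rules and defaults above can be sketched with `getattr`/`hasattr`. The helper below is hypothetical (it is not `blllib` API) and only restates the documented defaults:

```python
def read_metrics(op):
    """Hypothetical sketch of the metric lookup, using the documented defaults."""
    stateful = getattr(op, 'stateful', True)      # defaults to True
    batch_size = getattr(op, 'batch_size', 1)     # defaults to 1
    run_in_master = hasattr(op, 'run_in_master')  # presence matters, not value
    return stateful, batch_size, run_in_master


class Plain(object):
    # no metrics declared: falls back to the defaults
    def __call__(self, x):
        return x


class Batched(object):
    stateful = 3    # conditionally stateless, batches of three
    batch_size = 10

    def __call__(self, batch):
        return sum(batch)


print(read_metrics(Plain()))    # (True, 1, False)
print(read_metrics(Batched()))  # (3, 10, False)
```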
The callable-specific metrics can be specified as instance variables, class variables, or (for a plain function) function attributes. For example:

```python
# specify as function attributes
def add(args):
    x, y = args
    return x + y

add.stateful = 2
add.batch_size = 10


# specify as instance variables
class Add(object):
    def __init__(self):
        self.stateful = 2
        self.batch_size = 10

    def __call__(self, args):
        x, y = args
        return x + y


# specify as class variables
class Add2(object):
    stateful = 2
    batch_size = 10

    def __call__(self, args):
        x, y = args
        return x + y
```
When `batch_size` is `1`, as noted earlier, each worker process or pool essentially runs jobs sequentially, in which case a non-stateful callable degrades to a stateful one. For non-stateful callables, raising `batch_size` beyond the number of workers in the pool brings no further speedup and only consumes more memory. For stateful callables, when memory is sufficient, the larger `batch_size` is, the more efficient the pipeline becomes.