
A Python multithreading library for data processing pipelines, data streaming, etc.


nyoungstudios/multiflow


multiflow


About

multiflow is a Python multithreading library for data processing pipelines/workflows, streaming, etc. It extends concurrent.futures by allowing both the input and the output to be generator objects. It also makes it easy to string multiple thread pools together to create a multithreaded pipeline.

Additionally, multiflow comes with periodic logging, automatic retries, error handling, and argument expansion.

Why?

The ability to accept an input generator object while yielding an output generator object makes it ideal for running multiple jobs concurrently when the output of the first job is the input of the second job. Work on the second job can start before the first job completes, so the total work finishes sooner.

A great use case for this is streaming data. For example, with multiflow and smart_open, you could stream images from S3 and process them in a multithreaded environment before exporting them elsewhere.
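To make the generator-in, generator-out idea concrete, here is a minimal sketch that uses only the standard library's concurrent.futures. It is not multiflow's implementation or API; `pooled_map`, `download`, and `resize` are hypothetical names introduced for illustration. Each stage pulls items lazily from its input and yields results as they complete, so the second stage starts working before the first stage has finished.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def pooled_map(fn, items, max_workers=4):
    """Yield fn(item) for each item in completion order, pulling from items lazily.

    Hypothetical helper for illustration only; not part of multiflow.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = set()
        for item in items:
            pending.add(pool.submit(fn, item))
            # Keep at most max_workers jobs in flight so the input is not read ahead.
            if len(pending) >= max_workers:
                done, pending = wait(pending, return_when=FIRST_COMPLETED)
                for future in done:
                    yield future.result()
        # The input is exhausted; drain the jobs that are still running.
        while pending:
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                yield future.result()


def download(n):
    # Stand-in for the first job (e.g. fetching an object).
    return n * 2


def resize(n):
    # Stand-in for the second job (e.g. processing what was fetched).
    return n + 1


# The output generator of the first stage is the input of the second stage.
stage_one = pooled_map(download, range(5))
stage_two = pooled_map(resize, stage_one)

# Results arrive in completion order, so sort them for a stable view.
results = sorted(stage_two)
print(results)
```

Because each stage is a generator, nothing runs until the final stage is iterated, and an error raised inside a job surfaces at `future.result()`. This sketch does not cover retries, logging, or per-job error handling.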

Install

pip install multiflow

Quickstart

from multiflow import MultithreadedFlow


image_paths = []  # list of images


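def transform(image_path):
    # do some work on the image, then return the path of the result
    new_path = image_path  # placeholder: replace with the real output path
    return new_path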


with MultithreadedFlow() as flow:
    flow.consume(image_paths)  # accepts a generator object or any iterable (see the examples below for a generator)
    flow.add_function(transform)

    for output in flow:
        if output:  # if successful
            print(output)  # new_path
        else:
            e = output.get_exception()

    success = flow.get_successful_job_count()
    failed = flow.get_failed_job_count()

Examples

For a working program using multiflow, see this example, which resizes the images in an S3 bucket to 50% and saves the resized images locally.

Documentation

The documentation is still a work in progress; for the most up-to-date version, please see this page.