
Feature request: Generators in processes #26

Open
andresriancho opened this issue Apr 13, 2018 · 4 comments

Comments

andresriancho commented Apr 13, 2018

User story

As a developer, I would like to run a generator in another process and receive its results one by one, so that I can process them in the main process.

The function I'm running generates a result that uses a lot of RAM (a list with many objects), and each object in the list can be processed individually. If I weren't running the function in a different process, I would certainly use a generator function.

At the moment, a function run in a sub-process can only return one result: the list with all the objects. In an ideal scenario, if this gets implemented, future.result() would return a generator which I could iterate over. Each time a result is produced by the sub-process function, it would be pickled, sent through the pipe, unpickled, and yielded by the generator.

Notes

  • I know I could do this manually somehow: use files to store the objects, use memcached, use an external queue system, etc.
  • If it is already implemented, please let me know where 👍
  • If there is an easier way to do this, please let me know how 👍
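For reference, the requested pattern can be approximated today with only the standard library. The following is a minimal sketch, not an existing API; the names produce and stream_from_process are illustrative. A sentinel value on the queue signals the end of the stream:

```python
from multiprocessing import Process, Queue

_SENTINEL = None


def produce(queue):
    """Stand-in for a memory-hungry generator running in a child process."""
    for item in range(5):
        queue.put(item)  # each item is pickled and sent through the pipe
    queue.put(_SENTINEL)  # tell the consumer no more items will arrive


def stream_from_process(target):
    """Yield items produced by `target` running in a separate process."""
    queue = Queue()
    worker = Process(target=target, args=(queue,))
    worker.start()
    try:
        # iter() stops as soon as the sentinel is received, so the
        # consumer never blocks on an empty queue after the producer exits.
        for item in iter(queue.get, _SENTINEL):
            yield item
    finally:
        worker.join()


if __name__ == "__main__":
    print(list(stream_from_process(produce)))  # prints [0, 1, 2, 3, 4]
```

Each item crosses the pipe individually, so the parent never holds the whole list in memory at once.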
@noxdafox (Owner)

Are you aiming to reach any parallelism here, or is this just for isolation purposes? In other words: is this a sequential operation run in another process, or do you need to split it into parallel computations?

andresriancho commented Apr 16, 2018 via email


noxdafox commented Apr 20, 2018

I see what you mean, and I think it's a pretty interesting feature. I am currently deep into something else, so I will start working on this a bit later (beginning of June?). Of course, pull requests are welcome.

In the meantime, you can use this code snippet as a reference for implementing such functionality in your application.

from pebble import concurrent
from multiprocessing import Queue

def process(url):
    """A stand-in generator producing large data split into chunks."""
    for index in range(10):
        yield "%s-chunk-%d" % (url, index)

@concurrent.process(timeout=60)
def process_url(queue, url):
    """Processes a URL, splitting the report into chunks and putting them in the given Queue."""
    for chunk in process(url):
        queue.put(chunk)
    # Sentinel: tells the consumer that no more chunks will arrive,
    # so it does not block on queue.get() after the producer is done.
    queue.put(None)

queue = Queue()
future = process_url(queue, "www.example.com")

for chunk in iter(queue.get, None):
    pass  # process each chunk here as it arrives

if future.exception() is not None:
    print("Error while processing the URL")

@andresriancho (Author)

Ah! I like the trick of using a queue!

The example code looks good and is a useful starting point for writing the PR. Depending on my client's needs, I might write the PR before June.

What about the interface? Do you think we should have two new methods in ProcessPool?

  • schedule_generator()
  • map_generator()

Both returning a tuple of future and generator? The generator would be a wrapper around the queue that people could use like this:

future, generator = pool.schedule_generator(function)

for result in generator:
    print(result)

# Do some future error handling here
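To make the proposal concrete, here is a stdlib-only sketch of what a schedule_generator() helper could look like, built on ProcessPoolExecutor and a manager queue. The names schedule_generator, _drain, and squares are hypothetical and not part of pebble's API:

```python
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager

_DONE = "__done__"  # sentinel marking the end of the stream


def _drain(function, queue, *args):
    """Runs in the pool: forwards each yielded value, then the sentinel."""
    try:
        for value in function(*args):
            queue.put(value)
    finally:
        queue.put(_DONE)


def squares(n):
    """Example generator to be run in another process."""
    for i in range(n):
        yield i * i


def schedule_generator(executor, function, *args):
    """Return (future, generator); the generator is a wrapper around the queue."""
    # A manager queue is picklable, so it can be passed to pool workers.
    queue = Manager().Queue()
    future = executor.submit(_drain, function, queue, *args)

    def results():
        for value in iter(queue.get, _DONE):
            yield value

    return future, results()


if __name__ == "__main__":
    with ProcessPoolExecutor() as pool:
        future, generator = schedule_generator(pool, squares, 4)
        print(list(generator))  # prints [0, 1, 4, 9], one value per queue round-trip
        future.result()         # re-raises any exception from the producer
```

The finally clause guarantees the sentinel is sent even when the producer raises, so the consuming generator terminates and the exception can then be retrieved from the future.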
