Multiprocessing for stream processing #296

Closed
learnermaxRL opened this issue Feb 15, 2019 · 3 comments

@learnermaxRL

Any ideas on how to hand stream data collected in batches to a multiprocessing queue for further processing, since putting to the queue is a blocking call?

@ask
Contributor

ask commented Feb 28, 2019

Hmm, in Celery we have an async version of the multiprocessing pool. One of the things that I want to do is to port that to asyncio.

multiprocessing.Pool combines threads and fork, which can lead to deadlocks and other issues.
It also uses a POSIX semaphore to protect reads from and writes to a shared pipe, and if a child process
is abruptly terminated while the lock is held, the pool will deadlock.

I imagine you can already use multiprocessing.Pool with Faust, as you can handle the blocking
call in a thread. You could also start a local cluster of Faust workers :) Granted, this will add some overhead, as messages will go through Kafka.
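A minimal sketch of the thread-based approach, written in plain asyncio so it runs without Kafka or Faust installed. `fake_stream` is a hypothetical stand-in for a Faust stream and `cpu_heavy` is a hypothetical CPU-bound task; neither comes from Faust. The blocking `Pool.apply` call is handed to the event loop's default thread pool with `run_in_executor`, so the loop stays responsive while a worker process does the work.

```python
import asyncio
import multiprocessing


def cpu_heavy(n: int) -> int:
    """Hypothetical CPU-bound work; runs inside a pool worker process."""
    return sum(i * i for i in range(n))


async def fake_stream(values):
    """Hypothetical stand-in for a Faust stream: yields values asynchronously."""
    for value in values:
        await asyncio.sleep(0)
        yield value


async def process_stream(stream, pool):
    loop = asyncio.get_running_loop()
    results = []
    async for value in stream:
        # pool.apply blocks until the worker returns, so run it in the loop's
        # default ThreadPoolExecutor instead of calling it on the event loop.
        result = await loop.run_in_executor(None, pool.apply, cpu_heavy, (value,))
        results.append(result)
    return results


def run_demo(values):
    # Create the pool (and fork its workers) before the event loop
    # starts any executor threads.
    with multiprocessing.Pool(processes=2) as pool:
        return asyncio.run(process_stream(fake_stream(values), pool))


if __name__ == "__main__":
    print(run_demo([10, 100, 1000]))
```

Each message is awaited in turn here; to keep several messages in flight, the agent could run with `concurrency=N`, or the coroutine could gather several `run_in_executor` futures at once. Creating the pool before `asyncio.run` means that, with the `fork` start method, the workers are forked before any extra threads exist.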

@learnermaxRL
Author

I was trying to push the data to a multiprocessing queue to be processed by a class inheriting from mp.Process. It works with a simple Queue, but with mp.Queue there is a deadlock, possibly for the reason you mentioned.
I think you could add this functionality to Faust, so that Kafka stream data can be dumped into an mp.Queue for other processes to consume without blocking the asyncio loop.
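A minimal sketch of that idea, again in plain asyncio with a hypothetical `fake_stream` standing in for the Faust stream. `Consumer` is a hypothetical `mp.Process` subclass whose doubling step is a placeholder for real work. A bounded `mp.Queue.put` blocks when the queue is full, so each put runs in a worker thread via `run_in_executor`, and a `None` sentinel tells the consumer to stop.

```python
import asyncio
import multiprocessing as mp


class Consumer(mp.Process):
    """Hypothetical consumer process: reads items until it sees None."""

    def __init__(self, in_queue, out_queue):
        super().__init__()
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        while True:
            item = self.in_queue.get()
            if item is None:  # sentinel: no more data
                break
            self.out_queue.put(item * 2)  # placeholder for real processing


async def fake_stream(values):
    """Hypothetical stand-in for a Faust stream."""
    for value in values:
        await asyncio.sleep(0)
        yield value


async def feed_queue(stream, queue):
    loop = asyncio.get_running_loop()
    async for value in stream:
        # put() blocks while the bounded queue is full; run it in a thread
        # so the event loop keeps serving other tasks.
        await loop.run_in_executor(None, queue.put, value)
    await loop.run_in_executor(None, queue.put, None)


def run_demo(values):
    in_queue = mp.Queue(maxsize=10)
    out_queue = mp.Queue()
    consumer = Consumer(in_queue, out_queue)
    consumer.start()  # start the child before the loop creates any threads
    asyncio.run(feed_queue(fake_stream(values), in_queue))
    # Drain the output queue before join() to avoid a deadlock.
    results = [out_queue.get() for _ in values]
    consumer.join()
    return results


if __name__ == "__main__":
    print(run_demo([1, 2, 3]))
```

The sketch drains `out_queue` before calling `join()`, because the multiprocessing programming guidelines warn that joining a process which still has buffered items in a queue can deadlock. The bounded queue also applies backpressure: a slow consumer pauses the stream instead of letting memory grow.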

@tarasmatsyk

@learnermaxRL, I ended up using a thread pool to speed things up, since message processing takes longer than it should.

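from concurrent.futures import ThreadPoolExecutor
from multiprocessing import cpu_count

# Assumes `app` (a faust.App) and `new_topic` (an app.topic(...)) are defined elsewhere.
CONCURRENCY = cpu_count()
thread_pool = ThreadPoolExecutor(max_workers=CONCURRENCY)

def method(msg):
    """Placeholder for the slow, blocking per-message work."""

# then you do
@app.agent(new_topic, concurrency=CONCURRENCY)
async def new_message(stream):
    async for msg in stream:
        # Run the blocking call in the thread pool so the event loop is not blocked.
        await app.loop.run_in_executor(thread_pool, method, msg)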
