Description
I use redis for distributed queues for workers started with the multiprocessing module. While it works flawlessly without load or with only a single worker, you get weird behavior once multiple workers do stuff concurrently. My assumption is that multiprocessing forks, and the socket connections are shared across multiple processes afterwards. If you use a ConnectionPool to avoid creating lots of connections, you end up with multiple concurrent clients using the same connection, which in turn confuses the protocol parser (obviously...).
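For context, a minimal sketch of how the corruption can be reproduced (assuming a Redis server on localhost:6379; the worker function, key names, and loop count are made up for illustration): the pool and one connection are created in the parent, so every forked worker inherits the same socket.

import multiprocessing
import redis

pool = redis.ConnectionPool()

def worker(n):
    client = redis.Redis(connection_pool=pool)
    for i in range(1000):
        # Concurrent requests from several processes go out over the same
        # inherited socket, so replies can end up attributed to the wrong
        # command.
        client.set('key:%d' % n, i)
        client.get('key:%d' % n)

if __name__ == '__main__':
    # Issue one command in the parent so a pooled connection exists
    # before the fork and is inherited by all children.
    redis.Redis(connection_pool=pool).ping()
    procs = [multiprocessing.Process(target=worker, args=(n,))
             for n in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()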
The solution is quite simple: check whether os.getpid() has changed, and if it has, disconnect all connections in the pool. The parent process gets to keep its connections while the child makes new ones.
This little piece of code solves my issue:
import redis
import os


class ForkSafeConnectionPool(redis.ConnectionPool):
    def __init__(self, connection_class=redis.Connection, max_connections=None,
                 **connection_kwargs):
        # Remember which process created the pool.
        self._pid = os.getpid()
        super(ForkSafeConnectionPool, self).__init__(
            connection_class, max_connections, **connection_kwargs)

    def get_connection(self, command_name, *keys, **options):
        if self._pid != os.getpid():
            # We are in a forked child: the pooled connections (and their
            # sockets) belong to the parent process, so drop them and let
            # this process open its own.
            self._pid = os.getpid()
            self.disconnect()
        return super(ForkSafeConnectionPool, self).get_connection(
            command_name, *keys, **options)
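A hypothetical usage sketch, reusing the class and imports from above (the worker function, key names, and process count are made up): the pool is created once in the parent, and after multiprocessing forks, each child's first get_connection() notices the changed pid and discards the inherited sockets.

import multiprocessing

pool = ForkSafeConnectionPool(host='localhost', port=6379, db=0)

def worker(n):
    client = redis.Redis(connection_pool=pool)
    # The first command in the child triggers the pid check, so this runs
    # on a fresh, child-owned connection.
    client.set('worker:%d' % n, 'done')

if __name__ == '__main__':
    procs = [multiprocessing.Process(target=worker, args=(n,))
             for n in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()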
I believe my problem is fairly generic and the solution pretty simple, so I propose including this.