Flask-SocketIO does not handle Redis Sentinel failover with KombuManager #1262

Closed
saad-ashfaq opened this issue Oct 25, 2023 · 2 comments

@saad-ashfaq

I'm using multiple Flask-SocketIO workers with a Redis Sentinel message queue configured through KombuManager as the client_manager (successfully set up following the instructions from miguelgrinberg/Flask-SocketIO#1979).
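
For reference, here is a rough sketch of that setup; the hostnames, password, database number, and Sentinel master name are placeholders, and issue #1979 has the actual instructions:

    from flask import Flask
    from flask_socketio import SocketIO
    from socketio import KombuManager

    app = Flask(__name__)

    # Kombu accepts several Sentinel URLs joined with ';' so any reachable
    # Sentinel node can report the current master.
    sentinel_url = ('sentinel://:password@sentinel1:26379/0;'
                    'sentinel://:password@sentinel2:26379/0')

    client_manager = KombuManager(
        url=sentinel_url,
        connection_options={'transport_options': {'master_name': 'mymaster'}},
    )
    socketio = SocketIO(app, client_manager=client_manager)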

There is an issue with the current _listen() implementation in kombu_manager.py, shown below: a Sentinel failover is not detected when the Redis master instance goes down and a replica instance is promoted to the new master.

    def _listen(self):
        reader_queue = self._queue()
        retry_sleep = 1
        while True:
            try:
                with self._connection() as connection:
                    with connection.SimpleQueue(reader_queue) as queue:
                        while True:
                            message = queue.get(block=True)
                            message.ack()
                            yield message.payload
                            retry_sleep = 1
            except (OSError, kombu.exceptions.KombuError):
                self._get_logger().error(
                    'Cannot receive from rabbitmq... '
                    'retrying in {} secs'.format(retry_sleep))
                time.sleep(retry_sleep)
                retry_sleep = min(retry_sleep * 2, 60)

In this scenario the redis package raises redis.exceptions.ConnectionError: Connection closed by server, with the traceback originating at message = queue.get(block=True). Since the except block only handles OSError and kombu.exceptions.KombuError, this exception is not caught and the listener loop terminates.

In my local environment I was able to resolve the issue by simply adding redis.exceptions.ConnectionError to the except block, although that may not be ideal because it also requires importing the redis-py package in kombu_manager.py. With this change, a Redis Sentinel setup works as a message queue for Flask-SocketIO and handles failover as expected: after the exception is caught, reconnecting through the Sentinel URL discovers the new master. I tested this with a URL of the form sentinel://:*@sentinel1:port/db;sentinel://:*@sentinel2:port/db, so it works with a configuration of multiple Sentinel nodes and multiple Redis instances in a master-replica setup.
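
For clarity, this is roughly what the local patch looks like in kombu_manager.py; only the except clause of _listen() changes, plus one extra import at the top of the module:

    import redis.exceptions  # new module-level import pulled into kombu_manager.py

            # ...the rest of _listen() is unchanged; only the except clause grows:
            except (OSError, kombu.exceptions.KombuError,
                    redis.exceptions.ConnectionError):
                self._get_logger().error(
                    'Cannot receive from rabbitmq... '
                    'retrying in {} secs'.format(retry_sleep))
                time.sleep(retry_sleep)
                retry_sleep = min(retry_sleep * 2, 60)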

This change should be good enough for my use case, but I wanted to share my findings here so that python-socketio might get an official fix for Redis Sentinel failover support in KombuManager.

@miguelgrinberg
Owner

Kombu should not allow exceptions from the selected message queue to bubble up into the calling application, in my opinion. They should have wrapped this exception into one of theirs.

What I think would be best is to add a catch-all except block here, so that any unexpected exceptions such as this one are handled without having to add code that is specific to a particular queue.
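
As a sketch of that idea (not a final implementation, and the log message wording is just a placeholder), the except clause would become a catch-all while keeping the same backoff:

            except Exception:
                # Catch anything the transport raises, including exceptions
                # that Kombu does not wrap, so the listener always reconnects.
                self._get_logger().error(
                    'Unexpected error while reading from the queue... '
                    'retrying in {} secs'.format(retry_sleep))
                time.sleep(retry_sleep)
                retry_sleep = min(retry_sleep * 2, 60)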

@miguelgrinberg self-assigned this on Oct 25, 2023
@miguelgrinberg transferred this issue from miguelgrinberg/Flask-SocketIO on Oct 25, 2023
@saad-ashfaq
Author

Yes, that would be a cleaner solution that avoids any queue-specific logic. It might also be worth adding the same exception handling to the _publish() method for consistency, to cover similar uncaught errors there (rough sketch below).
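
Purely as an illustration of the retry-and-backoff wrapping (the real _publish() in kombu_manager.py goes through a Kombu producer and exchange rather than a SimpleQueue; this just reuses the helpers visible in the _listen() snippet above):

    def _publish(self, data):
        retry_sleep = 1
        while True:
            try:
                with self._connection() as connection:
                    with connection.SimpleQueue(self._queue()) as queue:
                        queue.put(data)
                        return
            except Exception:
                self._get_logger().error(
                    'Cannot publish to the queue... '
                    'retrying in {} secs'.format(retry_sleep))
                time.sleep(retry_sleep)
                retry_sleep = min(retry_sleep * 2, 60)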

Thank you for your great work!
