Add support for redis sentinel cluster #1979

Closed
wkedz opened this issue Apr 25, 2023 · 2 comments

@wkedz

wkedz commented Apr 25, 2023

Hi!

I would like to use SocketIO with a Redis Sentinel cluster. Unfortunately, this is not currently possible, because I cannot pass additional arguments to the queue managers: https://github.com/miguelgrinberg/Flask-SocketIO/blob/main/src/flask_socketio/__init__.py#L200

Is your feature request related to a problem? Please describe.

Yes. My problem is related to issue #359, which is now closed. I don't think it should be.

Basically, the whole implementation is already provided in python-socketio: miguelgrinberg/python-socketio@7dbc470

Describe the solution you'd like

I need SocketIO to pass additional arguments to the queue managers. For example, something like this:

import inspect

class FakeSocketIO(object):

  def __init__(self, url, **kwargs):

    self.queue = None

    if url.startswith(('redis://', "rediss://")):
      queue_class = FakeRedisManager
    else:
      queue_class = FakeKombuManager

    init_arguments = self.filter_init_args(queue_class, kwargs)
    self.queue = queue_class(**init_arguments)

  def run(self):
    self.queue.run()

  def filter_init_args(self, queue_class, kwargs):
    # Get the parameter names of queue_class.__init__.
    init_var_names = inspect.signature(queue_class.__init__).parameters
    init_arguments = {}
    # Keep only the arguments the target queue manager accepts, skipping self.
    for name in init_var_names:
      if name != "self":
        init_arguments[name] = kwargs.get(name, None)
    return init_arguments

class FakeKombuManager(object):
  
  def __init__(self, connection_options=None, exchange_options=None,
                 queue_options=None, producer_options=None):
    self.connection_options = connection_options or {}
    self.exchange_options = exchange_options or {}
    self.queue_options = queue_options or {}
    self.producer_options = producer_options or {}

  def run(self):
    print("self.connection_options: ", self.connection_options)
    print("self.exchange_options: ", self.exchange_options)
    print("self.queue_options: ", self.queue_options)
    print("self.producer_options: ", self.producer_options)

class FakeRedisManager(object):
  
  def __init__(self, redis_options=None):
    self.redis_options = redis_options or {}

  def run(self):
    print("self.redis_options: ", self.redis_options)

if __name__ == "__main__":

  #Redis test 
  print("RedisManager with redis options and wrong argument")
  sioR = FakeSocketIO("redis://", arg_not_for_RedisManager=11, redis_options={"arg1":"arg1", "arg2":"arg2"})
  sioR.run()
  
  print("RedisManager without redis options")
  sioR = FakeSocketIO("redis://")
  sioR.run()

  print("KombuManager with connection_options and wrong argument")
  sioK = FakeSocketIO("sentinel://", arg_not_for_KombuManager=11, connection_options={"master_name":"master", "arg2":"arg2"})
  sioK.run()

  print("KombuManager with only wrong argument")
  sioK = FakeSocketIO("sentinel://", arg_not_for_KombuManager=12)
  sioK.run()
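A possible variation on the filtering idea, as a minimal sketch: `filter_init_args` above passes `None` for every parameter the caller did not supply, which would override any non-`None` default a manager declares. The version below forwards only the arguments that were actually given and returns the rest separately so they can be reported. The names `split_init_kwargs` and `DemoRedisManager` are hypothetical and exist only for this illustration; they are not part of Flask-SocketIO or python-socketio.

```python
import inspect


def split_init_kwargs(manager_class, kwargs):
    """Split kwargs into those manager_class.__init__ accepts and the rest."""
    parameters = inspect.signature(manager_class.__init__).parameters
    # Forward only arguments that were supplied AND that __init__ declares.
    accepted = {name: value for name, value in kwargs.items()
                if name in parameters and name != "self"}
    # Everything else is returned so the caller can warn about or log it.
    ignored = {name: value for name, value in kwargs.items()
               if name not in accepted}
    return accepted, ignored


class DemoRedisManager:
    """Hypothetical stand-in, similar to FakeRedisManager above."""

    def __init__(self, redis_options=None):
        self.redis_options = redis_options or {}


accepted, ignored = split_init_kwargs(
    DemoRedisManager,
    {"redis_options": {"arg1": "arg1"}, "arg_not_for_RedisManager": 11},
)
manager = DemoRedisManager(**accepted)
print(manager.redis_options)  # {'arg1': 'arg1'}
print(ignored)                # {'arg_not_for_RedisManager': 11}
```

Returning the ignored arguments instead of silently dropping them makes a misspelled option name easier to notice.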

Describe alternatives you've considered

I don't think that there are any alternatives.

Logs

  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/socketio/pubsub_manager.py", line 152, in _thread
    for message in self._listen():
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/socketio/kombu_manager.py", line 125, in _listen
    with connection.SimpleQueue(reader_queue) as queue:
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/connection.py", line 787, in SimpleQueue
    return SimpleQueue(channel or self, name, no_ack, queue_opts,
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/simple.py", line 135, in __init__
    consumer = messaging.Consumer(channel, queue, accept=accept)
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/messaging.py", line 387, in __init__
    self.revive(self.channel)
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/messaging.py", line 400, in revive
    channel = self.channel = maybe_channel(channel)
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/connection.py", line 1052, in maybe_channel
    return channel.default_channel
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/connection.py", line 895, in default_channel
    self._ensure_connection(**conn_opts)
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/connection.py", line 433, in _ensure_connection
    return retry_over_time(
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/utils/functional.py", line 312, in retry_over_time
    return fun(*args, **kwargs)
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/connection.py", line 877, in _connection_factory
    self._connection = self._establish_connection()
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/connection.py", line 812, in _establish_connection
    conn = self.transport.establish_connection()
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/transport/virtual/base.py", line 949, in establish_connection
    self._avail_channels.append(self.create_channel(self))
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/transport/virtual/base.py", line 927, in create_channel
    channel = self.Channel(connection)
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/transport/redis.py", line 737, in __init__
    self.client.ping()
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/utils/objects.py", line 30, in __get__
    return super().__get__(instance, owner)
  File "/usr/lib/python3.10/functools.py", line 981, in __get__
    val = self.func(instance)
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/transport/redis.py", line 1239, in client
    return self._create_client(asynchronous=True)
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/transport/redis.py", line 1195, in _create_client
    return self.Client(connection_pool=self.async_pool)
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/transport/redis.py", line 1233, in async_pool
    self._async_pool = self._get_pool(asynchronous=True)
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/transport/redis.py", line 1418, in _get_pool
    return self._sentinel_managed_pool(asynchronous)
  File "/flask-socket-io-redis-sentinel/lib/python3.10/site-packages/kombu/transport/redis.py", line 1408, in _sentinel_managed_pool
    raise ValueError(
ValueError: 'master_name' transport option must be specified.

@miguelgrinberg
Owner

miguelgrinberg commented Apr 25, 2023

This is how you do it:

mgr = ManagerClass("your", "arguments", "here")
socketio = SocketIO(client_manager=mgr)

All you need to do is pick a manager class that does what you need. I assume RedisManager or KombuManager is the one that makes the most sense to adapt for a cluster, but I'm not sure whether you can make this work just by adding extra arguments. In the worst case, you need to create a custom class that inherits from one of these and makes the changes needed to work with a cluster.

@wkedz
Author

wkedz commented Apr 26, 2023

Hi @miguelgrinberg

I passed the manager through client_manager, and it works as it should.

Code:

import socketio
from flask_socketio import SocketIO

# app and REDIS_HOST are defined elsewhere in the application.
app.logger.info(f"Connecting SocketIO to redis at {REDIS_HOST}")
url = REDIS_HOST  # self.server_options.get('message_queue', None)
channel = "flask-socketio"  # self.server_options.pop('channel', 'flask-socketio')
connection_options = {"transport_options": {"master_name": "mymaster"}}
write_only = False  # app is None
client_manager = socketio.KombuManager(url=url, channel=channel, write_only=write_only, connection_options=connection_options)
sio = SocketIO(app, client_manager=client_manager, logger=True, engineio_logger=True)

Logs:

[2023-04-26 11:01:25,031] INFO in server: Connecting SocketIO to redis at sentinel://0.0.0.0:26379
Server initialized for eventlet.
h2j59VBXCfZbUIPRAAAA: Sending packet OPEN data {'sid': 'h2j59VBXCfZbUIPRAAAA', 'upgrades': ['websocket'], 'pingTimeout': 20000, 'pingInterval': 25000}
kombu backend initialized.
h2j59VBXCfZbUIPRAAAA: Received packet MESSAGE data 0{}
h2j59VBXCfZbUIPRAAAA: Sending packet MESSAGE data 0{"sid":"cI-uIDemeBQ6rEnlAAAB"}
pubsub message: emit
h2j59VBXCfZbUIPRAAAA: Sending packet MESSAGE data 2["my event",{"data":1}]
pubsub message: emit
h2j59VBXCfZbUIPRAAAA: Sending packet MESSAGE data 2["my event",{"data":2}]
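For anyone reusing this setup, here is a minimal sketch of a helper that builds the same keyword arguments as the code above and fails early when the master name is missing, rather than later inside the listener thread as in the original traceback. The helper name `sentinel_manager_kwargs` is hypothetical; the keys it returns simply mirror the `KombuManager` call shown above.

```python
def sentinel_manager_kwargs(url, master_name, channel="flask-socketio",
                            write_only=False):
    """Build keyword arguments for a Kombu-based manager behind Sentinel."""
    # The Sentinel transport needs a master name, so check for it up front.
    if not master_name:
        raise ValueError("master_name must be a non-empty string")
    return {
        "url": url,
        "channel": channel,
        "write_only": write_only,
        "connection_options": {
            "transport_options": {"master_name": master_name},
        },
    }


kwargs = sentinel_manager_kwargs("sentinel://0.0.0.0:26379", "mymaster")
print(kwargs["connection_options"])
# With python-socketio installed, these would be passed on as in the code above:
# client_manager = socketio.KombuManager(**kwargs)
```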

Thanks for your help!

Closing, because there is no need to pass additional args to SocketIO.
