Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AsyncioConnection with a custom_ioloop raises exception #1296

Open
arbel03 opened this issue Jan 9, 2021 · 4 comments
Open

AsyncioConnection with a custom_ioloop raises exception #1296

arbel03 opened this issue Jan 9, 2021 · 4 comments
Assignees
Milestone

Comments

@arbel03
Copy link

arbel03 commented Jan 9, 2021

Hi, I am trying to create a Pika wrapper using AsyncioConnection, I want it to be async and reconnect in the background. If no connection is up, don't produce or consume events.
however, when providing my own asyncio.new_event_loop() to AsyncioConnection(custom_ioloop), I get an exception.
Code:

_on_connection_open_error is being called.
err is equal to = RabbitMQ Connection error AMQPConnectionError: (AMQPConnectionWorkflowFailed: 1 exceptions in all; last exception - AMQPConnectorSocketConnectError: NotImplementedError(); first exception - None,)

I extracted the following backtrace _unused_connection:

Traceback (most recent call last):
  File "D:\Python39\lib\site-packages\pika\adapters\utils\io_services_utils.py", line 316, in _start_async
    self._nbio.set_writer(self._sock.fileno(), self._on_writable)
  File "D:\Python39\lib\site-packages\pika\adapters\asyncio_connection.py", line 194, in set_writer
    self._loop.add_writer(fd, on_writable)
  File "D:\Python39\lib\asyncio\events.py", line 510, in add_writer
    raise NotImplementedError
NotImplementedError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "D:\Python39\lib\site-packages\pika\adapters\utils\connection_workflow.py", line 815, in _try_next_resolved_address
    addr_record = next(self._addrinfo_iter)
StopIteration

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "D:\Python39\lib\site-packages\pika\connection.py", line 1363, in basic_nack
    return self.server_capabilities.get('basic.nack', False)
AttributeError: 'NoneType' object has no attribute 'get'

Code:

import json
import traceback
import functools
import threading

import asyncio
import pika
from pika.adapters.asyncio_connection import AsyncioConnection

import config

class RabbitMQConnection:
    def __init__(self, connection_string, on_open_callback, on_error_callback):
        self._on_open_callback = on_open_callback
        self._on_error_callback = on_error_callback

        self._ioloop = asyncio.new_event_loop()

        self._connection_string = connection_string
        self._connection = None
        self._connect()

        self._reconnect_delay = 0

    def _connect(self):
        self._connection = AsyncioConnection(
            parameters=pika.URLParameters(self._connection_string),
            on_open_callback=self._on_connection_open,
            on_open_error_callback=self._on_connection_open_error,
            on_close_callback=self._on_connection_closed,
            custom_ioloop=self._ioloop)

    def _on_connection_open(self, connection):
        self._on_open_callback(connection)
        self._reconnect_delay = 0

    def _on_connection_open_error(self, _unused_connection, err):
        """This method is called by pika if the connection to RabbitMQ
        can't be established.
        :param pika.SelectConnection _unused_connection: The connection
        :param Exception err: The error
        """
        print("RabbitMQ Connection error", repr(err))
        self._on_error_callback(_unused_connection, err)
        self._stopping = False
        self._stop()

    def _on_connection_closed(self, _unused_connection, reason):
        """This method is invoked by pika when the connection to RabbitMQ is
        closed unexpectedly. Since it is unexpected, we will reconnect to
        RabbitMQ if it disconnects.
        :param pika.connection.Connection connection: The closed connection obj
        :param Exception reason: exception representing reason for loss of
            connection.
        """
        self._on_error_callback(_unused_connection, reason)
        self._stop()

    def start(self):
        def start_loop(loop):
            asyncio.set_event_loop(loop)
            while True:
                try:
                    loop.run_forever()
                    self._connection.ioloop.run_forever()
                except: traceback.print_exc()

        thread = threading.Thread(target=start_loop, args=(self._ioloop,), daemon=True)
        thread.start()

    def stop(self):  
        self._stopping = True
        self._stop()

    def _stop(self):
        self._connection.ioloop.stop()

        if self._connection.is_open:
            self._connection.close()

        if not self._stopping:
            self._ioloop.create_task(self._reconnect())
        else:
            # TODO: Close thread
            # asyncio.create_task()
            pass

    async def _reconnect(self):
        connection_delay = self._get_reconnect_delay()
        print("RabbitMQ Disconnected. Reconnecting in %d seconds." % (connection_delay))
        await asyncio.sleep(connection_delay)
        self._connect()

    def _get_reconnect_delay(self):
        self._reconnect_delay = min(self._reconnect_delay + 1, 30)
        return self._reconnect_delay


class RabbitMQ:
    def __init__(self, 
        rabbit_host=config.RABBIT_HOST,
        rabbit_port=config.RABBIT_PORT, 
        rabbit_username=config.RABBIT_USERNAME, 
        rabbit_password=config.RABBIT_PASSWORD,
        queue=config.RABBIT_QUEUE):

        # Initialization
        self._queue = queue
        self._channel = None
        self._channel_open_callbacks = []

        connection_string = f'amqp://{rabbit_username}:{rabbit_password}@{rabbit_host}:{rabbit_port}/%2f'
        print("Connecting to " + connection_string)
        self._connection = RabbitMQConnection(connection_string, self._on_connection_open, self._on_connection_closed)

    def _on_connection_open(self, connection):
        connection.channel(on_open_callback=self._on_channel_open)  

    def _on_connection_closed(self, _unused_connection, reason_or_err):
        self._channel = None

    def _on_channel_open(self, channel):
        self._channel = channel
        self._channel.queue_declare(queue=self.queue)

        for callback in self._channel_open_callbacks:
            callback(self._channel)
        self._channel_open_callbacks = []

    def start(self):
        self._connection.start()

    def stop(self):
        # TODO: Stop consuming
        self._connection.stop()

    def consume_keywords(self, callback):
        kwargs = dict(queue=self._queue_name, 
            on_message_callback=callback, 
            auto_ack=True)
            
        if not self._channel:
            self._channel_open_callbacks.append(lambda channel: channel.basic_consume(**kwargs))
            return

        self._channel.basic_consume(**kwargs)

    def publish_keywords(self, keywords):
        keywords_list = list(iter(keywords))
        if len(keywords_list) == 0: return

        if self._channel: 
            self._channel.basic_publish(exchange="", 
                routing_key=self._queue_name, 
                body=json.dumps(keywords_list), 
                properties=pika.BasicProperties(content_type='application/json', delivery_mode=1))

Thank you!

@arbel03
Copy link
Author

arbel03 commented Jan 9, 2021

I found this which might be related https://groups.google.com/g/pika-python/c/-m8KpWI6wlo/m/Ym1rioGsAgAJ

@lukebakken
Copy link
Member

lukebakken commented May 24, 2022

Sorry, I was unsubscribed from this repository and have missed new issues from the past year. I will investigate this before the next major release, thank you!

@lukebakken lukebakken added this to the 1.3.0 milestone May 24, 2022
@lukebakken lukebakken self-assigned this May 24, 2022
@lukebakken
Copy link
Member

https://github.com/lukebakken/pika-1296

I'm looking into this now. Better late than never!

@lukebakken lukebakken modified the milestones: 1.3.0, 2.0.0 Jun 29, 2022
@hisabimbola
Copy link

any update on this? I'm also having issue with custom loop

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants