Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AlreadyCalledError in twisted connection adapter #1282

Open
sekogan opened this issue Jul 30, 2020 · 7 comments
Open

AlreadyCalledError in twisted connection adapter #1282

sekogan opened this issue Jul 30, 2020 · 7 comments
Assignees
Milestone

Comments

@sekogan
Copy link

sekogan commented Jul 30, 2020

Hi!

I am having trouble with the following scenario:

from twisted.internet import defer, protocol, task
import pika
import pika.adapters.twisted_connection


@defer.inlineCallbacks
def test_the_issue(reactor):
    host = "192.168.8.8"
    port = 5672
    client_creator = protocol.ClientCreator(
        reactor,
        pika.adapters.twisted_connection.TwistedProtocolConnection,
        pika.ConnectionParameters(
            host=host,
            port=port,
            virtual_host="/",
            credentials=pika.PlainCredentials("guest", "guest"),
        ),
    )
    connection = yield client_creator.connectTCP(host, port)
    yield connection.ready
    channel = yield connection.channel()

    d = channel.queue_declare("foo")
    d.cancel()
    try:
        yield d
    except defer.CancelledError:
        pass

    yield channel.close()
    yield connection.close()


if __name__ == "__main__":
    task.react(test_the_issue)

It fails every time:

  File "C:\Python27\lib\site-packages\twisted\internet\selectreactor.py", line 149, in _doReadOrWrite
    why = getattr(selectable, method)()
  File "C:\Python27\lib\site-packages\twisted\internet\tcp.py", line 243, in doRead
    return self._dataReceived(data)
  File "C:\Python27\lib\site-packages\twisted\internet\tcp.py", line 249, in _dataReceived
    rval = self.protocol.dataReceived(data)
  File "C:\Python27\lib\site-packages\pika\adapters\twisted_connection.py", line 1209, in dataReceived
    self._impl.data_received(data)
  File "C:\Python27\lib\site-packages\pika\adapters\twisted_connection.py", line 1143, in data_received
    self._on_data_available(data)
  File "C:\Python27\lib\site-packages\pika\connection.py", line 1952, in _on_data_available
    self._process_frame(frame_value)
  File "C:\Python27\lib\site-packages\pika\connection.py", line 2105, in _process_frame
    if self._process_callbacks(frame_value):
  File "C:\Python27\lib\site-packages\pika\connection.py", line 2086, in _process_callbacks
    frame_value)  # Args
  File "C:\Python27\lib\site-packages\pika\callback.py", line 60, in wrapper
    return function(*tuple(args), **kwargs)
  File "C:\Python27\lib\site-packages\pika\callback.py", line 92, in wrapper
    return function(*args, **kwargs)
  File "C:\Python27\lib\site-packages\pika\callback.py", line 233, in process
    callback(*args, **keywords)
  File "C:\Python27\lib\site-packages\pika\channel.py", line 1121, in _on_closeok
    self._transition_to_closed()
  File "C:\Python27\lib\site-packages\pika\channel.py", line 1055, in _transition_to_closed
    self, self, self._closing_reason)
  File "C:\Python27\lib\site-packages\pika\callback.py", line 60, in wrapper
    return function(*tuple(args), **kwargs)
  File "C:\Python27\lib\site-packages\pika\callback.py", line 92, in wrapper
    return function(*args, **kwargs)
  File "C:\Python27\lib\site-packages\pika\callback.py", line 233, in process
    callback(*args, **keywords)
  File "C:\Python27\lib\site-packages\pika\adapters\twisted_connection.py", line 136, in _on_channel_closed
    d.errback(self._closed)
  File "C:\Python27\lib\site-packages\twisted\internet\defer.py", line 501, in errback
    self._startRunCallbacks(fail)
  File "C:\Python27\lib\site-packages\twisted\internet\defer.py", line 560, in _startRunCallbacks
    raise AlreadyCalledError(extra)

Full log with debug messages from pika

Using pika 1.1.0, twisted 19.10.0 and python 2.7.14.

@lukebakken lukebakken self-assigned this Jul 30, 2020
@lukebakken lukebakken added this to the 1.2.0 milestone Jul 30, 2020
@lukebakken
Copy link
Member

Hi I'm assuming this is your message on the mailing list - https://groups.google.com/d/topic/pika-python/OaHcH2tJtjo/discussion

I'm hoping @abompard can help out as I'm not familiar with the Twisted adapter.

How common is it to cancel an operation before running it? My guess is this is not a common scenario.

@abompard
Copy link
Contributor

I'll have a look!

@abompard
Copy link
Contributor

OK, I see where this comes from, and indeed cancelling those deferreds is not supported at the moment. I can make Pika ignore the error, but maybe we can do better. @lukebakken is there a way in Pika to cancel a call that has been made? Say I'm calling queue_declare and the server is taking too long to respond, as a client can I cancel it? If we can't tell the server to cancel the operation, maybe as a client we can remove the call from the RPC buffer and not wait for the replies? I'm not sure what's best to do.

@lukebakken
Copy link
Member

lukebakken commented Jul 31, 2020

There is no way that I know of to cancel a call that is made. If @vitaly-krugl is around he could chime in.

How common is this scenario? Does it really need to be fixed?

@abompard
Copy link
Contributor

@sekogan Could you tell us more about why you are cancelling this deferred, in which context, and what you would expect to happen?

@sekogan
Copy link
Author

sekogan commented Jul 31, 2020

@abompard For example I use cancellations to stop coroutines performing lengthy operations. Imagine a coroutine named "consumer" which declares a queue, calls basic_consume and handles messages somehow. To stop it I cancel the deferred returned by that coroutine. The deferred forwards cancellation to the deferred it is waiting for which forwards it again and so on. The last deferred in that chain typically raises defer.CancelledError which stops the coroutine.

Another example is adding a timeout to a coroutine which performs something that includes calls to pika API like basic_publish. This can be done by calling addTimeout on a deferred returned by the coroutine. A timed out deferred cancels itself and underlying operations.

The code above is just a minimal example reproducing the issue. It doesn't really make sense to cancel queue_declare explicitly like that.

I would expect pika to either fail explicitly and as early as possible if cancellations are not supported or abandon the underlying operation and discard its outcome if cancellations are supported. It would also be nice to have cancellations mentioned in the documentation.

@lukebakken lukebakken removed this from the 1.2.0 milestone Jan 27, 2021
@lukebakken lukebakken added this to the 1.3.0 milestone May 24, 2022
@lukebakken
Copy link
Member

lukebakken commented May 24, 2022

I will investigate this before the next major release, thank you!

@lukebakken lukebakken modified the milestones: 1.3.0, 2.0.0 Jun 29, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants