
Remove the NACK during the stop_consuming #1389

Open
Gsantomaggio opened this issue Sep 16, 2022 · 8 comments

@Gsantomaggio (Contributor)

Per conversation with @michaelklishin and @lukebakken:

Remove the nack during channel.stop_consuming: when the channel is closing, the nack is not necessary.

See also the SO question.

Internal link
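The proposed change can be sketched as pure logic. These are hypothetical helper names for illustration, not pika's actual internal API:

```python
# Hypothetical guard illustrating the proposal: only send basic.nack for an
# unconsumed delivery while the channel is still fully open.

def should_nack(channel_is_open: bool, channel_is_closing: bool) -> bool:
    """Return True only when a basic.nack would still be meaningful."""
    return channel_is_open and not channel_is_closing

# During stop_consuming on a channel that is closing, no nack should be sent:
assert should_nack(channel_is_open=True, channel_is_closing=True) is False
```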

@lukebakken lukebakken self-assigned this Sep 16, 2022
@lukebakken lukebakken added this to the 2.0.0 milestone Sep 16, 2022
@tango-taylor

Thanks for the report! I'm the author of the original SO question, let me know if you need anything.

@tango-taylor

I edited the SO question with an even simpler example to trigger the bug; I'll post it here:

Client

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)
channel_stream = connection.channel()

channel_stream.queue_declare(
    "stream-queue",
    durable=True,
    arguments={
        'x-queue-type': 'stream',
    }
)

for i in range(2):
    channel_stream.basic_publish(
        exchange='',
        routing_key='stream-queue',
        body="stream data".encode()
    )
connection.close()

Server

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)
channel_stream = connection.channel()

channel_stream.queue_declare(
    "stream-queue",
    auto_delete=False, exclusive=False, durable=True,
    arguments={
        'x-queue-type': 'stream',
    }
)
channel_stream.basic_qos(
    prefetch_count=1,
)


class Server(object):
    def __init__(self):
        channel_stream.basic_consume(
            queue="stream-queue",
            on_message_callback=self.stream_callback,
        )

    def stream_callback(self, channel, method, props, body):
        print(f"received '{body.decode()}' via {method.routing_key}")
        # Stopping here triggers the bug: teardown nacks the outstanding
        # delivery, which is invalid for a stream queue.
        channel.stop_consuming()


server = Server()

try:
    channel_stream.start_consuming()
except KeyboardInterrupt:
    connection.close()
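One possible workaround (an assumption on my part, not confirmed by the maintainers) is to ack the delivery before stopping, so the channel has no outstanding message to nack during teardown. A minimal sketch with a stand-in channel object instead of a live broker:

```python
class FakeChannel:
    """Stand-in that records calls; use the real pika channel in practice."""
    def __init__(self):
        self.calls = []

    def basic_ack(self, delivery_tag):
        self.calls.append(("ack", delivery_tag))

    def stop_consuming(self):
        self.calls.append(("stop",))


def stream_callback(channel, delivery_tag, body):
    # Ack first so no unacked delivery is left to be rejected on close.
    channel.basic_ack(delivery_tag=delivery_tag)
    channel.stop_consuming()


ch = FakeChannel()
stream_callback(ch, 1, b"stream data")
print(ch.calls)  # [('ack', 1), ('stop',)]
```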

@Gsantomaggio (Contributor, Author)

Thank you @tango-taylor for reporting.

@fizix137 commented Nov 2, 2022

I'm writing an app that consumes a stream queue and posts the results to a Streamlit app. I have to hand control back to Streamlit by periodically starting and stopping message consumption. I'm still figuring out the best way to do this, and along the way I ran into the same problem when calling stop_consuming.
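For the periodic start/stop pattern, pika's BlockingChannel.consume generator with inactivity_timeout may be a better fit than start_consuming/stop_consuming, since it returns control to the caller without cancelling mid-delivery. A sketch; drain_for and its polling parameters are my own hypothetical names:

```python
def drain_for(channel, queue, max_idle_polls=3, inactivity_timeout=1.0):
    """Consume until the queue goes quiet, then return control to the caller.

    Relies on BlockingChannel.consume(inactivity_timeout=...) yielding
    (None, None, None) when no message arrives within the timeout.
    """
    idle = 0
    processed = []
    for method, properties, body in channel.consume(
            queue, inactivity_timeout=inactivity_timeout):
        if method is None:  # timed out with no message
            idle += 1
            if idle >= max_idle_polls:
                break
            continue
        idle = 0
        processed.append(body)
        channel.basic_ack(method.delivery_tag)
    channel.cancel()  # end the consume generator cleanly
    return processed
```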

@michaelklishin (Contributor)

@fizix137 FYI, there is a separate Python client for streams that does not have Pika's limitations.

@michaelklishin (Contributor)

In fact, there is more than one Python client for RabbitMQ streams :)

@lukebakken (Member)

#1402 (reply in thread)

@rubancar commented Jun 1, 2023

I've tested SelectConnection and found the same behavior: if you set up a queue of type stream and a basic_cancel is fired while a message is being processed, the _on_deliver function doesn't account for the channel consuming from a stream queue and sends a basic_reject when it must not.
If a message arrives while a channel for a stream queue is closing, the behavior should be to simply return from the method, with no further action.
Nevertheless, I'm new to pika and RabbitMQ; I just dug a bit into the code, and from what I understand, rejecting messages does not apply to stream queues.
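The early return described above can be sketched as a decision function. The names are illustrative only; this is not pika's actual _on_deliver:

```python
def handle_delivery(consumer_known: bool, channel_closing: bool) -> str:
    """Decide what to do with a delivery during channel teardown."""
    if consumer_known:
        return "dispatch"  # normal path: hand the message to the callback
    if channel_closing:
        return "drop"      # stream queue closing: just return, no reject
    return "reject"        # otherwise reject the orphaned delivery

print(handle_delivery(consumer_known=False, channel_closing=True))  # drop
```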
