Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broken pipe or closed connection #40

Closed
liushuangxi opened this issue Oct 14, 2016 · 19 comments
Closed

Broken pipe or closed connection #40

liushuangxi opened this issue Oct 14, 2016 · 19 comments

Comments

@liushuangxi
Copy link

liushuangxi commented Oct 14, 2016

Broken pipe or closed connection
on vendor/bunny/bunny/src/Bunny/AbstractClient.php 300
How Can I Do?

@honzatrtik
Copy link

Same problem here - using dockerhub RabbitMQ 3.6.5 on Erlang 19.0.7

@honzatrtik
Copy link

O-ou, my bad, I forgot to call $client->connect(), maybe there should be some more descriptive Exception thrown?

@Savageman
Copy link

Savageman commented Jan 27, 2017

Same error here. I'm suspecting the thread is lagging behind doing some other non fully asynchronous things resulting in a timeout on the connection.

I'd love a way to just catch this and reconnect it...

Ok actually #29 #30 #39 and #40 are all the same thing

@paveljanda
Copy link

paveljanda commented Aug 5, 2017

Hey, is there any fork with this issue fixed?
I can send a PR if I have more info about this problem. Really breaks our project right now.

Love this vendor though!

@paveljanda
Copy link

@jakubkulhan ping

@martinvenus
Copy link

@jakubkulhan Is there any plan to fix this issue please? Bunny is really excellent library, but this bug does not allow us to use it in production. Thank you in advance.

@ondrejmirtes
Copy link
Contributor

I think the problem is on your application's side because we're using RabbitMQ in production with millions of messages daily and this problem does not occur, meaning the problem is in your code which uses Bunny. Perhaps you can post some code example which causes this error for you?

@martinvenus
Copy link

You can be right, of course. I think that this stack trace could help:
Fatal error: Uncaught Bunny\Exception\ClientException: Broken pipe or closed connection. in /var/www/html/vendor/bunny/bunny/src/Bunny/AbstractClient.php:300 Stack trace: #0 /var/www/html/vendor/bunny/bunny/src/Bunny/Client.php(86): Bunny\AbstractClient->write() #1 /var/www/html/vendor/bunny/bunny/src/Bunny/ClientMethods.php(765): Bunny\Client->flushWriteBuffer() #2 /var/www/html/vendor/bunny/bunny/src/Bunny/Channel.php(220): Bunny\AbstractClient->channelClose(1, 0, '', 0, 0) #3 /var/www/html/vendor/bunny/bunny/src/Bunny/Client.php(147): Bunny\Channel->close() #4 /var/www/html/vendor/bunny/bunny/src/Bunny/Client.php(56): Bunny\Client->disconnect() #5 [internal function]: Bunny\Client->__destruct() #6 {main} thrown in /var/www/html/vendor/bunny/bunny/src/Bunny/AbstractClient.php on line 300

@ondrejmirtes
Copy link
Contributor

Nope, I need your code. I suspect that you're closing the connection first and trying to send a message later. Are you using the async client?

@martinvenus
Copy link

<?php declare(strict_types = 1);

namespace Lib\RabbitMq;

use Bunny\Client;

class Publisher
{

    /** @var Client */
    private $client;

    /** @var string */
    private $queueName;

    public function __construct(Client $client, string $queueName)
    {
        $this->client = $client;
        $this->queueName = $queueName;
    }

    public function publish(string $message): void
    {
        $this->connect();
        $this->client->channel()->publish($message, [], $this->queueName);
    }

    private function connect(): void
    {
        if (!$this->client->isConnected()) {
            $this->client->connect();
        }
    }
}

@ondrejmirtes
Copy link
Contributor

I think I see at least one problem. You're opening a new channel for each message.

// Creates and opens new channel.
$this->client->channel()

@ondrejmirtes
Copy link
Contributor

Also, about the publish method - the third parameter is supposed to be the exchange name, fourth parameter is the queue name, so I think the correct call should look like this:

$this->client->channel()->publish($message, [], '', $this->queueName); // use direct exchange

@paveljanda
Copy link

@ondrejmirtes This issue occurs when you send a message into the queue, then wait for about 5 minutes (with script running - e.g. ratchet server) and then try to send another message. I can make an example which will reproduce this issue. Hm?

@ondrejmirtes
Copy link
Contributor

@paveljanda Again, not a problem of Bunny in my opinion. Try to set a longer connection timeout (when connecting to RabbitMQ) and also try to fine-tune settings mentioned in RabbitMQ documentation here https://www.rabbitmq.com/networking.html.

@ondrejmirtes
Copy link
Contributor

@paveljanda Also, there's a RabbitMQ feature called Heartbeats which is also supported by Bunny, so try to look into that too. https://www.rabbitmq.com/heartbeats.html

@paveljanda
Copy link

@ondrejmirtes Ha, didn't know about bunny support. Thanks, will look into that

@jakubkulhan
Copy link
Owner

As suggested by @ondrejmirtes, the problem most probably is because broker (RabbitMQ) closed the connection as it didn't receive heartbeat AMQP frame. Default heartbeat is set to 60 seconds (https://github.com/jakubkulhan/bunny/blob/master/src/Bunny/AbstractClient.php#L132) - if client does not send any frame (e.g. by publishing a message, acking a message) within 60 seconds, server assumes the client is dead and closes TCP connection. Therefore the next call by client throws Broken pipe or closed connection exception.

If you want to write consumer process using synchronous blocking client (class Bunny\Client), you must use client's ->run() method. The code should be structured such as:

$client = new Client([
    "heartbeat" => 60.0,
]);
$client->connect();
$channel = $client->channel();

// initialize exchanges, queues, etc.
// initialize consumers, e.g.:
$channel->consume(function (Message $message, Channel $channel) {
    // BEWARE, time spent here must not exceed heartbeat specified in client's constructor config
}, "...");

// start consuming messages
$client->run();
// ->run() blocks indefinitely and also periodically sends heartbeat frames to keep connection alive

If all you want is to publish some messages, e.g. during processing of HTTP request and don't need confirmation from broker that the message has been accepted, ->run() doesn't need to be called. (cc @martinvenus)

Asynchronous client (class Bunny\Async\Client) doesn't have ->run() method. Periodic timers to send heartbeats are provided by ReactPHP's event loop. If you use other libraries (e.g. Ratchet, cc @paveljanda) that rely on event loop, all of them must use the same instance of event loop:

use React\EventLoop\Factory;
use Ratchet\App;
use Bunny\Async\Client;
use Bunny\Channel;

$loop = Factory::create();

$ratchet = new App(..., ..., ..., $loop);
// add handlers to ratchet

$client = new Client($loop);
$client->connect()->then(function (Client $client) {
    return $client->channel();
})->then(function (Channel $channel) {
    // work with async channel
});

// start ratchet server & bunny client
$loop->run();

The last thing to be aware of is if you're connecting to RabbitMQ through some kind of proxy/loadbalancer. Most proxies you some kind of idle timeout and then if no packets are being sent/received, they drop the connection. I've experienced this when the app on EC2 instance kept connecting to RabbitMQ through AWS ELB that had idle timeout lower than client's hearbeat timeout.

Hope this helps!

@paveljanda
Copy link

@jakubkulhan Thank you for your answer! You async example is working well without any timeouts.

@jayesbe
Copy link

jayesbe commented Jan 31, 2018

@jakubkulhan is there a way to reconnect if the event loop throws an exception?

like ->catch()?

$loop = Factory::create();
$x = function() {
  $client = new Client($loop);
  $client->connect()->
  ...
  ->catch(function() {
    $x();
  });

  $loop->run();
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

8 participants