
heartbeat and long running workers #301

Closed
matteosister opened this issue Sep 20, 2015 · 29 comments

@matteosister

We are trying to solve a problem we have in production. I'm here to ask whether anyone else has the same problem, or can confirm that this could be the cause.

We have a heartbeat of 10 s in the bundle configuration, and some of our workers take a long time to complete. The main reason is that they call a really slow external service that can take longer than 10 seconds, possibly even 50 or 100 seconds.

At the end of the worker we need to publish another message to RabbitMQ, and we always get error messages like:

Broken pipe or closed connection /app/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php 190

The failing method is write, which (I suppose) is called by the heartbeat.

Now the question: is it possible that this library, while the worker is busy with its load, does not send any heartbeats? Given PHP's single-threaded nature, that would make sense. In this particular case, is it possible that the server closes the connection because of the missing heartbeats, so that when we try to publish again the server is gone?

Any help is really appreciated!
Thanks.

@videlalvaro
Collaborator

Yes, due to the single-thread limitation this is quite hard to fix in PHP. The latest release of php-amqplib has some fixes in the heartbeat area, BTW. If you have any ideas on how we could handle heartbeats in php-amqplib, I would like to hear them.

@matteosister
Author

@videlalvaro no idea at all... I tried to reproduce the same problem with Elixir, and of course the underlying Erlang library handles heartbeats in a separate process.

In PHP there are some libraries that try to address the problem, such as https://github.com/kriswallsmith/spork
but, besides the quality of that project itself, forking in PHP sounds like a hack to me.

We solved the issue by raising the heartbeat to a value large enough to work for all our workers. Suboptimal, but it works.

If someone has another idea, maybe we could join forces and work on it together!

@enleur

enleur commented Jan 5, 2016

@matteosister here is a proposal https://github.com/videlalvaro/RabbitMqBundle/issues/325

We could also start a timer at the beginning of message processing and, after processing, check whether the processing time exceeded the socket timeout; if it did, reconnect.
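
The timer idea could look roughly like this. It is a minimal sketch, assuming the threshold you pass in is your socket/heartbeat timeout in seconds; `runWithReconnectCheck` and the `$reconnect` callable are hypothetical names, not part of RabbitMqBundle or php-amqplib.

```php
<?php

/**
 * Hypothetical helper: runs $handler and, if it took longer than
 * $timeoutSeconds, calls $reconnect before returning the handler's result.
 */
function runWithReconnectCheck(callable $handler, callable $reconnect, float $timeoutSeconds)
{
    $start = microtime(true);            // start the timer before processing
    $result = $handler();                // the long-running work
    $elapsed = microtime(true) - $start;

    if ($elapsed > $timeoutSeconds) {
        // The broker has probably dropped the idle connection by now,
        // so open a fresh one before talking to RabbitMQ again.
        $reconnect();
    }

    return $result;
}

// Usage with stand-in callables:
$reconnected = false;
$result = runWithReconnectCheck(
    function () { usleep(20000); return 'done'; },          // ~20 ms of "work"
    function () use (&$reconnected) { $reconnected = true; },
    0.01                                                     // 10 ms threshold for the demo
);
```

The check only runs after the work finishes, so it avoids publishing on a dead connection but does not keep the original connection alive.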

For Doctrine's DBAL Connection we made a wrapper that adds try-catch logic to each method: we catch timeout exceptions and then re-run the query. I think the same approach would work for AMQP as well.
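
Applied to AMQP, the wrapper idea might be sketched as a proxy that forwards every method call to an inner object and, if the call throws, rebuilds the inner object and re-runs the call once. `RetryingProxy`, the `$connect` factory and catching plain `RuntimeException` are assumptions for illustration; with php-amqplib you would catch its connection exceptions instead (the thread later shows `AMQPConnectionClosedException`).

```php
<?php

/**
 * Hypothetical proxy: forwards calls to an inner object and, on failure,
 * rebuilds the inner object via $connect and re-runs the call once.
 */
final class RetryingProxy
{
    /** @var callable */
    private $connect;
    /** @var object */
    private $inner;

    public function __construct(callable $connect)
    {
        $this->connect = $connect;
        $this->inner = $connect();
    }

    public function __call(string $method, array $args)
    {
        try {
            return $this->inner->$method(...$args);
        } catch (RuntimeException $e) {
            // Assume the failure means a dead connection: reconnect, retry once.
            $this->inner = ($this->connect)();
            return $this->inner->$method(...$args);
        }
    }
}
```

Retrying only once keeps a permanently broken broker from turning into an endless loop; the second failure propagates to the caller.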

@vcraescu

@matteosister how did you solve the issue? I think I tried every combination of heartbeat values without any luck. The message remains unacked if the consumer takes longer to process (about 100 s).

@kooliahmd

@vcraescu, you can solve this issue as follows,
within your consumer's execute method:

  1. Ack the message before executing your heavy call (by calling a low-level method, not by returning true).

  2. Do your heavy task.

  3. If the heavy task failed, re-publish the same message to the queue; otherwise just terminate your execution.

However, it's better to also track the state of each call in a database, so you can recover executions that terminated unexpectedly without requeuing the message (otherwise those messages are lost).
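
The three steps above can be sketched as a small function. This is a sketch under stated assumptions: `$ack`, `$heavyTask` and `$republish` are hypothetical callables standing in for the low-level ack, the slow work, and a publish over a fresh connection; none of them is a RabbitMqBundle API.

```php
<?php

/**
 * Hypothetical ack-first helper: the message is acknowledged before the
 * slow work starts, so the broker is no longer waiting on it.
 * Returns true if the task succeeded, false if the body was re-published.
 */
function processAckFirst(string $body, callable $ack, callable $heavyTask, callable $republish): bool
{
    $ack();                      // 1. ack immediately (low-level call, not "return true")

    try {
        $heavyTask($body);       // 2. the slow external call
        return true;             //    success: nothing else to do
    } catch (Throwable $e) {
        $republish($body);       // 3. failure: push the same body again,
        return false;            //    ideally over a freshly opened connection
    }
}
```

If the process dies between step 1 and step 3, the message is gone from the queue, which is exactly the gap the database tracking is meant to cover.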

@vcraescu

vcraescu commented Sep 27, 2017

@Snake-Tn I can't re-publish the same message to the queue; that's the original problem. After ~100 s I can't communicate with RabbitMQ at all, because it throws "Broken pipe or closed connection". If I move to a database solution, why keep using a RabbitMQ queue at all?
We use a RabbitMQ queue precisely because the script takes a while to run and we cannot block the request, so we schedule it. If I can't do that with RabbitMQ alone and need persistent storage anyway, why not use the persistent storage on its own?

@kooliahmd

kooliahmd commented Sep 27, 2017

@vcraescu
You would push the same message using a new connection (via a producer).
The database is just a way to track lost messages, because you might ack a message and then have your consumer die for some reason before it pushes the same message again.
Using RabbitMQ still gives you the power of multiple consumers processing in parallel.
Switching to a cron-based solution would solve your issue, but you would lose some scalability margin.

@vcraescu

@Snake-Tn But I'm not even able to push the message via a producer because of the very same reason.

@kooliahmd

@vcraescu, I checked with one of this repo's maintainers: it's possible to define a connection per producer/consumer (by default they share the same default connection). However, the producer connection is established when the service is created, so it will time out as well (they are planning to make it lazy in a future release).
I don't see any further solution besides using lower-level function calls to create a new connection.

@vcraescu

@Snake-Tn And how am I supposed to use lower-level function calls from a consumer?

@andrefigueira

@vcraescu we had the same problems in production. Our consumer processes rows from a CSV and publishes the results to another queue at the end. We got around the issue with a few things:

  1. We gave the producer its own lazy connection, and kept this connection in memory.
  2. We implemented publisher confirms to get a better idea of what's published and what isn't.
  3. We implemented a publish retry mechanism: when we get connection issues on publish, we catch the exception, recreate the connection and try again.
  4. For the heartbeat setting, we use 60.0.

These steps have sorted the issue for us, and we're now handling millions of publishes from consumers this way. Some of the consumers are busy for hours without connection failures; the odd one still dies, but it's then picked up by the retry mechanism.
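
Steps 1 and 3 of that list (a lazily opened producer connection kept in memory, plus recreate-and-retry on publish errors) could be sketched as below. This is a sketch under assumptions: the `$connect` factory and the `publish` method of the object it returns are hypothetical stand-ins; with php-amqplib you would open the connection yourself and call `basic_publish` on a channel. Publisher confirms (step 2) are not shown.

```php
<?php

/**
 * Hypothetical producer wrapper: the connection is only opened on first use,
 * kept in memory, and rebuilt when a publish fails.
 */
final class LazyRetryingProducer
{
    /** @var callable */
    private $connect;
    /** @var object|null */
    private $connection = null;
    private $maxAttempts;

    public function __construct(callable $connect, int $maxAttempts = 3)
    {
        $this->connect = $connect;
        $this->maxAttempts = $maxAttempts;
    }

    public function publish(string $body): void
    {
        for ($attempt = 1; ; $attempt++) {
            try {
                if ($this->connection === null) {
                    $this->connection = ($this->connect)();   // lazy: open on first publish
                }
                $this->connection->publish($body);
                return;
            } catch (RuntimeException $e) {
                $this->connection = null;                    // drop the dead connection
                if ($attempt >= $this->maxAttempts) {
                    throw $e;                                // give up after N attempts
                }
            }
        }
    }
}
```

Because nothing is opened until the first publish, a consumer that has been busy for minutes does not inherit a connection that went stale while it was working.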

@vcraescu

vcraescu commented Oct 17, 2017

@andrefigueira Did you keep using the bundle, or did you implement everything using direct calls to RabbitMQ?

Thank you for your detailed answer!

@Sander-Toonen

Sander-Toonen commented Mar 29, 2018

Would it be possible to pass a callback to the execute() method that is called periodically during a long-running process? That callback could then respond to the heartbeat messages, after which the worker continues with its next step.
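
The callback idea can be sketched as a long task split into steps that invokes a caller-supplied callable between steps. `runInSteps` and `$onTick` are hypothetical names, not part of the bundle's `execute()` signature; in a real consumer the callable would do whatever keeps the connection alive, and it only helps if each individual step is shorter than the heartbeat timeout.

```php
<?php

/**
 * Hypothetical helper: runs each step in order and calls $onTick between
 * steps, giving the caller a chance to service the AMQP connection.
 *
 * @param callable[] $steps
 * @return array results of each step, in order
 */
function runInSteps(array $steps, callable $onTick): array
{
    $steps = array_values($steps);   // ensure 0..n-1 indexes
    $last = count($steps) - 1;
    $results = [];
    foreach ($steps as $i => $step) {
        $results[] = $step();        // one bounded chunk of the long task
        if ($i < $last) {
            $onTick();               // hook between chunks (not after the last one)
        }
    }
    return $results;
}
```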

php-amqplib/php-amqplib#520

@naxo8628

naxo8628 commented May 8, 2018

Same as @vcraescu: I used all combinations of heartbeat, read_write_timeout and lazy in the bundle, but always received "unable to write to socket [104]: Connection reset by peer" when the consumer takes more than 5 s to process.

@srgkas

srgkas commented Sep 27, 2018

Getting the same issue. Here is my workaround.
Since the heartbeat is sent by the broker and we can't tune it (except for the period), we can emulate the heartbeat from the consumer process.
Any consumer-broker interaction over the connected channel can be considered a heartbeat, because it shows that the connection is alive.
So all I had to do was create a method that "pings" the broker: it sends a ping message to the broker's default exchange with a system routing key that no queue uses as its binding key (though one could in the future). Just split your long-running task into blocks and call this method between them. This is much cheaper than reconnecting.
Hope it helps.
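
One possible shape for such a ping helper: it remembers when it last talked to the broker and only pings once the configured interval has passed, so calling it between every block stays cheap. `ThrottledPinger` and the `$send` callable are hypothetical; `$send` stands in for publishing the dummy ping message. The clock is injectable only so the behaviour can be checked without waiting.

```php
<?php

/**
 * Hypothetical helper: sends a "ping" to the broker at most once per
 * $intervalSeconds. Call maybePing() between blocks of a long-running task.
 */
final class ThrottledPinger
{
    /** @var callable */
    private $send;
    /** @var callable */
    private $clock;
    private $intervalSeconds;
    private $lastPing;

    public function __construct(callable $send, float $intervalSeconds, ?callable $clock = null)
    {
        $this->send = $send;
        $this->intervalSeconds = $intervalSeconds;
        $this->clock = $clock ?? function () { return microtime(true); };
        $this->lastPing = ($this->clock)();   // treat "now" as the last activity
    }

    /** Returns true if a ping was actually sent. */
    public function maybePing(): bool
    {
        $now = ($this->clock)();
        if ($now - $this->lastPing < $this->intervalSeconds) {
            return false;                     // talked to the broker recently enough
        }
        ($this->send)();                      // e.g. publish the dummy ping message
        $this->lastPing = $now;
        return true;
    }
}
```

The interval should be comfortably below the negotiated heartbeat timeout, and each block between calls must itself finish within that timeout.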

@Perni1984

@srgkas could you shed some light on how you send the "ping"? Unfortunately the solution based on https://blog.mollie.com/keeping-rabbitmq-connections-alive-in-php-b11cb657d5fb, which calls ->read(0) to trigger the heartbeat, no longer works for me with the current version 2.9.2 of php-amqplib:

protected function sendHeartbeat($connection)
{
    $connection->getIO()->read(0); // no longer works as of version 2.9.2
}

@abrinkman-polderworks

I have also had a lot of problems with this. Since the Mollie solution broke in 2.9.2 I gave up; I'm now using timeouts so long that our workers finish before they expire. Not what I want, but it's the only thing that works reliably.

@philraj

philraj commented Sep 3, 2019

$connection->getIO()->check_heartbeat(); did the trick for me.

Make sure to look at the logic of that method, because it will throw errors if the heartbeat time is not configured correctly on both the client and the server.

@srgkas

srgkas commented Jan 16, 2020

@Perni1984 Hi.
Sorry for the late answer :)
To ping the broker I publish a dummy message with the system.ping routing key. Be careful with this solution if any queue is bound with the # wildcard.
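
Why a # binding matters: on a topic exchange, * matches exactly one dot-separated word and # matches zero or more words, so a queue bound with # receives every message, pings included. (The default exchange routes purely by queue name, so the caveat applies when the ping goes through a topic exchange.) The matcher below is written for illustration only; it is not RabbitMQ's own implementation.

```php
<?php

/**
 * Illustrative AMQP topic matching: '*' = exactly one word, '#' = zero or more words.
 */
function topicMatches(string $pattern, string $routingKey): bool
{
    return matchWords(explode('.', $pattern), explode('.', $routingKey));
}

function matchWords(array $p, array $k): bool
{
    if ($p === []) {
        return $k === [];                 // pattern exhausted: key must be too
    }
    $head = $p[0];
    $rest = array_slice($p, 1);
    if ($head === '#') {
        // '#' either matches zero words, or swallows one word and tries again.
        return matchWords($rest, $k) || ($k !== [] && matchWords($p, array_slice($k, 1)));
    }
    if ($k === []) {
        return false;                     // pattern needs a word, key has none left
    }
    return ($head === '*' || $head === $k[0]) && matchWords($rest, array_slice($k, 1));
}
```

For example, `topicMatches('#', 'system.ping')` and `topicMatches('system.*', 'system.ping')` are both true, while `topicMatches('orders.#', 'system.ping')` is false.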

@maxiwheat

@srgkas Can you post the code you use to replace the method sendHeartbeat used by Perni1984 please, that would help us.

@arturslogins

Any progress on this issue?

@vcraescu

@arturslogins don't hold your breath! :)
Technically this cannot be implemented in PHP.

@igrizzli
Contributor

igrizzli commented Apr 23, 2020

> @arturslogins don't hold your breath! :)
> Technically this is not possible to be implemented in PHP.

@vcraescu
php-amqplib/php-amqplib#775
Implemented here; if you have time, you can finish this PR.

@github-actions

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@ramunasd
Copy link
Member

The latest library version has improved heartbeat logic. There is also a Unix-signal-based heartbeat checker, which might solve your problems. Please report back if the issue still occurs.

@philraj

philraj commented Mar 23, 2021

> Latest library version has improved heartbeat logic. Also there is unix signals based heartbeat checker, which might solve Your problems. Please report there if still issue still occurs.

Can you explain the Unix signal heartbeat logic? Don't see anything in the docs about sending a heartbeat via Unix signal, and couldn't find it in the code either.

(Edit) Never mind, found it in the php-amqplib repo: https://github.com/php-amqplib/php-amqplib#unix-signals
You don't need to send any signals; the heartbeat checker raises a SIGALRM at a set interval.

@ramunasd
Member

Please see demo here:
https://github.com/php-amqplib/php-amqplib/blob/master/demo/amqp_consumer_pcntl_heartbeat.php#L22

@semen-plekhanov

> Please see demo here: https://github.com/php-amqplib/php-amqplib/blob/master/demo/amqp_consumer_pcntl_heartbeat.php#L22

It doesn't work if I use long-running blocking operations (database operations, HTTP requests, etc.):

/**
 * @param AMQPMessage $message
 */
function process_message($message)
{
    echo "\n--------\n";
    echo "You send: [$message->body] message";
    echo "\n--------\n";
    echo "\nprocessing message\n";

    // for example, a blocking call that takes 20 seconds
    exec("php -r ' sleep(20); '");

    echo "\n[OK] message processed\n";

    $message->ack();

    // Send a message with the string "quit" to cancel the consumer.
    if ($message->body === 'quit') {
        $message->getChannel()->basic_cancel($message->getConsumerTag());
    }
}

I get an error:
PHP Fatal error: Uncaught PhpAmqpLib\Exception\AMQPConnectionClosedException: Broken pipe or closed connection in /vendor/php-amqplib/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php:235

@no888name

no888name commented May 29, 2022

> Please see demo here: https://github.com/php-amqplib/php-amqplib/blob/master/demo/amqp_consumer_pcntl_heartbeat.php#L22
>
> it doesn't work if i use long running blocking operations (database operation, http request, etc.)

Agreed, it doesn't. This only works if you have a set of blocking operations and each of them completes within the heartbeat timeframe.
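
The constraint described here can be made concrete with a small check: given the expected duration of each blocking step and the heartbeat window, list the steps that would exceed it. `stepsExceedingWindow` is a hypothetical helper and the durations in the usage note are made-up inputs, not measurements.

```php
<?php

/**
 * Hypothetical helper: returns the names of steps whose expected duration is
 * longer than the heartbeat window. Per the comments above, a signal-based
 * heartbeat sender does not help while such a step is blocking.
 *
 * @param array<string, float> $stepSeconds step name => expected duration in seconds
 * @return string[]
 */
function stepsExceedingWindow(array $stepSeconds, float $windowSeconds): array
{
    return array_keys(array_filter(
        $stepSeconds,
        function (float $seconds) use ($windowSeconds) { return $seconds > $windowSeconds; }
    ));
}
```

For example, with steps of 2 s (database), 45 s (HTTP call) and 0.5 s (rendering) and a 30 s window, only the HTTP call is reported, so that is the step to split up or move out of the consumer.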
