
Timeouts not properly checking timeout status of stream connection when performing reading and writing. #56

Merged
merged 2 commits into videlalvaro:master

6 participants

@seansullivan

When reading from or writing to the socket, it is possible for the RabbitMQ server to block the connection, causing the PHP process to spin in an infinite loop (and, in a web context, a 504 Gateway Error to be returned by the web server). This occurs when RabbitMQ's flow control kicks in and blocks all of the connections. I specifically ran into this on my development server when disk-based flow control was triggered due to a lack of free disk space.

The best way to reproduce the issue is to add the following rule to your rabbitmq.config file and restart the server; the absurdly high disk_free_limit forces RabbitMQ's disk alarm to trigger immediately, which blocks every publishing connection:

[{rabbit, [{disk_free_limit, 1000000000000000000000}]}].

Upon connecting to RabbitMQ from your PHP script, you will notice in the RabbitMQ admin that all of the connections coming from the producer are in the "blocked" state. This causes the PHP process to loop infinitely in the writing and reading logic.

The bad behavior manifests itself when attempting to close the channel and connection, which becomes impossible. By properly applying the timeout value to the socket when the connection is created, the infinite loop goes away and a descriptive Exception is thrown in both cases instead.

Also, the library now throws an Exception if it cannot properly set a timeout value for the socket.
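
For illustration, here is a minimal standalone sketch of that approach (the host, timeout value, and payload below are placeholders, not the library's defaults): set a read/write timeout on the stream, then inspect the stream's metadata after an operation to see whether it timed out.

$errno = 0;
$errstr = '';
$timeout = 3; // seconds; placeholder value
$sock = stream_socket_client('tcp://localhost:5672', $errno, $errstr, $timeout);
if (!$sock) {
    throw new \Exception("Error connecting to server($errno): $errstr");
}
if (!stream_set_timeout($sock, $timeout)) {
    throw new \Exception('Timeout could not be set');
}
stream_set_blocking($sock, 1);

// Write the AMQP protocol header (used here only as example payload)
// and check whether the socket timed out during the operation.
$data = "AMQP\x00\x00\x09\x01";
if (false === fwrite($sock, $data)) {
    throw new \Exception('Error sending data');
}
$info = stream_get_meta_data($sock);
if ($info['timed_out']) {
    throw new \Exception('Error sending data. Socket connection timed out');
}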

This problem was also exposed in this issue: #43

@DrLongGhost

The company I work for pushes about 10 million messages per day through a rabbit queue and we recently ran into the above issue (php-amqplib ignoring the timeout and hanging indefinitely when attempting to close the connection).

I can verify the proposed commits solve the problem and are safe for use in production systems. Please merge them, so we won't need to leave this project forked on our end.

Thanks to seansullivan and videlalvaro for their work.

@videlalvaro videlalvaro merged commit c508ffa into videlalvaro:master
@videlalvaro
Owner

@DrLongGhost may I ask which company is that?

@DrLongGhost

AWeber Communications: http://www.aweber.com

Thanks for the quick response!

@videlalvaro
Owner

Would you mind getting in touch via email? avidela at vmware.com

@seansullivan

Thank you for merging!

@schmittjoh

This patch seems to be problematic.

In general, the idea of not blocking forever is good, but at the moment any consumer crashes if there is no constant stream of new messages arriving. With the previous behavior, the worker stayed alive until new data arrived; now it just exits if no new data arrives within a given time frame. This does not seem like desirable behavior to me.

@fdeweger

I can confirm the issues that schmittjoh reported.

Not everyone is pushing ten million messages a day ;)

@videlalvaro
Owner
@schmittjoh

My suggestion would be to just not throw an exception. It is a normal use case that there are no new messages, in which case control could just return to the calling code.

Usually, you have code such as:

while (count($this->channel->callbacks) > 0) {
    $this->channel->wait();
}

This loop could just be cycled a bit more often.
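
Until something like that lands, one way calling code could tolerate the new behaviour is to catch the timeout around wait() and keep looping. This is only a sketch, keyed to the exception message text introduced by this patch:

while (count($this->channel->callbacks) > 0) {
    try {
        $this->channel->wait();
    } catch (\Exception $e) {
        // A read timeout just means "no messages yet": keep waiting.
        // Anything else (broken pipe, protocol error) stays fatal.
        if (false === strpos($e->getMessage(), 'timed out')) {
            throw $e;
        }
    }
}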

@videlalvaro
Owner
@fdeweger

Yeah, I think the current situation is bad for everyone... Since DrLongGhost commented that it was safe, they must be running a very long-lived PHP process. I think it's not a good idea to have such a process blow up just because there are no more messages.

And for my use case it's not a real problem (small batches of messages get consumed by a cron job), but it's not very nice to see an exception every time the script runs.

@pulse00

I can confirm what @schmittjoh reported. Running the amqp_consumer.php script in the demo folder throws an exception if no messages arrive. I guess this would not be a problem if you watch the consumer using something like supervisord.

I'd prefer throwing a ConnectionTimeoutException because, if there is some problem with the broker or network communication, the previous behaviour could bring down a complete server if messages are being published from within a web context and those processes stay alive indefinitely.

@DrLongGhost

To clarify the behavior we were seeing... We have a cron job that fires up every 5 minutes, processes a log file then publishes about 1000 messages to rabbit. We were seeing the cron job hanging indefinitely on the "connection->close()" call with the "waiting for a new frame" message mentioned in issue #43. This would happen maybe 0.05% of the time.

I'm not really knowledgeable enough to submit an amended pull request myself, but I can comment on the behavior. It seems to me that when you explicitly issue a connection->close() call, there should be a timeout by default on the "waiting for a new frame". The merged pull request introduces this behavior.

If the pull request also has the unfortunate side effect of timing out open connections that haven't been requested to be closed, then I can see how that is a problem. Is it possible to add the timeout only to connections that the client has requested to close, but not those that it hasn't?

@seansullivan

@schmittjoh Thanks for bringing this to my attention! I apologize for my oversight regarding exceptions being thrown on long-running consumers. I can definitely reproduce this behavior on my end and have provided a patch and opened a pull request (#60) to remove the code that causes this.

Removing the exception from the loop in the rawread() method of AMQPReader fixes this, and everything now works as it did before this PR was merged.

I do, however, think it's important that the Exception continues to be thrown in the write() method of AMQPConnection, as that's where the nasty blocking behavior appears when calling connection->close(). A more ideal solution would be for the write() method to know that the connection is in the process of being closed and, only in that case, throw a special exception (e.g. ConnectionCloseTimeoutException); the consumer of this library could then more easily target this condition and handle it appropriately.
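
A rough sketch of that idea; ConnectionCloseTimeoutException and the $closing flag are hypothetical names, not part of php-amqplib:

class ConnectionCloseTimeoutException extends \Exception {}

// Hypothetical helper: write() could delegate its timeout check here,
// passing a flag that close() sets before it starts waiting for replies.
function checkWriteTimeout($sock, $closing)
{
    $info = stream_get_meta_data($sock);
    if ($info['timed_out']) {
        if ($closing) {
            // The shutdown path gets a targeted exception that callers can catch.
            throw new ConnectionCloseTimeoutException('Socket timed out while closing the connection');
        }
        // Outside of close(), keep whatever behaviour the library settles on.
    }
}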

Another idea: AMQPConnection could also support specifying separate timeouts for read and write. Setting the read timeout to null (or false, whichever is best) could cause the rawread() loop to never time out, while still giving that ability to applications/scripts that would like reads to time out.
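
A sketch of how the rawread() loop could honour that, assuming a nullable read timeout passed down from AMQPConnection (the parameter and function names here are hypothetical):

// A null $read_timeout means "never give up on reads"; otherwise the
// timed_out check from this patch applies.
function rawReadChunk($sock, $n, $read_timeout)
{
    $res = '';
    $read = 0;
    while ($read < $n && !feof($sock) &&
           (false !== ($buf = fread($sock, $n - $read)))) {
        if (null !== $read_timeout) {
            $info = stream_get_meta_data($sock);
            if ($info['timed_out']) {
                throw new \Exception('Error reading data. Socket connection timed out');
            }
        }
        $read += strlen($buf);
        $res .= $buf;
    }
    return $res;
}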

Further, it would be ideal if we could work out a way for PHP to determine whether or not RabbitMQ is blocking the connection. It is possible to gain this knowledge through rabbitmqctl on the command line, but I couldn't find a way to do so with the PHP stream_* functions. My thought is that, if we were able to do this, the code using this library could decide how to handle the situation (continue to wait or exit) rather than it being handled in the library.

@schmittjoh

@seansullivan, thanks for the patch, very much appreciated. I'd be curious if you tried to just return from the channel->wait() method if a timeout occurs instead of throwing an exception.

For example, this would be useful when you make an RPC call and the RPC server has failed for some reason. At the moment, the RPC client basically hangs forever waiting for a message on its exclusive response queue. For these cases, it would be cool if control could return to the while loop so that you can just fail on the client-side after a specific amount of time (or try again if timeout has not yet been reached).
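
A client-side sketch of that, assuming the application gives up after a fixed deadline and that the read timeout surfaces in a catchable way ($this->response follows the usual RPC-client pattern and is not part of the library):

$deadline = time() + 10; // fail the RPC call after 10 seconds (illustrative)
while (null === $this->response) {
    if (time() >= $deadline) {
        throw new \RuntimeException('RPC call timed out waiting for a response');
    }
    try {
        $this->channel->wait();
    } catch (\Exception $e) {
        // A read timeout: loop again until the deadline; everything else is fatal.
        if (false === strpos($e->getMessage(), 'timed out')) {
            throw $e;
        }
    }
}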

So, I think there is some merit in even applying it to the read method if we find a way that does not require us to throw an exception for these long running consumers. I don't really have a good overview of the library's internals, but maybe we could also throw an exception, and catch/ignore it in the wait method. What do you think?

@fdeweger

@seansullivan Awesome! Works in my use case, and I did a quick test with the settings you gave in the initial PR. A couple of weeks ago I had the same problem as you described in a VM with a very small HDD, but I never made the connection between that and what you described (I was only seeing failures in the consumers, but I didn't think about producers).

Both problems seem to be solved.

@videlalvaro
Owner

I like @schmittjoh's idea, what do you guys think?

PhpAmqpLib/Connection/AMQPConnection.php (12 changed lines)
@@ -84,7 +84,10 @@ public function __construct($host, $port,
            throw new \Exception ("Error Connecting to server($errno): $errstr ");
        }
-       stream_set_timeout($this->sock, $read_write_timeout);
+       if(!stream_set_timeout($this->sock, $read_write_timeout)) {
+           throw new \Exception ("Timeout could not be set");
+       }
+
        stream_set_blocking($this->sock, 1);
        $this->input = new AMQPReader(null, $this->sock);
@@ -167,6 +170,13 @@ protected function write($data)
            if ($written === 0) {
                throw new \Exception ("Broken pipe or closed connection");
            }
+
+           // get status of socket to determine whether or not it has timed out
+           $info = stream_get_meta_data($this->sock);
+           if($info['timed_out']) {
+               throw new \Exception("Error sending data. Socket connection timed out");
+           }
+
            $len = $len - $written;
            if ($len > 0) {
                $data = substr($data,0-$len);
PhpAmqpLib/Wire/AMQPReader.php (7 changed lines)
@@ -53,6 +53,13 @@ private function rawread($n)
            while ($read < $n && !feof($this->sock->real_sock()) &&
                   (false !== ($buf = fread($this->sock->real_sock(), $n - $read)))) {
+
+               // get status of socket to determine whether or not it has timed out
+               $info = stream_get_meta_data($this->sock->real_sock());
+               if($info['timed_out']) {
+                   throw new \Exception('Error reading data. Socket connection timed out');
+               }
+
                $read += strlen($buf);
                $res .= $buf;
            }