Skip to content

Commit

Permalink
fix connection broken detection (on ssl sockets)
Browse files Browse the repository at this point in the history
The previous checks within `Connection::readFrame` often failed for ssl
connections on AWS. This was caused by the fact that `stream_select` will
fail to reliable determine if there is already processable data on the
encrypted stream. It returns true while the stream is not yet ready to
be consumed, which causes `fread` to return an empty string, which was
considered as failure before.
Related to #115

BC: As `fread` also returns an empty string for sockets that are broken,
we now need to make use of heartbeats to determine the status of the
connection. `HeartbeatEmitter` was adapted, so that a failing alive
notification now also causes an exception.
In addition `ServerAliveObserver` was added, which simplifies the usage
of server side triggered heartbeats to determine if the connection is
still stable.
  • Loading branch information
jmglsn committed Mar 26, 2019
1 parent cf2f3a1 commit 3f35438
Show file tree
Hide file tree
Showing 17 changed files with 1,068 additions and 221 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,9 @@ Thanks to @staabm for all the reviews :)
------
- add configuration for connection read/write may bytes (thanks to @ganeko for this enhancement, https://github.com/stomp-php/stomp-php/pull/113)

4.5.0
------
- refined broken connection detection, not longer failing when `fread` returns empty string (thanks to @mlamm for reporting this, https://github.com/stomp-php/stomp-php/issues/115 and https://github.com/stomp-php/stomp-php/issues/114
- Clients should now use either `ServerAliveObserver` or `HeartbeatEmitter` to verify the state of the connection, otherwise a broken connection might not be detected - especially when the socket is based on ssl.
- stabilize client heartbeat implementation, added awareness for failing `Connection::sendAlive` calls, now causing `HeartbeatException`
- added `ServerAliveObserver` in order to simplify detection of dead connections.
14 changes: 9 additions & 5 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,10 @@ public function setVhostname($host = null)
* within an interval - to indicate that the connection is still stable. If client and server agree on a beat and
* the interval passes without any data activity / beats the connection will be considered as broken and closed.
*
* If you define a heartbeat, you must assure that your application will send data within the interval.
* If you want to make sure that the server is still available, you should use the ServerAliveObserver in combination
* with an requested server heartbeat interval.
*
* If you define a heartbeat for client side, you must assure that your application will send data within the interval.
* You can add \Stomp\Network\Observer\HeartbeatEmitter to your connection in order to send beats automatically.
*
* If you don't use HeartbeatEmitter you must either send messages within the interval
Expand All @@ -178,12 +181,13 @@ public function setVhostname($host = null)
* @param int $receive
* Number of milliseconds between expected receipt of heartbeats. 0 means
* no heartbeats expected. (not yet supported by this client)
* @see \Stomp\Network\Observer\ServerAliveObserver
* @see \Stomp\Network\Observer\HeartbeatEmitter
* @see \Stomp\Network\Connection::sendAlive()
*/
public function setHeartbeat($send = 0, $receive = 0)
{
$this->heartbeat = [$send, $receive];
$this->heartbeat = [$send, $receive];
}

/**
Expand Down Expand Up @@ -230,13 +234,13 @@ public function connect()
* @throws ConnectionException
* @throws Exception\ErrorFrameException
*/
private function getConnectedFrame() {
$deadline = microtime(true) + ($this->getConnection()->getConnectTimeout() * 1000000);
private function getConnectedFrame()
{
$deadline = microtime(true) + $this->getConnection()->getConnectTimeout();
do {
if ($frame = $this->connection->readFrame()) {
return $frame;
}

} while (microtime(true) <= $deadline);

return null;
Expand Down
55 changes: 41 additions & 14 deletions src/Network/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ class Connection
*/
private $maxReadBytes = 8192;

/**
* Alive Signal
*/
const ALIVE = "\n";

/**
* Initialize connection
*
Expand Down Expand Up @@ -219,7 +224,7 @@ private function parseUrl($url)
* Set the read timeout
*
* @param integer $seconds seconds
* @param integer $microseconds microseconds (1μs = 0.000001s)
* @param integer $microseconds microseconds (1μs = 0.000001s, ex. 500ms = 500000)
* @return void
*/
public function setReadTimeout($seconds, $microseconds = 0)
Expand All @@ -228,6 +233,18 @@ public function setReadTimeout($seconds, $microseconds = 0)
$this->readTimeout[1] = $microseconds;
}

/**
* Returns the read timeout
*
* First element contains full seconds, second the microseconds part.
*
* @return array
*/
public function getReadTimeout()
{
return $this->readTimeout;
}

/**
* Set the write timeout
*
Expand Down Expand Up @@ -295,8 +312,6 @@ public function setPersistentConnection($persistentConnection)
$this->persistentConnection = $persistentConnection;
}



/**
* Get a connection.
*
Expand Down Expand Up @@ -417,13 +432,13 @@ public function writeFrame(Frame $stompFrame)
/**
* Write passed data to the stream, respecting passed timeout.
*
* @param Frame $stompFrame
* @param int $timeout in seconds
* @param Frame|string $stompFrame
* @param float $timeout in seconds, supporting fractions
* @throws ConnectionException
*/
private function writeData(Frame $stompFrame, $timeout)
private function writeData($stompFrame, $timeout)
{
$data = $stompFrame->__toString();
$data = (string) $stompFrame;
$offset = 0;
$size = strlen($data);
$lastByteTime = microtime(true);
Expand Down Expand Up @@ -470,12 +485,21 @@ public function readFrame()
}

do {

$read = @fread($this->connection, $this->maxReadBytes);
if ($read === '' || $read === false) {
if ($read === false) {
throw new ConnectionException(sprintf('Was not possible to read data from stream.'), $this->activeHost);
}

// this can be caused by different events on the stream, ex. new data or any kind of signal
// it also happens when a ssl socket was closed on the other side... so we need to test
if ($read === '') {
$this->observers->emptyRead();
// again we give some time here
// as this path is most likely indicating that the socket is not working anymore
time_nanosleep(0, 5000000); // 5ms / 0.005s
return false;
}

$this->parser->addData($read);

if ($frame = $this->parser->nextFrame()) {
Expand Down Expand Up @@ -596,22 +620,25 @@ public function getHost()
/**
* Writes an "alive" message on the connection to indicate that the client is alive.
*
* @return bool
* @param float $timeout in seconds supporting fractions (microseconds)
*
* @return void
* @throws ConnectionException
*/
public function sendAlive()
public function sendAlive($timeout = 1.0)
{
if ($this->isConnected()) {
return (@fwrite($this->connection, "\n", 1) === 1);
$this->writeData(self::ALIVE, $timeout);
}
return false;
}

/**
* Immediately releases all allocated resources when the connection object gets destroyed.
*
* This is especially important for long running processes.
*/
public function __destruct() {
public function __destruct()
{
$this->disconnect();
}
}
Loading

0 comments on commit 3f35438

Please sign in to comment.