Permalink
Browse files

add possibility to work with NO_DELAY sockets

Using sockets without NO_DELAY option causes extreme lack of
performance in some cases (for example RPC calls and responses).

There is no possibility to set NO_DELAY option when working with streams, so
AMQPSocketConnection class was added with some minor refactoring

See benchmark for performance comparison between socket and stream connections
  • Loading branch information...
1 parent 4aa3e87 commit 68a26730b46c9a87c1c76c297bc2c752f736be0e @m00t m00t committed Feb 1, 2013
View
@@ -6,3 +6,7 @@ benchmark:
php benchmark/producer.php 4000
@echo "Consuming 4000:"
php benchmark/consumer.php
+ @echo "Stream produce 100:"
+ php benchmark/stream_tmp_produce.php 100
+ @echo "Socket produce 100:"
+ php benchmark/socket_tmp_produce.php 100
@@ -2,7 +2,7 @@
namespace PhpAmqpLib\Channel;
-use PhpAmqpLib\Connection\AMQPConnection;
+use PhpAmqpLib\Connection\AbstractConnection;
use PhpAmqpLib\Exception\AMQPOutOfBoundsException;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Helper\MiscHelper;
@@ -23,7 +23,7 @@ class AbstractChannel
protected $debug;
/**
*
- * @var AMQPConnection
+ * @var AbstractConnection
*/
protected $connection;
@@ -38,10 +38,10 @@ class AbstractChannel
protected $channel_id;
/**
- * @param \PhpAmqpLib\Connection\AMQPConnection $connection
+ * @param \PhpAmqpLib\Connection\AbstractConnection $connection
* @param $channel_id
*/
- public function __construct(AMQPConnection $connection, $channel_id)
+ public function __construct(AbstractConnection $connection, $channel_id)
{
$this->connection = $connection;
$this->channel_id = $channel_id;
@@ -0,0 +1,8 @@
+<?php
+
+namespace PhpAmqpLib\Connection;
+
+class AMQPConnection extends AMQPStreamConnection
+{
+ // just for BC
+}
@@ -4,7 +4,7 @@
use PhpAmqpLib\Connection\AMQPConnection;
-class AMQPSSLConnection extends AMQPConnection
+class AMQPSSLConnection extends AMQPStreamConnection
{
public function __construct($host, $port, $user, $password,
$vhost="/", $ssl_options = array(), $options = array())
@@ -0,0 +1,21 @@
+<?php
+
+namespace PhpAmqpLib\Connection;
+
+use PhpAmqpLib\Wire\IO\SocketIO;
+
+class AMQPSocketConnection extends AbstractConnection
+{
+ public function __construct($host, $port,
+ $user, $password,
+ $vhost="/",$insist=false,
+ $login_method="AMQPLAIN",
+ $login_response=null,
+ $locale="en_US",
+ $timeout = 3)
+ {
+ $io = new SocketIO($host, $port, $timeout);
+
+ parent::__construct($user, $password, $vhost, $insist, $login_method, $login_response, $locale, $io);
+ }
+}
@@ -0,0 +1,37 @@
+<?php
+
+namespace PhpAmqpLib\Connection;
+
+use PhpAmqpLib\Wire\IO\StreamIO;
+
+class AMQPStreamConnection extends AbstractConnection
+{
+ protected $sock = null;
+
+ public function __construct($host, $port,
+ $user, $password,
+ $vhost="/",$insist=false,
+ $login_method="AMQPLAIN",
+ $login_response=null,
+ $locale="en_US",
+ $connection_timeout = 3,
+ $read_write_timeout = 3,
+ $context = null)
+ {
+ $io = new StreamIO($host, $port, $connection_timeout, $read_write_timeout, $context);
+ $this->sock = $io->get_socket();
+
+ parent::__construct($user, $password, $vhost, $insist, $login_method, $login_response, $locale, $io);
+ }
+
+ /**
+ * get socket from current connection
+ *
+ * @deprecated
+ */
+ public function getSocket()
+ {
+ return $this->sock;
+ }
+
+}
@@ -10,8 +10,9 @@
use PhpAmqpLib\Helper\MiscHelper;
use PhpAmqpLib\Wire\AMQPWriter;
use PhpAmqpLib\Wire\AMQPReader;
+use PhpAmqpLib\Wire\IO\AbstractIO;
-class AMQPConnection extends AbstractChannel
+class AbstractConnection extends AbstractChannel
{
public static $LIBRARY_PROPERTIES = array(
"library" => array('S', "PHP AMQP Lib"),
@@ -29,22 +30,21 @@ class AMQPConnection extends AbstractChannel
*/
protected $close_on_destruct = true ;
- public function __construct($host, $port,
- $user, $password,
+ protected $io = null;
+
+ public function __construct($user, $password,
$vhost="/",$insist=false,
$login_method="AMQPLAIN",
$login_response=null,
$locale="en_US",
- $connection_timeout = 3,
- $read_write_timeout = 3,
- $context = null)
+ AbstractIO $io)
{
$this->construct_params = func_get_args();
if ($user && $password) {
$login_response = new AMQPWriter();
$login_response->write_table(array("LOGIN" => array('S',$user),
- "PASSWORD" => array('S',$password)));
+ "PASSWORD" => array('S',$password)));
$login_response = substr($login_response->getvalue(),4); //Skip the length
} else {
$login_response = null;
@@ -59,28 +59,8 @@ public function __construct($host, $port,
$this->channel_max = 65535;
$this->frame_max = 131072;
- $errstr = $errno = null;
- $this->sock = null;
-
- //TODO clean up
- if ($context) {
- $remote = sprintf('ssl://%s:%s', $host, $port);
- $this->sock = @stream_socket_client($remote, $errno, $errstr, $connection_timeout, STREAM_CLIENT_CONNECT, $context);
- } else {
- $remote = sprintf('tcp://%s:%s', $host, $port);
- $this->sock = @stream_socket_client($remote, $errno, $errstr, $connection_timeout, STREAM_CLIENT_CONNECT);
- }
-
- if (!$this->sock) {
- throw new AMQPRuntimeException("Error Connecting to server($errno): $errstr ");
- }
-
- if (!stream_set_timeout($this->sock, $read_write_timeout)) {
- throw new \Exception ("Timeout could not be set");
- }
-
- stream_set_blocking($this->sock, 1);
- $this->input = new AMQPReader(null, $this->sock);
+ $this->io = $io;
+ $this->input = new AMQPReader(null, $this->io);
$this->write($this->amqp_protocol_header);
$this->wait(array($this->waitHelper->get_wait('connection.start')));
@@ -89,9 +69,9 @@ public function __construct($host, $port,
$this->wait_tune_ok = true;
while ($this->wait_tune_ok) {
$this->wait(array(
- $this->waitHelper->get_wait('connection.secure'),
- $this->waitHelper->get_wait('connection.tune')
- ));
+ $this->waitHelper->get_wait('connection.secure'),
+ $this->waitHelper->get_wait('connection.tune')
+ ));
}
$host = $this->x_open($vhost,"", $insist);
@@ -125,6 +105,12 @@ public function __destruct()
}
}
}
+
+ public function select($sec, $usec = 0)
+ {
+ return $this->io->select($sec, $usec);
+ }
+
/**
* allows to not close the connection
* it`s useful after the fork when you don`t want to close parent process connection
@@ -138,43 +124,19 @@ public function set_close_on_destruct($close = true)
protected function close_socket()
{
if ($this->debug) {
- MiscHelper::debug_msg("closing socket");
+ MiscHelper::debug_msg("closing socket");
}
- if (is_resource($this->sock)) {
- fclose($this->sock);
- }
- $this->sock = null;
+ $this->io->close();
}
protected function write($data)
{
if ($this->debug) {
- MiscHelper::debug_msg("< [hex]:\n" . MiscHelper::hexdump($data, $htmloutput = false, $uppercase = true, $return = true));
+ MiscHelper::debug_msg("< [hex]:\n" . MiscHelper::hexdump($data, $htmloutput = false, $uppercase = true, $return = true));
}
- $len = strlen($data);
- while (true) {
- if (false === ($written = fwrite($this->sock, $data))) {
- throw new AMQPRuntimeException("Error sending data");
- }
- if ($written === 0) {
- throw new AMQPRuntimeException("Broken pipe or closed connection");
- }
-
- // get status of socket to determine whether or not it has timed out
- $info = stream_get_meta_data($this->sock);
- if($info['timed_out']) {
- throw new AMQPTimeoutException("Error sending data. Socket connection timed out");
- }
-
- $len = $len - $written;
- if ($len > 0) {
- $data = substr($data,0-$len);
- } else {
- break;
- }
- }
+ $this->io->write($data);
}
protected function do_close()
@@ -274,7 +236,7 @@ protected function wait_frame($timeout = 0)
$frame_type = $this->input->read_octet();
$channel = $this->input->read_short();
$size = $this->input->read_long();
- $payload = $this->input->read($size, $timeout);
+ $payload = $this->input->read($size);
$ch = $this->input->read_octet();
} catch(AMQPTimeoutException $e) {
@@ -4,9 +4,9 @@
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Wire\AMQPDecimal;
-use PhpAmqpLib\Wire\BufferedInput;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Exception\AMQPOutOfBoundsException;
+use PhpAmqpLib\Wire\IO\AbstractIO;
/**
@@ -17,26 +17,26 @@
class AMQPReader
{
protected $str;
- protected $sock;
protected $offset;
protected $bitcount;
protected $is64bits;
protected $timeout;
protected $bits;
+ protected $io = null;
/**
* @param string $str
* @param null $sock
* @param int $timeout
*/
- public function __construct($str, $sock = null, $timeout = 0)
+ public function __construct($str, AbstractIO $io = null, $timeout = 0)
{
if (!function_exists("bcmul")) {
throw new AMQPRuntimeException("'bc math' module required");
}
$this->str = $str;
- $this->sock = $sock !== null ? new BufferedInput($sock) : null;
+ $this->io = $io;
$this->offset = 0;
$this->bitcount = $this->bits = 0;
$this->timeout = $timeout;
@@ -49,8 +49,8 @@ public function __construct($str, $sock = null, $timeout = 0)
*/
public function close()
{
- if ($this->sock) {
- $this->sock->close();
+ if($this->io) {
+ $this->io->close();
}
}
@@ -68,23 +68,19 @@ public function read($n)
/**
* Wait until some data is retrieved from the socket.
- *
+ *
* AMQPTimeoutException can be raised if the timeout is set
*
* @throws \PhpAmqpLib\Exception\AMQPTimeoutException
*/
protected function wait()
{
- if ($this->timeout == 0 && $this->sock) {
+ if ($this->timeout == 0) {
return;
}
- $read = array($this->sock->real_sock());
- $write = null;
- $except = null;
-
// wait ..
- $result = stream_select($read, $write, $except, $this->timeout);
+ $result = $this->io->select($this->timeout, 0);
if ($result === false) {
throw new AMQPRuntimeException(sprintf("An error occurs", $this->timeout));
@@ -102,32 +98,12 @@ protected function wait()
* @throws \RuntimeException
* @throws \PhpAmqpLib\Exception\AMQPRuntimeException
*/
- private function rawread($n)
- {
- if ($this->sock) {
- $res = '';
- $read = 0;
-
- while (true) {
- $this->wait();
-
- $buf = fread($this->sock->real_sock(), $n - $read);
-
- if ($buf !== false) {
- $read += strlen($buf);
- $res .= $buf;
- }
-
- if (feof($this->sock->real_sock()) || ($n - $read) === 0) {
- break;
- }
- }
-
- if (strlen($res) != $n) {
- throw new AMQPRuntimeException("Error reading data. Received " .
- strlen($res) . " instead of expected $n bytes");
- }
+ protected function rawread($n)
+ {
+ if ($this->io) {
+ $this->wait();
+ $res = $this->io->read($n);
$this->offset += $n;
} else {
if (strlen($this->str) < $n) {
@@ -412,4 +388,4 @@ public function getTimeout()
{
return $this->timeout;
}
-}
+}
Oops, something went wrong.

0 comments on commit 68a2673

Please sign in to comment.