Permalink
540 lines (464 sloc) 14.1 KB
<?php
namespace PhpAmqpLib\Wire;
use PhpAmqpLib\Exception\AMQPInvalidArgumentException;
use PhpAmqpLib\Exception\AMQPOutOfBoundsException;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Exception\AMQPIOWaitException;
use PhpAmqpLib\Helper\MiscHelper;
use PhpAmqpLib\Wire\IO\AbstractIO;
/**
* This class can read from a string or from a stream
*
* TODO : split this class: AMQPStreamReader and a AMQPBufferReader
*/
class AMQPReader extends AbstractClient
{
const BIT = 1;
const OCTET = 1;
const SHORTSTR = 1;
const SHORT = 2;
const LONG = 4;
const SIGNED_LONG = 4;
const READ_PHP_INT = 4; // use READ_ to avoid possible clashes with PHP
const LONGLONG = 8;
const TIMESTAMP = 8;
/** @var string */
protected $str;
/** @var int */
protected $str_length;
/** @var int */
protected $offset;
/** @var int */
protected $bitcount;
/** @var bool */
protected $is64bits;
/** @var int */
protected $timeout;
/** @var int */
protected $bits;
/** @var \PhpAmqpLib\Wire\IO\AbstractIO */
protected $io;
/**
* @param string $str
* @param AbstractIO $io
* @param int $timeout
*/
public function __construct($str, AbstractIO $io = null, $timeout = 0)
{
parent::__construct();
$this->str = is_string($str) ? $str : '';
$this->str_length = mb_strlen($this->str, 'ASCII');
$this->io = $io;
$this->offset = 0;
$this->bitcount = $this->bits = 0;
$this->timeout = $timeout;
}
/**
* Resets the object from the injected param
*
* Used to not need to create a new AMQPReader instance every time.
* when we can just pass a string and reset the object state.
* NOTE: since we are working with strings we don't need to pass an AbstractIO
* or a timeout.
*
* @param string $str
*/
public function reuse($str)
{
$this->str = $str;
$this->str_length = mb_strlen($this->str, 'ASCII');
$this->offset = 0;
$this->bitcount = $this->bits = 0;
}
/**
* Closes the stream
*/
public function close()
{
if ($this->io) {
$this->io->close();
}
}
/**
* @param int $n
* @return string
*/
public function read($n)
{
$this->bitcount = $this->bits = 0;
return $this->rawread($n);
}
/**
* Waits until some data is retrieved from the socket.
*
* AMQPTimeoutException can be raised if the timeout is set
*
* @throws \PhpAmqpLib\Exception\AMQPIOWaitException
* @throws \PhpAmqpLib\Exception\AMQPTimeoutException
*/
protected function wait()
{
if ($this->getTimeout() == 0) {
return null;
}
// wait ..
list($sec, $usec) = MiscHelper::splitSecondsMicroseconds($this->getTimeout());
$result = $this->io->select($sec, $usec);
if ($result === false) {
throw new AMQPIOWaitException('A network error occured while awaiting for incoming data');
}
if ($result === 0) {
throw new AMQPTimeoutException(sprintf(
'The connection timed out after %s sec while awaiting incoming data',
$this->getTimeout()
));
}
}
/**
* @param int $n
* @return string
* @throws \RuntimeException
* @throws \PhpAmqpLib\Exception\AMQPRuntimeException
*/
protected function rawread($n)
{
if ($this->io) {
$this->wait();
$res = $this->io->read($n);
$this->offset += $n;
return $res;
}
if ($this->str_length < $n) {
throw new AMQPRuntimeException(sprintf(
'Error reading data. Requested %s bytes while string buffer has only %s',
$n,
$this->str_length
));
}
$res = mb_substr($this->str, 0, $n, 'ASCII');
$this->str = mb_substr($this->str, $n, mb_strlen($this->str, 'ASCII') - $n, 'ASCII');
$this->str_length -= $n;
$this->offset += $n;
return $res;
}
/**
* @return bool
*/
public function read_bit()
{
if (empty($this->bitcount)) {
$this->bits = ord($this->rawread(1));
$this->bitcount = 8;
}
$result = ($this->bits & 1) == 1;
$this->bits >>= 1;
$this->bitcount -= 1;
return $result;
}
/**
* @return mixed
*/
public function read_octet()
{
$this->bitcount = $this->bits = 0;
list(, $res) = unpack('C', $this->rawread(1));
return $res;
}
/**
* @return mixed
*/
public function read_signed_octet()
{
$this->bitcount = $this->bits = 0;
list(, $res) = unpack('c', $this->rawread(1));
return $res;
}
/**
* @return mixed
*/
public function read_short()
{
$this->bitcount = $this->bits = 0;
list(, $res) = unpack('n', $this->rawread(2));
return $res;
}
/**
* @return mixed
*/
public function read_signed_short()
{
$this->bitcount = $this->bits = 0;
list(, $res) = unpack('s', $this->correctEndianness($this->rawread(2)));
return $res;
}
/**
* Reads 32 bit integer in big-endian byte order.
*
* On 64 bit systems it will return always unsigned int
* value in 0..2^32 range.
*
* On 32 bit systems it will return signed int value in
* -2^31...+2^31 range.
*
* Use with caution!
*/
public function read_php_int()
{
list(, $res) = unpack('N', $this->rawread(4));
if ($this->is64bits) {
return (int) sprintf('%u', $res);
}
return $res;
}
/**
* PHP does not have unsigned 32 bit int,
* so we return it as a string
*
* @return string
*/
public function read_long()
{
$this->bitcount = $this->bits = 0;
list(, $res) = unpack('N', $this->rawread(4));
return empty($this->is64bits) && self::getLongMSB($res) ? sprintf('%u', $res) : $res;
}
/**
* @return integer
*/
private function read_signed_long()
{
$this->bitcount = $this->bits = 0;
list(, $res) = unpack('l', $this->correctEndianness($this->rawread(4)));
return $res;
}
/**
* Even on 64 bit systems PHP integers are singed.
* Since we need an unsigned value here we return it
* as a string.
*
* @return string
*/
public function read_longlong()
{
$this->bitcount = $this->bits = 0;
list(, $hi, $lo) = unpack('N2', $this->rawread(8));
$msb = self::getLongMSB($hi);
if (empty($this->is64bits)) {
if ($msb) {
$hi = sprintf('%u', $hi);
}
if (self::getLongMSB($lo)) {
$lo = sprintf('%u', $lo);
}
}
return bcadd($this->is64bits && !$msb ? $hi << 32 : bcmul($hi, '4294967296', 0), $lo, 0);
}
/**
* @return string
*/
public function read_signed_longlong()
{
$this->bitcount = $this->bits = 0;
list(, $hi, $lo) = unpack('N2', $this->rawread(8));
if ($this->is64bits) {
return bcadd($hi << 32, $lo, 0);
} else {
return bcadd(bcmul($hi, '4294967296', 0), self::getLongMSB($lo) ? sprintf('%u', $lo) : $lo, 0);
}
}
/**
* @param int $longInt
* @return bool
*/
private static function getLongMSB($longInt)
{
return (bool) ($longInt & 0x80000000);
}
/**
* Read a utf-8 encoded string that's stored in up to
* 255 bytes. Return it decoded as a PHP unicode object.
*/
public function read_shortstr()
{
$this->bitcount = $this->bits = 0;
list(, $slen) = unpack('C', $this->rawread(1));
return $this->rawread($slen);
}
/**
* Read a string that's up to 2**32 bytes, the encoding
* isn't specified in the AMQP spec, so just return it as
* a plain PHP string.
*/
public function read_longstr()
{
$this->bitcount = $this->bits = 0;
$slen = $this->read_php_int();
if ($slen < 0) {
throw new AMQPOutOfBoundsException('Strings longer than supported on this platform');
}
return $this->rawread($slen);
}
/**
* Read and AMQP timestamp, which is a 64-bit integer representing
* seconds since the Unix epoch in 1-second resolution.
*/
public function read_timestamp()
{
return $this->read_longlong();
}
/**
* Read an AMQP table, and return as a PHP array. keys are strings,
* values are (type,value) tuples.
*
* @param bool $returnObject Whether to return AMQPArray instance instead of plain array
* @return array|AMQPTable
*/
public function read_table($returnObject = false)
{
$this->bitcount = $this->bits = 0;
$tlen = $this->read_php_int();
if ($tlen < 0) {
throw new AMQPOutOfBoundsException('Table is longer than supported');
}
$table_data = new AMQPReader($this->rawread($tlen), null);
$result = $returnObject ? new AMQPTable() : array();
while ($table_data->tell() < $tlen) {
$name = $table_data->read_shortstr();
$ftype = AMQPAbstractCollection::getDataTypeForSymbol($ftypeSym = $table_data->rawread(1));
$val = $table_data->read_value($ftype, $returnObject);
$returnObject ? $result->set($name, $val, $ftype) : $result[$name] = array($ftypeSym, $val);
}
return $result;
}
/**
* @return array|AMQPTable
*/
public function read_table_object()
{
return $this->read_table(true);
}
/**
* Reads the array in the next value.
*
* @param bool $returnObject Whether to return AMQPArray instance instead of plain array
* @return array|AMQPArray
*/
public function read_array($returnObject = false)
{
$this->bitcount = $this->bits = 0;
// Determine array length and its end position
$arrayLength = $this->read_php_int();
$endOffset = $this->offset + $arrayLength;
$result = $returnObject ? new AMQPArray() : array();
// Read values until we reach the end of the array
while ($this->offset < $endOffset) {
$fieldType = AMQPAbstractCollection::getDataTypeForSymbol($this->rawread(1));
$fieldValue = $this->read_value($fieldType, $returnObject);
$returnObject ? $result->push($fieldValue, $fieldType) : $result[] = $fieldValue;
}
return $result;
}
/**
* @return array|AMQPArray
*/
public function read_array_object()
{
return $this->read_array(true);
}
/**
* Reads the next value as the provided field type.
*
* @param int $fieldType One of AMQPAbstractCollection::T_* constants
* @param bool $collectionsAsObjects Description
* @return mixed
* @throws \PhpAmqpLib\Exception\AMQPRuntimeException
*/
public function read_value($fieldType, $collectionsAsObjects = false)
{
$this->bitcount = $this->bits = 0;
switch ($fieldType) {
case AMQPAbstractCollection::T_INT_SHORTSHORT:
//according to AMQP091 spec, 'b' is not bit, it is short-short-int, also valid for rabbit/qpid
//$val=$this->read_bit();
$val = $this->read_signed_octet();
break;
case AMQPAbstractCollection::T_INT_SHORTSHORT_U:
$val = $this->read_octet();
break;
case AMQPAbstractCollection::T_INT_SHORT:
$val = $this->read_signed_short();
break;
case AMQPAbstractCollection::T_INT_SHORT_U:
$val = $this->read_short();
break;
case AMQPAbstractCollection::T_INT_LONG:
$val = $this->read_signed_long();
break;
case AMQPAbstractCollection::T_INT_LONG_U:
$val = $this->read_long();
break;
case AMQPAbstractCollection::T_INT_LONGLONG:
$val = $this->read_signed_longlong();
break;
case AMQPAbstractCollection::T_INT_LONGLONG_U:
$val = $this->read_longlong();
break;
case AMQPAbstractCollection::T_DECIMAL:
$e = $this->read_octet();
$n = $this->read_signed_long();
$val = new AMQPDecimal($n, $e);
break;
case AMQPAbstractCollection::T_TIMESTAMP:
$val = $this->read_timestamp();
break;
case AMQPAbstractCollection::T_BOOL:
$val = $this->read_octet();
break;
case AMQPAbstractCollection::T_STRING_SHORT:
$val = $this->read_shortstr();
break;
case AMQPAbstractCollection::T_STRING_LONG:
$val = $this->read_longstr();
break;
case AMQPAbstractCollection::T_ARRAY:
$val = $this->read_array($collectionsAsObjects);
break;
case AMQPAbstractCollection::T_TABLE:
$val = $this->read_table($collectionsAsObjects);
break;
case AMQPAbstractCollection::T_VOID:
$val = null;
break;
default:
throw new AMQPInvalidArgumentException(sprintf(
'Unsupported type "%s"',
$fieldType
));
}
return isset($val) ? $val : null;
}
/**
* @return int
*/
protected function tell()
{
return $this->offset;
}
/**
* Sets the timeout (second)
*
* @param int $timeout
*/
public function setTimeout($timeout)
{
$this->timeout = $timeout;
}
/**
* @return int
*/
public function getTimeout()
{
return $this->timeout;
}
}