Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

treat system events and data events differently, began implementing S…

…endClient
  • Loading branch information...
commit 2dcd80f51e2c7334cb481fa3359eafe3164022f1 1 parent 537dc6a
max authored
View
42 src/FuseSource/Stomp/AbstractStompClient.php
@@ -49,18 +49,6 @@
*/
abstract class AbstractStompClient
{
- protected $_hosts = array();
- protected $_params = array();
- protected $_subscriptions = array();
-
- protected $_username = '';
- protected $_password = '';
- protected $_read_timeout_seconds = 60;
- protected $_read_timeout_milliseconds = 0;
- protected $_connect_timeout_seconds = 60;
-
-
-
protected $brokerUri;
protected $options;
protected $logger;
@@ -69,21 +57,14 @@
protected $_socket;
protected $_sessionId;
-
- /**
- * Constructor
- *
- * @param string $brokerUri Broker URL
- * @throws StompException
- */
public function __construct($uriString, array $options = [])
{
$defaultOptions = [
- 'username' => null,
- 'password' => null,
- 'clientId' => null,
- 'prefetchSize' => 1,
- 'readTimeout'
+ 'username' => null,
+ 'password' => null,
+ 'clientId' => null,
+ 'prefetchSize' => 1,
+ 'connectTimeout' => 60,
];
$this->brokerUri = new Uri($uriString);
@@ -98,23 +79,20 @@ public function setLogger(Logger $logger)
return $this;
}
- /**
- * Make socket connection to the server
- *
- * @throws StompException
- */
protected function openSocket()
{
$connected = false;
$connectionAttempts = 0;
- while (!$connected && $connectionAttempts < $this->_attempts) {
+ while (!$connected && $connectionAttempts < $this->_attempts) {
+ $connectionErrorNumber = $connectionError = null;
+
$this->_socket = @fsockopen(
'tcp://' . $this->brokerUri->getHost(),
$this->brokerUri->getPort(),
$connectionErrorNumber,
$connectionError,
- $this->_connect_timeout_seconds
+ $this->options['connectTimeout']
);
if (is_resource($this->_socket)) {
@@ -156,7 +134,7 @@ protected function _writeFrame(Frame $frame)
public function disconnect()
{
- $this->logger->debug('Shutting down gracefully');
+ $this->logger->info('Shutting down gracefully');
$headers = [];
View
11 src/FuseSource/Stomp/Event/EventType.php → ...useSource/Stomp/Event/SystemEventType.php
@@ -21,11 +21,20 @@
*
*/
-final class EventType
+final class SystemEventType
{
const FRAME_CONNECTED = 'connected';
const FRAME_ERROR = 'error';
const TRANSPORT_ERROR = '__transport/error';
+
+ public static function getValidEventTypes()
+ {
+ return [
+ self::FRAME_CONNECTED,
+ self::FRAME_ERROR,
+ self::TRANSPORT_ERROR,
+ ];
+ }
}
View
52 src/FuseSource/Stomp/ReceiveClient.php
@@ -6,7 +6,7 @@
use FuseSource\Stomp\Exception\FrameException;
use FuseSource\Stomp\Value\Uri;
use FuseSource\Stomp\Value\Frame;
-use FuseSource\Stomp\Event\EventType;
+use FuseSource\Stomp\Event\SystemEventType;
use FuseSource\Stomp\Event\FrameEvent;
use FuseSource\Stomp\Event\ErrorEvent;
use Symfony\Component\EventDispatcher\EventDispatcher;
@@ -81,7 +81,7 @@ protected function startEventLoop()
$errorCallback = function($buf, $what, $arg) {
$this->logger->debug('Error callback triggered', array('what' => $what, 'arg' => $arg));
- $this->dispatcher->dispatch(EventType::TRANSPORT_ERROR, new ErrorEvent($what));
+ $this->dispatcher->dispatch(SystemEventType::TRANSPORT_ERROR, new ErrorEvent($what));
};
$this->logger->info('Starting event loop');
@@ -95,10 +95,10 @@ protected function startEventLoop()
event_base_loop($this->base);
}
- public function addListener($eventName, callable $listener)
+ public function addDataListener($eventName, callable $listener)
{
if (!preg_match('#^\/(queue|topic|temp-queue|temp-topic)\/#i', $eventName)) {
- throw new InvalidArgumentException('Event Name for data listener must begin with one of /queue/ /topic/ /temp-queue/ /temp-topic/');
+ throw new InvalidArgumentException(sprintf('Event Name for data listener must begin with one of /queue/ /topic/ /temp-queue/ /temp-topic/ %s given', $eventName));
}
$this->dataListeners[] = [$eventName, $listener];
@@ -106,31 +106,35 @@ public function addListener($eventName, callable $listener)
return $this;
}
+ public function addSystemListener($eventName, callable $listener)
+ {
+ if (!in_array($eventName, SystemEventType::getValidEventTypes(), true)) {
+ throw new InvalidArgumentException(sprintf('Unknown system event %s given', $eventName));
+ }
+
+ $this->dispatcher->addListener($eventName, $listener);
+
+ return $this;
+ }
+
public function unsubscribe($eventName, callable $listener)
{
- // @TODO implement this
+ $this->dispatcher->removeListener($eventName, $listener);
+
if (preg_match('#^\/(queue|topic|temp-queue|temp-topic)\/#i', $eventName)) {
+ foreach ($this->dataListeners as $dataListener) {
+ list($theEventName, $theListener) = $dataListener;
- $headers = array();
- if (isset($properties)) {
- foreach ($properties as $name => $value) {
- $headers[$name] = $value;
+ if ($theEventName !== $eventName || $dataListener !== $listener) {
+ continue;
}
+
+ $frame = Frame::createNew('UNSUBSCRIBE', ['destination' => $eventName]);
+ $this->_writeFrame($frame);
}
- $headers['destination'] = $destination;
- $frame = new Frame('UNSUBSCRIBE', $headers);
- $this->_prepareReceipt($frame, $sync);
- $this->_writeFrame($frame);
}
- $this->dispatcher->removeListener($eventName, $listener);
-
- if ($this->_waitForReceipt($frame, $sync) == true) {
- unset($this->_subscriptions[$destination]);
- return true;
- } else {
- return false;
- }
+ return $this;
}
protected function addPreRegisteredListeners()
@@ -185,20 +189,20 @@ public function listen()
$this->openSocket();
- $this->dispatcher->addListener(EventType::FRAME_CONNECTED, function(FrameEvent $event) {
+ $this->addSystemListener(SystemEventType::FRAME_CONNECTED, function(FrameEvent $event) {
$this->logger->info('Successfully connected to server');
$this->_sessionId = $event->getFrame()->getHeaders()['session'];
$this->addPreRegisteredListeners();
});
- $this->dispatcher->addListener(EventType::FRAME_ERROR, function(FrameEvent $event) {
+ $this->addSystemListener(SystemEventType::FRAME_ERROR, function(FrameEvent $event) {
$this->logger->info('Got frame error');
throw new FrameException('Frame error', 0, null, $event->getFrame());
});
- $this->dispatcher->addListener(EventType::TRANSPORT_ERROR, function(ErrorEvent $event) {
+ $this->addSystemListener(SystemEventType::TRANSPORT_ERROR, function(ErrorEvent $event) {
$this->logger->info('Got transport error');
throw new TransportException($event->getMessage());
View
44 src/FuseSource/Stomp/SendClient.php
@@ -35,47 +35,15 @@
class SendClient extends AbstractStompClient
{
-
- /**
- * Send a message to a destination in the messaging system
- *
- * @param string $destination Destination queue
- * @param string|Frame $msg Message
- * @param array $properties
- * @param boolean $sync Perform request synchronously
- * @return boolean
- */
- public function send($destination, $msg, $properties = [], $sync = null)
+ public function send($destination, $msg, $properties = [], $waitForReceipt = false)
{
- $headers = $properties;
- $headers['destination'] = $destination;
+ $headers = array_merge(['destination' => $destination], $properties);
- $frame = Frame::createNew('SEND', $headers, $msg);
+ $frame = Frame::createNew('SEND', $headers, $msg, $waitForReceipt);
- $this->_prepareReceipt($frame, $sync);
$this->_writeFrame($frame);
- return $this->_waitForReceipt($frame, $sync);
- }
-
- /**
- * Prepair frame receipt
- *
- * @param Frame $frame
- * @param boolean $sync
- */
- protected function _prepareReceipt(Frame $frame, $sync)
- {
- $receive = $this->sync;
-
- if ($sync !== null) {
- $receive = $sync;
- }
-
- if ($receive == true) {
- $frame->headers['receipt'] = md5(microtime());
- }
+ return $this->_waitForReceipt($frame);
}
-
/**
* Wait for receipt
*
@@ -86,10 +54,6 @@ protected function _prepareReceipt(Frame $frame, $sync)
*/
protected function _waitForReceipt(Frame $frame, $sync)
{
- $receive = $this->sync;
- if ($sync !== null) {
- $receive = $sync;
- }
if ($receive == true) {
$id = (isset($frame->headers['receipt'])) ? $frame->headers['receipt'] : null;
if ($id == null) {
View
34 src/FuseSource/Stomp/Value/Frame.php
@@ -2,7 +2,7 @@
namespace FuseSource\Stomp\Value;
-use FuseSource\Stomp\Event\EventType;
+use FuseSource\Stomp\Event\SystemEventType;
/**
*
@@ -34,19 +34,14 @@ class Frame
private $command;
private $headers = [];
private $body;
-
- /**
- * Constructor
- *
- * @param string $command
- * @param array $headers
- * @param string $body
- */
- private function __construct($command, array $headers, $body)
+ private $waitForReceipt;
+
+ private function __construct($command, array $headers, $body, $waitForReceipt)
{
$this->command = $command;
$this->headers = $headers;
$this->body = $body;
+ $this->waitForReceipt = $waitForReceipt;
}
public static function unserializeFrom($data)
@@ -67,22 +62,26 @@ public static function unserializeFrom($data)
}
}
- return new static($command, $headers, $body);
+ return new static($command, $headers, $body, false);
}
- public static function createNew($command = null, $headers = null, $body = null)
+ public static function createNew($command = null, $headers = null, $body = null, $waitForReceipt = false)
{
- return new static($command, (array) $headers, $body);
+ if ($waitForReceipt) {
+ $headers['receipt'] = md5(microtime());
+ }
+
+ return new static($command, (array) $headers, $body, $waitForReceipt);
}
public function getEventName()
{
if ($this->command === 'CONNECTED') {
- return EventType::FRAME_CONNECTED;
+ return SystemEventType::FRAME_CONNECTED;
}
if ($this->command === 'ERROR') {
- return EventType::FRAME_ERROR;
+ return SystemEventType::FRAME_ERROR;
}
return $this->headers['destination'];
@@ -103,6 +102,11 @@ public function getBody()
{
return $this->body;
}
+
+ public function waitForReceipt()
+ {
+ return $this->waitForReceipt;
+ }
/**
* Convert frame to transportable string
Please sign in to comment.
Something went wrong with that request. Please try again.