Permalink
Browse files

Added Registry, and made some progress towards making persistent conn…

…ections equivalent to their non-persistent counterparts.
  • Loading branch information...
1 parent 12ea824 commit 1a0af09e149d9d1d5a540bd25eca25afea30bd16 @boenrobot boenrobot committed Apr 7, 2012
@@ -2,4 +2,13 @@
<project-private xmlns="http://www.netbeans.org/ns/project-private/1">
<coverage xmlns="http://www.netbeans.org/ns/code-coverage/1" enabled="true"/>
<editor-bookmarks xmlns="http://www.netbeans.org/ns/editor-bookmarks/1"/>
+ <open-files xmlns="http://www.netbeans.org/ns/projectui-open-files/1">
+ <file>file:/D:/Projects/PEAR2_Net_RouterOS.git/src/PEAR2/Net/RouterOS/SocketException.php</file>
+ <file>file:/D:/Projects/PEAR2_Net_RouterOS.git/src/PEAR2/Net/RouterOS/Client.php</file>
+ <file>file:/D:/Projects/PEAR2_Net_RouterOS.git/src/PEAR2/Net/RouterOS/Communicator.php</file>
+ <file>file:/D:/Projects/PEAR2_Net_RouterOS.git/src/PEAR2/Net/RouterOS/Query.php</file>
+ <file>file:/D:/Projects/PEAR2_Net_RouterOS.git/src/PEAR2/Net/RouterOS/Request.php</file>
+ <file>file:/D:/Projects/PEAR2_Net_RouterOS.git/src/PEAR2/Net/RouterOS/Registry.php</file>
+ <file>file:/D:/Projects/PEAR2_Net_RouterOS.git/src/PEAR2/Net/RouterOS/PersistentRegistry.php</file>
+ </open-files>
</project-private>
@@ -82,10 +82,10 @@ class Client
protected $callbacks = array();
/**
- * @var SHM Handler for shared response buffer. Remains NULL when the
- * connection is not a persistent one.
+ * @var Registry A registry for the operations. Particularly helpful at
+ * persistent connections.
*/
- protected $shmHandler = null;
+ protected $registry = null;
/**
* @var bool Whether to stream future responses.
@@ -118,20 +118,21 @@ public function __construct($host, $username, $password = '', $port = 8728,
$host, $port, $persist, $timeout, $username, $context
);
//Login the user if necessary
- if ($this->com->getTransmitter()->isFresh()) {
+ if ((!$persist
+ || 0 == $this->com->getTransmitter()->lock(T\Stream::DIRECTION_ALL))
+ && $this->com->getTransmitter()->isFresh()
+ ) {
if (!static::login($this->com, $username, $password)) {
$this->com->close();
throw new DataFlowException(
'Invalid username or password supplied.', 10000
);
}
+ $this->com->getTransmitter()->lock(T\Stream::DIRECTION_NONE, true);
}
if ($persist) {
- $this->shmHandler = new SHM(
- 'PEAR2\Net\RouterOS\Client tcp://' .
- "{$host}:{$port}/{$username}"
- );
+ $this->registry = new Registry("{$host}:{$port}/{$username}");
}
}
@@ -155,7 +156,7 @@ public function __invoke($arg = null)
if (is_int($arg) || is_double($arg)) {
return $this->loop($arg);
} elseif ($arg instanceof Request) {
- return $arg->getTag() === null ? $this->sendSync($arg)
+ return '' == $arg->getTag() ? $this->sendSync($arg)
: $this->sendAsync($arg);
} elseif (null === $arg) {
return $this->completeRequest();
@@ -523,16 +524,37 @@ public function cancelRequest($tag = null)
{
$cancelRequest = new Request('/cancel');
$hasTag = !('' == $tag);
+ $hasReg = null !== $this->registry;
+ if ($hasReg && !$hasTag) {
+ $tags = array_merge(
+ array_keys($this->responseBuffer), array_keys($this->callbacks)
+ );
+ foreach ($tags as $t) {
+ $this->cancelRequest($t);
+ }
+ return $this;
+ }
+
if ($hasTag) {
if ($this->isRequestActive($tag)) {
- $cancelRequest->setArgument('tag', $tag);
+ if ($hasReg) {
+ $cancelRequest->setArgument(
+ 'tag', $this->registry->getOwnershipTag() . $tag
+ );
+ } else {
+ $cancelRequest->setArgument('tag', $tag);
+ }
} else {
throw new DataFlowException(
'No such request. Canceling aborted.', 11200
);
}
}
+
+ $regBackup = $this->registry;
+ $this->registry = null;
$this->sendSync($cancelRequest);
+ $this->registry = $regBackup;
if ($hasTag) {
if ($this->isRequestActive($tag, self::FILTER_BUFFER)) {
@@ -603,8 +625,21 @@ public function close()
}
$this->callbacks = array();
$this->pendingRequestsCount = 0;
+ if (null !== $this->registry) {
+ $this->registry->close();
+ }
return $result;
}
+
+ /**
+ * Closes the connection, unless it's a persistent one.
+ */
+ public function __destruct()
+ {
+ if (!$this->com->getTransmitter()->isPersistent()) {
+ $this->close();
+ }
+ }
/**
* Sends a request to RouterOS.
@@ -617,7 +652,7 @@ public function close()
*/
protected function send(Request $request)
{
- $request->send($this->com);
+ $request->send($this->com, $this->registry);
$this->pendingRequestsCount++;
return $this;
}
@@ -632,7 +667,9 @@ protected function send(Request $request)
*/
protected function dispatchNextResponse()
{
- $response = new Response($this->com, $this->_streamingResponses);
+ $response = new Response(
+ $this->com, $this->_streamingResponses, $this->registry
+ );
if ($response->getType() === Response::TYPE_FATAL) {
$this->pendingRequestsCount = 0;
$this->com->close();
@@ -0,0 +1,220 @@
+<?php
+
+/**
+ * ~~summary~~
+ *
+ * ~~description~~
+ *
+ * PHP version 5
+ *
+ * @category Net
+ * @package PEAR2_Net_RouterOS
+ * @author Vasil Rangelov <boen.robot@gmail.com>
+ * @copyright 2011 Vasil Rangelov
+ * @license http://www.gnu.org/copyleft/lesser.html LGPL License 2.1
+ * @version GIT: $Id$
+ * @link http://pear2.php.net/PEAR2_Net_RouterOS
+ */
+/**
+ * The namespace declaration.
+ */
+namespace PEAR2\Net\RouterOS;
+
+/**
+ * Uses shared memory to keep responses in.
+ */
+use PEAR2\Cache\SHM;
+
+/**
+ * A RouterOS registry.
+ *
+ * Provides functionality for managing the request/response flow. Particularly
+ * useful in persistent connections.
+ *
+ * @category Net
+ * @package PEAR2_Net_RouterOS
+ * @author Vasil Rangelov <boen.robot@gmail.com>
+ * @license http://www.gnu.org/copyleft/lesser.html LGPL License 2.1
+ * @link http://pear2.php.net/PEAR2_Net_RouterOS
+ */
+class Registry
+{
+ /**
+ * @var SHM The storage.
+ */
+ protected $shm;
+
+ /**
+ * @var int ID of request. Populated at first instance in request.
+ */
+ protected static $requestId = -1;
+
+ /**
+ * @var int ID to be given to next instance, after incrementing it.
+ */
+ protected static $instanceIdSeed = -1;
+
+ /**
+ * @var int ID of instance within the request.
+ */
+ protected $instanceId;
+
+ /**
+ * Creates a registry.
+ *
+ * @param string $uri An URI to bind the registry to.
+ */
+ public function __construct($uri)
+ {
+ $this->shm = new SHM('PEAR2\Net\RouterOS\Registry ' . $uri);
+ if (-1 === self::$requestId) {
+ self::$requestId = $this->shm->add('requestId', 0)
+ ? 0 : $this->shm->inc('requestId');
+ }
+ $this->instanceId = ++self::$instanceIdSeed;
+ $this->shm->add('responseBuffer_' . $this->getOwnershipTag(), array());
+ }
+
+ /**
+ * Parses a tag.
+ *
+ * Parses a tag to reveal the ownership part of it, and the original tag.
+ *
+ * @param string $tag The tag (as received) to parse.
+ *
+ * @return array An array with the first member being the ownership tag, and
+ * the second one being the original tag.
+ */
+ public static function parseTag($tag)
+ {
+ if (null === $tag) {
+ return array(null, null);
+ }
+ $result = explode('__', $tag, 2);
+ $result[0] .= '__';
+ if ('' === $result[1]) {
+ $result[1] = null;
+ }
+ return $result;
+ }
+
+ /**
+ * Get the ownership tag for this instance.
+ *
+ * @return string The ownership tag for this registry instance.
+ */
+ public function getOwnershipTag()
+ {
+ return self::$requestId . '_' . $this->instanceId . '__';
+ }
+
+ /**
+ * Add a response to the registry.
+ *
+ * @param Response $response The response to add. The caller of this
+ * function is responsible for ensuring that the ownership tag and the
+ * original tag are separated, so that only the original one remains in the
+ * response.
+ * @param string $ownershipTag The ownership tag that the response had.
+ *
+ * @return boolean TRUE if the request was added to its buffer, FALSE if
+ * this instance owns the response, and therefore doesn't need to add the
+ * response to its buffer.
+ */
+ public function add(Response $response, $ownershipTag)
+ {
+ if ($this->getOwnershipTag() === $ownershipTag) {
+ return false;
+ }
+
+ if (null === $ownershipTag) {
+ foreach ($this->shm->getIterator('/^(responseBuffer\_)/', true)
+ as $targetBufferName) {
+ $this->_add($response, $targetBufferName);
+ }
+ return true;
+ }
+
+ $this->_add($response, 'responseBuffer_' . $ownershipTag);
+ return true;
+ }
+
+ /**
+ * Adds a response to a buffer.
+ *
+ * @param Response $response The response to add.
+ * @param type $targetBufferName The name of the buffer to add the
+ * response to.
+ *
+ * @return void
+ */
+ private function _add(Response $response, $targetBufferName)
+ {
+ if ($this->shm->lock($targetBufferName)) {
+ $targetBuffer = $this->shm->get($targetBufferName);
+ $targetBuffer[] = $response;
+ $this->shm->set($targetBufferName, $targetBuffer);
+ $this->shm->unlock($targetBufferName);
+ }
+ }
+
+ /**
+ * Gets the next response from this instance's buffer.
+ *
+ * @return Response|null The next response, or NULL if there isn't one.
+ */
+ public function getNextResponse()
+ {
+ $response = null;
+ $targetBufferName = 'responseBuffer_' . $this->getOwnershipTag();
+ if ($this->shm->exists($targetBufferName)
+ && $this->shm->lock($targetBufferName)
+ ) {
+ $targetBuffer = $this->shm->get($targetBufferName);
+ if (!empty($targetBuffer)) {
+ $response = array_shift($targetBuffer);
+ $this->shm->set($targetBufferName, $targetBuffer);
+ }
+ $this->shm->unlock($targetBufferName);
+ }
+ return $response;
+ }
+
+ /**
+ * Closes the registry.
+ *
+ * Closes the registry, meaning that all buffers are cleared.
+ *
+ * @return void
+ */
+ public function close()
+ {
+ foreach ($this->shm->getIterator('/^(responseBuffer\_)/', true)
+ as $targetBufferName) {
+ $this->_close($targetBufferName);
+ }
+ }
+
+ /**
+ * Removes a buffer.
+ *
+ * @param string $targetBufferName The buffer to remove.
+ *
+ * @return void
+ */
+ private function _close($targetBufferName)
+ {
+ if ($this->shm->lock($targetBufferName)) {
+ $this->shm->delete($targetBufferName);
+ $this->shm->unlock($targetBufferName);
+ }
+ }
+
+ /**
+ * Removes this instance's buffer.
+ */
+ public function __destruct()
+ {
+ $this->_close('responseBuffer_' . $this->getOwnershipTag());
+ }
+}
@@ -237,13 +237,21 @@ public function removeAllArguments()
* Sends a request over a communicator.
*
* @param Communicator $com The communicator to send the request over.
+ * @param Registry $reg An optional registry to sync the request with.
*
* @return int The number of bytes sent.
* @see Client::sendSync()
* @see Client::sendAsync()
*/
- public function send(Communicator $com)
+ public function send(Communicator $com, Registry $reg = null)
{
+ if (null !== $reg) {
+ $originalTag = $this->getTag();
+ $this->setTag($reg->getOwnershipTag() . $originalTag);
+ $bytes = $this->send($com);
+ $this->setTag($originalTag);
+ return $bytes;
+ }
if ($com->getTransmitter()->isPersistent()) {
$old = $com->getTransmitter()->lock(T\Stream::DIRECTION_SEND);
$bytes = $this->_send($com);
Oops, something went wrong.

0 comments on commit 1a0af09

Please sign in to comment.