Permalink
Browse files

Initial locking support.

  • Loading branch information...
1 parent 06e6ab0 commit 616926c9356f89dcd67ee106fe4687f974413e5a @boenrobot boenrobot committed Feb 1, 2012
@@ -36,17 +36,21 @@
class Stream
{
/**
- * Used to apply settings to both sending and receiving.
+ * Used to stop settings in either direction being applied.
+ */
+ const DIRECTION_NONE = 0;
+ /**
+ * Used to apply settings only to receiving.
*/
- const DIRECTION_ALL = '|||';
+ const DIRECTION_RECEIVE = 1;
/**
* Used to apply settings only to sending.
*/
- const DIRECTION_SEND = '<<<';
+ const DIRECTION_SEND = 2;
/**
- * Used to apply settings only to receiving.
+ * Used to apply settings to both sending and receiving.
*/
- const DIRECTION_RECEIVE = '>>>';
+ const DIRECTION_ALL = 3;
/**
* @var resource The stream to wrap around.
@@ -21,6 +21,11 @@
namespace PEAR2\Net\Transmitter;
/**
+ * Used for managing persistent connections.
+ */
+use PEAR2\Cache\SHM;
+
+/**
* A socket transmitter.
*
* This is a convinience wrapper for socket functionality. Used to ensure data
@@ -45,9 +50,16 @@ class TcpClient extends NetworkStream
*/
protected $error_str = null;
- protected $persistentId = null;
-
+ /**
+ * @var SHM Persistent connection handler. Remains NULL for non-persistent
+ * connections.
+ */
protected $persistentHandler = null;
+
+ /**
+ * @var int A bitmask with the locked directions.
+ */
+ protected $lockState = self::DIRECTION_NONE;
/**
* Creates a new connection with the specified options.
@@ -85,27 +97,22 @@ public function __construct($host, $port, $persist = false,
) {
throw $this->createException('Invalid context supplied.', 6);
}
-
+ $uri = "tcp://{$host}:{$port}/{$key}";
try {
- $uri = "tcp://{$host}:{$port}/{$key}";
parent::__construct(
@stream_socket_client(
- $uri, $this->error_no,
- $this->error_str, $timeout, $flags, $context
+ $uri, $this->error_no, $this->error_str,
+ $timeout, $flags, $context
)
);
- $this->persistentId
- = str_replace(
- array('!' , '|', '/', '\\', '<', '>', '?', '*', '"'),
- array('~!', '!', '!', '!' , '!', '!', '!', '!', '!'),
- __NAMESPACE__ . '\TcpClient ' . $uri
- ) . ' ';
- if (version_compare(phpversion('wincache'), '1.1.0', '>=')) {
- $this->persistentHandler = 'wincache';
- }
} catch (\Exception $e) {
throw $this->createException('Failed to initialize socket.', 7);
}
+ if ($persist) {
+ $this->persistentHandler = new SHM(
+ 'PEAR2\Net\Transmitter\TcpClient ' . $uri . ' '
+ );
+ }
}
/**
@@ -125,74 +132,100 @@ protected function createException($message, $code = 0)
);
}
- protected function lock($key)
+ /**
+ * Locks transmission.
+ *
+ * Locks transmission in one or more directions. Useful when dealing with
+ * persistent connections. Note that every send/receive call implicitly
+ * calls this function and then restores it to the previous state. You only
+ * need to call this function if you need to do an uninterrputed sequence of
+ * such calls.
+ *
+ * @param int $direction The direction(s) to have locked. Acceptable values
+ * are the DIRECTION_* constants.
+ *
+ * @return int The previous state or FALSE on failure.
+ */
+ public function lock($direction = self::DIRECTION_ALL)
{
if ($this->persist) {
- switch($this->persistentHandler) {
- case 'wincache':
- return wincache_lock($this->persistentId . $key);
- default:
- throw $this->createException(
- 'Make sure WinCache is enabled.', 8
- );
+ $result = $this->lockState;
+ if ($direction & self::DIRECTION_RECEIVE) {
+ if (($this->lockState & self::DIRECTION_RECEIVE)
+ || $this->persistentHandler->lock(self::DIRECTION_RECEIVE)
+ ) {
+ $result |= self::DIRECTION_RECEIVE;
+ } else {
+ return false;
+ }
+ } else {
+ if ($this->persistentHandler->unlock(self::DIRECTION_RECEIVE)) {
+ $result |= ~self::DIRECTION_RECEIVE;
+ } else {
+ return false;
+ }
}
- }
- return true;
- }
-
- protected function unlock($key)
- {
- if ($this->persist) {
- switch($this->persistentHandler) {
- case 'wincache':
- return wincache_unlock($this->persistentId . $key);
- default:
- throw $this->createException(
- 'Make sure WinCache is enabled.', 8
- );
+
+ if ($direction & self::DIRECTION_SEND) {
+ if (($this->lockState & self::DIRECTION_SEND)
+ || $this->persistentHandler->lock(self::DIRECTION_SEND)
+ ) {
+ $result |= self::DIRECTION_SEND;
+ } else {
+ return false;
+ }
+ } else {
+ if ($this->persistentHandler->unlock(self::DIRECTION_SEND)) {
+ $result |= ~self::DIRECTION_SEND;
+ } else {
+ return false;
+ }
}
+ $oldState = $this->lockState;
+ $this->lockState = $result;
+ return $oldState;
}
- return true;
+ return false;
}
public function receive($length, $what = 'data')
{
- if ($this->lock('r')) {
- $result = parent::receive($length, $what);
- $this->unlock('r');
- return $result;
- } else {
+ $previousState = $this->lock(self::DIRECTION_RECEIVE);
+ if ($this->persist && false === $previousState) {
throw $this->createException(
'Unable to obtain receiving lock', 9
);
}
+ $result = parent::receive($length, $what);
+ $this->lock($previousState);
+ return $result;
}
public function receiveStream(
$length, FilterCollection $filters = null, $what = 'stream data'
) {
- if ($this->lock('r')) {
- $result = parent::receiveStream($length, $filters, $what);
- $this->unlock('r');
- return $result;
- } else {
+ $previousState = $this->lock(self::DIRECTION_RECEIVE);
+ if ($this->persist && false === $previousState) {
throw $this->createException(
'Unable to obtain receiving lock', 9
);
}
+ $result = parent::receiveStream($length, $filters, $what);
+ $this->lock($previousState);
+ return $result;
}
public function send($contents, $offset = null, $length = null)
{
- if ($this->lock('w')) {
- $result = parent::send($contents, $offset, $length);
- $this->unlock('w');
- return $result;
- } else {
+ $previousState = $this->lock(self::DIRECTION_SEND);
+ if ($this->persist && false === $previousState) {
throw $this->createException(
'Unable to obtain sending lock', 10
);
}
+ $result = parent::send($contents, $offset, $length);
+ $this->lock($previousState);
+ return $result;
}
}
View
@@ -126,6 +126,12 @@ public function testClientReceivingFilterCollection()
public function testPersistentClientConnection()
{
+// $this->client->close();
+// $phpt = new \PHPUnit_Extensions_PhptTestCase(
+// __DIR__ . '\PHPT\testNormalPersistentConnection.phpt'
+// );
+// $this->setResult($phpt->run());
+// $this->client = new TcpClient(REMOTE_HOST, REMOTE_PORT);
$this->client = new TcpClient(
REMOTE_HOSTNAME, REMOTE_PORT, true
);
@@ -0,0 +1,22 @@
+--TEST--
+Tests whether connections can be made.
+--CGI--
+--FILE--
+<?php
+namespace PEAR2\Net\Transmitter;
+
+$client1 = new TcpClient(
+ REMOTE_HOSTNAME, REMOTE_PORT, true
+);
+$client2 = new TcpClient(
+ REMOTE_HOSTNAME, REMOTE_PORT, true
+);
+echo $client1->isFresh();
+echo $client2->isFresh();
+echo $client1->receive(1);
+echo (int) $client1->isFresh();
+echo (int) $client2->isFresh();
+$client1->close();
+?>
+--EXPECT--
+11t00
View
@@ -1,2 +1,4 @@
<?php
-require_once '../src/PEAR2/Net/Transmitter/Autoload.php';
+require_once 'PEAR2/Autoload.php';
+PEAR2\Autoload::initialize('../src');
+PEAR2\Autoload::initialize('../../PEAR2_Cache_SHM.git/src');

0 comments on commit 616926c

Please sign in to comment.