Permalink
Browse files

Progress with the test suite.

  • Loading branch information...
1 parent f59f8f5 commit 750cdd592c245ebb61369e0506848757786d1a71 @boenrobot boenrobot committed Oct 9, 2011
@@ -59,7 +59,11 @@ public function append($name, array $params = array())
}
/**
- * Inserts the filter before another filter at a specified position.
+ * Inserts the filter before a position.
+ *
+ * Inserts the specified filter before a filter at a specified position. The
+ * new filter takes the specified position, while previous filters are moved
+ * forward by one.
*
* @param int $position The position before which the filter will be
* inserted.
@@ -144,7 +148,17 @@ public function rewind()
public function seek($position)
{
$this->position = $position;
- return $this->current();
+ return $this->valid();
+ }
+
+ /**
+ * Gets the current position.
+ *
+ * @return int The current position.
+ */
+ public function getCurrentPosition()
+ {
+ return $this->position;
}
/**
@@ -155,7 +169,7 @@ public function seek($position)
public function next()
{
++$this->position;
- return $this->current();
+ return $this->valid();
}
/**
@@ -69,7 +69,7 @@ public function __construct($server, $timeout = null)
$hostPortCombo = explode(':', $peername);
$this->peerIP = $hostPortCombo[0];
$this->peerPort = (int) $hostPortCombo[1];
- } catch (\Exception $e) {
+ } catch (StreamException $e) {
throw $this->createException('Failed to initialize connection.', 9);
}
}
@@ -35,8 +35,18 @@
*/
class StreamTransmitter
{
+ /**
+ * Used in {@link setBuffer()} to apply the setting to both sending and
+ * receiving.
+ */
const DIRECTION_BOTH = '|||';
+ /**
+ * Used in {@link setBuffer()} to apply the setting only to sending.
+ */
const DIRECTION_SEND = '<<<';
+ /**
+ * Used in {@link setBuffer()} to apply the setting only to receiving.
+ */
const DIRECTION_RECEIVE = '>>>';
/**
@@ -48,6 +58,14 @@ class StreamTransmitter
* @var bool A flag that tells whether or not the stream is persistent.
*/
protected $persist;
+
+ /**
+ * @var array An associative array with the chunk size of each direction.
+ * Key is the direction, value is the size in bytes as integer.
+ */
+ protected $chunkSize = array(
+ self::DIRECTION_SEND => 0xFFFFF, self::DIRECTION_RECEIVE => 0xFFFFF
+ );
/**
* Wraps around the specified stream.
@@ -113,7 +131,8 @@ public function setTimeout($seconds, $microseconds = 0)
*
* @param int $size The desired size of the buffer, in bytes.
* @param string $direction The buffer of which direction to set. Valid
- * values are the DIRECTION_* constants.
+ * values are the DIRECTION_* constants. Any other value is treated as
+ * {@link DIRECTION_BOTH}.
*
* @return bool TRUE on success, FALSE on failure.
*/
@@ -129,6 +148,38 @@ public function setBuffer($size, $direction = self::DIRECTION_BOTH)
&& stream_set_read_buffer($this->stream, $size) === 0;
}
}
+
+ /**
+ * Sets the size of the chunk.
+ *
+ * To ensure data integrity, as well as to allow for lower memory
+ * consumption, data is sent/received in chunks. This function
+ * allows you to set the size of each chunk. The default is 0xFFFFF.
+ *
+ * @param int $size The desired size of the chunk, in bytes.
+ * @param string $direction The chunk of which direction to set. Valid
+ * values are the DIRECTION_* constants. Any other value is treated as
+ * {@link DIRECTION_BOTH}.
+ *
+ * @return bool TRUE on success, FALSE on failure.
+ */
+ public function setChunk($size, $direction = self::DIRECTION_BOTH)
+ {
+ $size = (int) $size;
+ if ($size <= 0) {
+ return false;
+ }
+ switch($direction) {
+ case self::DIRECTION_SEND:
+ case self::DIRECTION_RECEIVE:
+ $this->chunkSize[$direction] = $size;
+ return true;
+ default:
+ $this->chunkSize[self::DIRECTION_SEND]
+ = $this->chunkSize[self::DIRECTION_RECEIVE] = $size;
+ return true;
+ }
+ }
/**
* Sends a string over the wrapped stream.
@@ -141,10 +192,11 @@ public function send($string)
{
$bytes = 0;
$bytesToSend = (double) sprintf('%u', strlen($string));
+ $chunkSize = $this->chunkSize[self::DIRECTION_SEND];
while ($bytes < $bytesToSend) {
if ($this->isAcceptingData()) {
$bytesNow = @fwrite(
- $this->stream, substr($string, $bytes, 0xFFFFF)
+ $this->stream, substr($string, $bytes, $chunkSize)
);
if (0 != $bytesNow) {
$bytes += $bytesNow;
@@ -168,10 +220,11 @@ public function send($string)
public function sendStream($stream)
{
$bytes = 0;
+ $chunkSize = $this->chunkSize[self::DIRECTION_SEND];
while (!feof($stream)) {
if ($this->isAcceptingData()) {
$bytesNow = @stream_copy_to_stream(
- $stream, $this->stream, 0xFFFFF
+ $stream, $this->stream, $chunkSize
);
if (0 != $bytesNow) {
$bytes += $bytesNow;
@@ -200,10 +253,11 @@ public function sendStream($stream)
public function receive($length, $what = 'data')
{
$result = '';
+ $chunkSize = $this->chunkSize[self::DIRECTION_RECEIVE];
while ($length > 0) {
if ($this->isAvailable()) {
while ($this->isDataAwaiting()) {
- $fragment = fread($this->stream, min($length, 0xFFFFF));
+ $fragment = fread($this->stream, min($length, $chunkSize));
if ('' !== $fragment) {
$length -= strlen($fragment);
$result .= $fragment;
@@ -244,10 +298,11 @@ public function receiveStream(
}
}
+ $chunkSize = $this->chunkSize[self::DIRECTION_RECEIVE];
while ($length > 0) {
if ($this->isAvailable()) {
while ($this->isDataAwaiting()) {
- $fragment = fread($this->stream, min($length, 0xFFFFF));
+ $fragment = fread($this->stream, min($length, $chunkSize));
if ('' !== $fragment) {
$length -= strlen($fragment);
fwrite($result, $fragment);
View
@@ -18,15 +18,129 @@ public function setUp()
public function tearDown()
{
+ $this->client->close();
unset($this->client);
}
public function testOneByteEcho()
{
- $byte = 't';
+ $byte = '1';
$this->client->send($byte);
$this->assertEquals(
$byte, $this->client->receive(1), 'Wrong byte echoed.'
);
}
+
+ public function test3MegaBytesEcho()
+ {
+ $size = 3/*m*/ * 1024/*k*/ * 1024/*b*/;
+ $contents = str_repeat('2', $size);
+ $this->client->send($contents);
+ $this->assertEquals(
+ $contents, $this->client->receive($size), 'Wrong contents echoed.'
+ );
+ }
+
+ public function testOneByteEchoStreamSend()
+ {
+ $stream = fopen('php://temp', 'r+b');
+ fwrite($stream, '3');
+ rewind($stream);
+ $this->client->sendStream($stream);
+ $this->assertEquals(
+ stream_get_contents($stream), $this->client->receive(1),
+ 'Wrong byte echoed.'
+ );
+ }
+
+ public function test3MegaBytesEchoStreamSend()
+ {
+ $size = 3/*m*/ * 1024/*k*/ * 1024/*b*/;
+ $stream = fopen('php://temp', 'r+b');
+ fwrite($stream, str_repeat('4', $size));
+ rewind($stream);
+ $this->client->sendStream($stream);
+ $this->assertEquals(
+ stream_get_contents($stream), $this->client->receive($size),
+ 'Wrong contents echoed.'
+ );
+ }
+
+ public function testOneByteEchoStreamReceive()
+ {
+ $byte = '5';
+ $this->client->send($byte);
+ $this->assertEquals(
+ $byte, stream_get_contents($this->client->receiveStream(1)),
+ 'Wrong byte echoed.'
+ );
+ }
+
+ public function test3MegaBytesEchoStreamReceive()
+ {
+ $size = 3/*m*/ * 1024/*k*/ * 1024/*b*/;
+ $contents = str_repeat('6', $size);
+ $this->client->send($contents);
+ $this->assertEquals(
+ $contents, stream_get_contents($this->client->receiveStream($size)),
+ 'Wrong contents echoed.'
+ );
+ }
+
+ public function testClientReceivingFilterCollection()
+ {
+ $filters = new FilterCollection();
+ $filters->append('string.toupper');
+ $this->assertEquals(
+ 'T',
+ stream_get_contents($this->client->receiveStream(1, $filters)),
+ 'Wrong contents echoed.'
+ );
+ }
+
+ public function testPersistentClientConnection()
+ {
+ $this->client = new SocketClientTransmitter(
+ REMOTE_HOSTNAME, REMOTE_PORT, true
+ );
+ $client = new SocketClientTransmitter(
+ REMOTE_HOSTNAME, REMOTE_PORT, true
+ );
+ $this->assertTrue($this->client->isFresh());
+ $this->assertTrue($client->isFresh());
+ $this->assertEquals('t', $this->client->receive(1));
+ $this->assertFalse($this->client->isFresh());
+ $this->assertFalse($client->isFresh());
+ $client->close();
+ }
+
+ public function testClientReceivingIncompleteData()
+ {
+ try {
+ $this->client->receive(2);
+ $this->fail('Receiving had to fail.');
+ } catch(SocketException $e) {
+ $this->assertEquals(4, $e->getCode(), 'Improper exception code.');
+ }
+ }
+
+ public function testClientReceivingIncompleteDataStream()
+ {
+ try {
+ $this->client->receiveStream(2);
+ $this->fail('Receiving had to fail.');
+ } catch(SocketException $e) {
+ $this->assertEquals(5, $e->getCode(), 'Improper exception code.');
+ }
+ }
+
+ public function testServerReceivingIncompleteData()
+ {
+ $this->assertEquals(1, $this->client->send('t'), 'Wrong amount sent.');
+ }
+
+ public function testServerReceivingIncompleteDataStream()
+ {
+ $this->assertEquals(1, $this->client->send('t'), 'Wrong amount sent.');
+ }
}
Oops, something went wrong.

0 comments on commit 750cdd5

Please sign in to comment.