Skip to content

Commit

Permalink
Clean up unneeded references for unwrapped streams when closing
Browse files Browse the repository at this point in the history
  • Loading branch information
clue committed Jul 2, 2019
1 parent 2d47cc2 commit bb8527f
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 10 deletions.
25 changes: 17 additions & 8 deletions src/UnwrapReadableStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public function __construct(PromiseInterface $promise)

$this->promise = $promise->then(
function ($stream) {
if (!($stream instanceof ReadableStreamInterface)) {
if (!$stream instanceof ReadableStreamInterface) {
throw new InvalidArgumentException('Not a readable stream');
}
return $stream;
Expand Down Expand Up @@ -80,6 +80,9 @@ function ($e) use ($out, &$closed) {
$out->emit('error', array($e, $out));
$out->close();
}

// resume() and pause() may attach to this promise, so ensure we actually reject here
throw $e;
}
);
}
Expand All @@ -91,16 +94,20 @@ public function isReadable()

public function pause()
{
$this->promise->then(function (ReadableStreamInterface $stream) {
$stream->pause();
});
if ($this->promise !== null) {
$this->promise->then(function (ReadableStreamInterface $stream) {
$stream->pause();
});
}
}

public function resume()
{
$this->promise->then(function (ReadableStreamInterface $stream) {
$stream->resume();
});
if ($this->promise !== null) {
$this->promise->then(function (ReadableStreamInterface $stream) {
$stream->resume();
});
}
}

public function pipe(WritableStreamInterface $dest, array $options = array())
Expand All @@ -122,7 +129,9 @@ public function close()
if ($this->promise instanceof CancellablePromiseInterface) {
$this->promise->cancel();
}
$this->promise = null;

$this->emit('close', array($this));
$this->emit('close');
$this->removeAllListeners();
}
}
6 changes: 4 additions & 2 deletions src/UnwrapWritableStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public function __construct(PromiseInterface $promise)

$this->promise = $promise->then(
function ($stream) {
if (!($stream instanceof WritableStreamInterface)) {
if (!$stream instanceof WritableStreamInterface) {
throw new InvalidArgumentException('Not a writable stream');
}
return $stream;
Expand Down Expand Up @@ -156,7 +156,9 @@ public function close()
if ($this->promise instanceof CancellablePromiseInterface) {
$this->promise->cancel();
}
$this->promise = $this->stream = null;

$this->emit('close', array($this));
$this->emit('close');
$this->removeAllListeners();
}
}
63 changes: 63 additions & 0 deletions tests/UnwrapReadableTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,30 @@ public function testForwardsPauseToInputStream()
$stream->pause();
}

/**
* @doesNotPerformAssertions
*/
public function testPauseAfterCloseHasNoEffect()
{
$promise = new \React\Promise\Promise(function () { });
$stream = Stream\unwrapReadable($promise);

$stream->close();
$stream->pause();
}


/**
* @doesNotPerformAssertions
*/
public function testPauseAfterErrorDueToInvalidInputHasNoEffect()
{
$promise = \React\Promise\reject(new \RuntimeException());
$stream = Stream\unwrapReadable($promise);

$stream->pause();
}

public function testForwardsResumeToInputStream()
{
$input = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
Expand All @@ -211,6 +235,18 @@ public function testForwardsResumeToInputStream()
$stream->resume();
}

/**
* @doesNotPerformAssertions
*/
public function testResumeAfterCloseHasNoEffect()
{
$promise = new \React\Promise\Promise(function () { });
$stream = Stream\unwrapReadable($promise);

$stream->close();
$stream->resume();
}

public function testPipingStreamWillForwardDataEvents()
{
$input = new ThroughStream();
Expand Down Expand Up @@ -279,4 +315,31 @@ public function testClosingStreamWillCloseStreamFromCancellationHandler()

$this->assertFalse($input->isReadable());
}

public function testCloseShouldRemoveAllListenersAfterCloseEvent()
{
$promise = new \React\Promise\Promise(function () { });
$stream = Stream\unwrapReadable($promise);

$stream->on('close', $this->expectCallableOnce());
$this->assertCount(1, $stream->listeners('close'));

$stream->close();

$this->assertCount(0, $stream->listeners('close'));
}

public function testCloseShouldRemoveReferenceToPromiseToAvoidGarbageReferences()
{
$promise = new \React\Promise\Promise(function () { });
$stream = Stream\unwrapReadable($promise);

$stream->close();

$ref = new \ReflectionProperty($stream, 'promise');
$ref->setAccessible(true);
$value = $ref->getValue($stream);

$this->assertNull($value);
}
}
33 changes: 33 additions & 0 deletions tests/UnwrapWritableTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -352,4 +352,37 @@ public function testClosingStreamWillCloseStreamFromCancellationHandler()

$this->assertFalse($input->isWritable());
}

public function testCloseShouldRemoveAllListenersAfterCloseEvent()
{
$promise = new \React\Promise\Promise(function () { });
$stream = Stream\unwrapWritable($promise);

$stream->on('close', $this->expectCallableOnce());
$this->assertCount(1, $stream->listeners('close'));

$stream->close();

$this->assertCount(0, $stream->listeners('close'));
}

public function testCloseShouldRemoveReferenceToPromiseAndStreamToAvoidGarbageReferences()
{
$promise = \React\Promise\resolve(new ThroughStream());
$stream = Stream\unwrapWritable($promise);

$stream->close();

$ref = new \ReflectionProperty($stream, 'promise');
$ref->setAccessible(true);
$value = $ref->getValue($stream);

$this->assertNull($value);

$ref = new \ReflectionProperty($stream, 'stream');
$ref->setAccessible(true);
$value = $ref->getValue($stream);

$this->assertNull($value);
}
}

0 comments on commit bb8527f

Please sign in to comment.