Skip to content

Commit

Permalink
Add TransformStreamDefaultController terminate() method
Browse files Browse the repository at this point in the history
Replace close() with terminate(). In addition to closing the readable
side, terminate() also errors the writable side. This stops data from
being produced after we are no longer interested in it.

Modify existing tests to handle the new method, and add tests for the new
functionality.

Closes #774.
  • Loading branch information
ricea committed Oct 3, 2017
1 parent 17b7a27 commit 31863aa
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 19 deletions.
17 changes: 12 additions & 5 deletions reference-implementation/lib/transform-stream.js
Expand Up @@ -139,12 +139,12 @@ class TransformStreamDefaultController {
TransformStreamDefaultControllerEnqueue(this, chunk);
}

close() {
terminate() {
if (IsTransformStreamDefaultController(this) === false) {
throw defaultControllerBrandCheckException('close');
}

TransformStreamDefaultControllerClose(this);
TransformStreamDefaultControllerTerminate(this);
}

error(reason) {
Expand Down Expand Up @@ -174,15 +174,22 @@ function IsTransformStreamDefaultController(x) {
return true;
}

function TransformStreamDefaultControllerClose(controller) {
// console.log('TransformStreamDefaultControllerClose()');
function TransformStreamDefaultControllerTerminate(controller) {
// console.log('TransformStreamDefaultControllerTerminate()');

const readableController = controller._controlledTransformStream._readable._readableStreamController;
const stream = controller._controlledTransformStream;
const readableController = stream._readable._readableStreamController;
if (ReadableStreamDefaultControllerCanCloseOrEnqueue(readableController) === false) {
throw new TypeError('Readable side is not in a state that can be closed');
}

ReadableStreamDefaultControllerClose(readableController);
WritableStreamDefaultControllerErrorIfNeeded(stream._writable._writableStreamController,
new TypeError('TransformStream terminated'));
if (stream._backpressure === true) {
// Permit any pending write() or start() calls to complete.
TransformStreamSetBackpressure(stream, false);
}
}

function TransformStreamDefaultControllerEnqueue(controller, chunk) {
Expand Down
Expand Up @@ -67,9 +67,9 @@ test(() => {
}, 'TransformStreamDefaultController.prototype.enqueue enforces a brand check');

test(() => {
methodThrowsForAll(TransformStreamDefaultController.prototype, 'close',
methodThrowsForAll(TransformStreamDefaultController.prototype, 'terminate',
[fakeTSDefaultController(), realTS(), undefined, null]);
}, 'TransformStreamDefaultController.prototype.close enforces a brand check');
}, 'TransformStreamDefaultController.prototype.terminate enforces a brand check');

test(() => {
methodThrowsForAll(TransformStreamDefaultController.prototype, 'error',
Expand Down
Expand Up @@ -183,7 +183,7 @@ promise_test(t => {
const ts = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk);
controller.close();
controller.terminate();
throw thrownError;
}
}, undefined, { highWaterMark: 1 });
Expand All @@ -193,7 +193,7 @@ promise_test(t => {
promise_rejects(t, thrownError, writePromise, 'write() should reject'),
promise_rejects(t, thrownError, closedPromise, 'reader.closed should reject')
]);
}, 'an exception from transform() should error the stream if close has been requested but not completed');
}, 'an exception from transform() should error the stream if terminate has been requested but not completed');

promise_test(t => {
const ts = new TransformStream();
Expand Down
Expand Up @@ -376,11 +376,11 @@ promise_test(() => {
test(() => {
new TransformStream({
start(controller) {
controller.close();
controller.terminate();
assert_throws(new TypeError(), () => controller.enqueue(), 'enqueue should throw');
}
});
}, 'enqueue() should throw after controller.close()');
}, 'enqueue() should throw after controller.terminate()');

promise_test(() => {
let controller;
Expand All @@ -397,11 +397,11 @@ promise_test(() => {
test(() => {
new TransformStream({
start(controller) {
controller.close();
assert_throws(new TypeError(), () => controller.close(), 'close should throw');
controller.terminate();
assert_throws(new TypeError(), () => controller.terminate(), 'terminate should throw');
}
});
}, 'controller.close() should throw the second time it is called');
}, 'controller.terminate() should throw the second time it is called');

promise_test(() => {
let controller;
Expand All @@ -411,9 +411,9 @@ promise_test(() => {
}
});
const cancelPromise = ts.readable.cancel();
assert_throws(new TypeError(), () => controller.close(), 'close should throw');
assert_throws(new TypeError(), () => controller.terminate(), 'terminate should throw');
return cancelPromise;
}, 'close() should throw after readable.cancel()');
}, 'terminate() should throw after readable.cancel()');

promise_test(() => {
let calls = 0;
Expand Down
Expand Up @@ -50,7 +50,7 @@ promise_test(() => {
}, undefined, {
size() {
// The readable queue is empty.
controller.close();
controller.terminate();
// The readable state has gone from "readable" to "closed".
return 1;
// This chunk will be enqueued, but will be impossible to read because the state is already "closed".
Expand All @@ -63,7 +63,7 @@ promise_test(() => {
.then(array => assert_array_equals(array, [], 'array should contain no chunks'));
// The chunk 'a' is still in readable's queue. readable is closed so 'a' cannot be read. writable's queue is empty and
// it is still writable.
}, 'close() inside size() should work');
}, 'terminate() inside size() should work');

promise_test(t => {
let controller;
Expand Down Expand Up @@ -141,7 +141,7 @@ promise_test(() => {
return delay(0);
}).then(() => {
assert_array_equals(ws.events, ['write', 'a'], 'first chunk should have been written');
controller.close();
controller.terminate();
return pipeToPromise;
}).then(() => {
assert_array_equals(ws.events, ['write', 'a', 'close'], 'target should have been closed');
Expand Down
@@ -0,0 +1,11 @@
<!DOCTYPE html>
<meta charset="utf-8">
<title>terminate.js browser context wrapper file</title>

<script src="/resources/testharness.js"></script>
<script src="/resources/testharnessreport.js"></script>

<script src="../resources/test-utils.js"></script>
<script src="../resources/recording-streams.js"></script>

<script src="terminate.js"></script>
@@ -0,0 +1,86 @@
'use strict';

if (self.importScripts) {
self.importScripts('/resources/testharness.js');
self.importScripts('../resources/recording-streams.js');
self.importScripts('../resources/test-utils.js');
}

promise_test(t => {
const ts = recordingTransformStream({}, undefined, { highWaterMark: 0 });
const rs = new ReadableStream({
start(controller) {
controller.enqueue(0);
}
});
let pipeToRejected = false;
const pipeToPromise = promise_rejects(t, new TypeError(), rs.pipeTo(ts.writable), 'pipeTo should reject').then(() => {
pipeToRejected = true;
});
return delay(0).then(() => {
assert_array_equals(ts.events, [], 'transform() should have seen no chunks');
assert_false(pipeToRejected, 'pipeTo() should not have rejected yet');
ts.controller.terminate();
return pipeToPromise;
}).then(() => {
assert_array_equals(ts.events, [], 'transform() should still have seen no chunks');
assert_true(pipeToRejected, 'pipeToRejected must be true');
});
}, 'controller.terminate() should error pipeTo()');

promise_test(t => {
const ts = recordingTransformStream({}, undefined, { highWaterMark: 1 });
const rs = new ReadableStream({
start(controller) {
controller.enqueue(0);
controller.enqueue(1);
}
});
const pipeToPromise = rs.pipeTo(ts.writable);
return delay(0).then(() => {
assert_array_equals(ts.events, ['transform', 0], 'transform() should have seen one chunk');
ts.controller.terminate();
return promise_rejects(t, new TypeError(), pipeToPromise, 'pipeTo() should reject');
}).then(() => {
assert_array_equals(ts.events, ['transform', 0], 'transform() should still have seen only one chunk');
});
}, 'controller.terminate() should prevent remaining chunks from being processed');

test(() => {
new TransformStream({
start(controller) {
controller.enqueue(0);
controller.terminate();
assert_throws(new TypeError(), () => controller.enqueue(1), 'enqueue should throw');
}
});
}, 'controller.enqueue() should throw after controller.terminate()');

const error1 = new Error('error1');
error1.name = 'error1';

promise_test(t => {
const ts = new TransformStream({
start(controller) {
controller.enqueue(0);
controller.terminate();
controller.error(error1);
}
});
return Promise.all([
promise_rejects(t, new TypeError(), ts.writable.abort(), 'abort() should reject with a TypeError'),
promise_rejects(t, error1, ts.readable.cancel(), 'cancel() should reject with error1'),
promise_rejects(t, error1, ts.readable.getReader().closed, 'closed should reject with error1')
]);
}, 'controller.error() after controller.terminate() with queued chunk should error the readable');

test(() => {
new TransformStream({
start(controller) {
controller.terminate();
assert_throws(new TypeError(), () => controller.error(error1), 'error() should throw');
}
});
}, 'controller.error() after controller.terminate() without queued chunk should throw');

done();

0 comments on commit 31863aa

Please sign in to comment.