Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TransformStreamDefaultController terminate() method #818

Merged
merged 2 commits into from Oct 3, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 12 additions & 5 deletions reference-implementation/lib/transform-stream.js
Expand Up @@ -145,12 +145,12 @@ class TransformStreamDefaultController {
TransformStreamDefaultControllerEnqueue(this, chunk);
}

close() {
terminate() {
if (IsTransformStreamDefaultController(this) === false) {
throw defaultControllerBrandCheckException('close');
}

TransformStreamDefaultControllerClose(this);
TransformStreamDefaultControllerTerminate(this);
}

error(reason) {
Expand Down Expand Up @@ -180,15 +180,22 @@ function IsTransformStreamDefaultController(x) {
return true;
}

function TransformStreamDefaultControllerClose(controller) {
// console.log('TransformStreamDefaultControllerClose()');
function TransformStreamDefaultControllerTerminate(controller) {
// console.log('TransformStreamDefaultControllerTerminate()');

const readableController = controller._controlledTransformStream._readable._readableStreamController;
const stream = controller._controlledTransformStream;
const readableController = stream._readable._readableStreamController;
if (ReadableStreamDefaultControllerCanCloseOrEnqueue(readableController) === false) {
throw new TypeError('Readable side is not in a state that can be closed');
}

ReadableStreamDefaultControllerClose(readableController);
WritableStreamDefaultControllerErrorIfNeeded(stream._writable._writableStreamController,
new TypeError('TransformStream terminated'));
if (stream._backpressure === true) {
// Permit any pending write() or start() calls to complete.
TransformStreamSetBackpressure(stream, false);
}
}

function TransformStreamDefaultControllerEnqueue(controller, chunk) {
Expand Down
Expand Up @@ -67,9 +67,9 @@ test(() => {
}, 'TransformStreamDefaultController.prototype.enqueue enforces a brand check');

test(() => {
methodThrowsForAll(TransformStreamDefaultController.prototype, 'close',
methodThrowsForAll(TransformStreamDefaultController.prototype, 'terminate',
[fakeTSDefaultController(), realTS(), undefined, null]);
}, 'TransformStreamDefaultController.prototype.close enforces a brand check');
}, 'TransformStreamDefaultController.prototype.terminate enforces a brand check');

test(() => {
methodThrowsForAll(TransformStreamDefaultController.prototype, 'error',
Expand Down
Expand Up @@ -183,7 +183,7 @@ promise_test(t => {
const ts = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk);
controller.close();
controller.terminate();
throw thrownError;
}
}, undefined, { highWaterMark: 1 });
Expand All @@ -193,7 +193,7 @@ promise_test(t => {
promise_rejects(t, thrownError, writePromise, 'write() should reject'),
promise_rejects(t, thrownError, closedPromise, 'reader.closed should reject')
]);
}, 'an exception from transform() should error the stream if close has been requested but not completed');
}, 'an exception from transform() should error the stream if terminate has been requested but not completed');

promise_test(t => {
const ts = new TransformStream();
Expand Down
Expand Up @@ -375,11 +375,11 @@ promise_test(() => {
test(() => {
new TransformStream({
start(controller) {
controller.close();
controller.terminate();
assert_throws(new TypeError(), () => controller.enqueue(), 'enqueue should throw');
}
});
}, 'enqueue() should throw after controller.close()');
}, 'enqueue() should throw after controller.terminate()');

promise_test(() => {
let controller;
Expand All @@ -396,11 +396,11 @@ promise_test(() => {
test(() => {
new TransformStream({
start(controller) {
controller.close();
assert_throws(new TypeError(), () => controller.close(), 'close should throw');
controller.terminate();
assert_throws(new TypeError(), () => controller.terminate(), 'terminate should throw');
}
});
}, 'controller.close() should throw the second time it is called');
}, 'controller.terminate() should throw the second time it is called');

promise_test(() => {
let controller;
Expand All @@ -410,9 +410,9 @@ promise_test(() => {
}
});
const cancelPromise = ts.readable.cancel();
assert_throws(new TypeError(), () => controller.close(), 'close should throw');
assert_throws(new TypeError(), () => controller.terminate(), 'terminate should throw');
return cancelPromise;
}, 'close() should throw after readable.cancel()');
}, 'terminate() should throw after readable.cancel()');

promise_test(() => {
let calls = 0;
Expand Down
Expand Up @@ -50,7 +50,7 @@ promise_test(() => {
}, undefined, {
size() {
// The readable queue is empty.
controller.close();
controller.terminate();
// The readable state has gone from "readable" to "closed".
return 1;
// This chunk will be enqueued, but will be impossible to read because the state is already "closed".
Expand All @@ -63,7 +63,7 @@ promise_test(() => {
.then(array => assert_array_equals(array, [], 'array should contain no chunks'));
// The chunk 'a' is still in readable's queue. readable is closed so 'a' cannot be read. writable's queue is empty and
// it is still writable.
}, 'close() inside size() should work');
}, 'terminate() inside size() should work');

promise_test(t => {
let controller;
Expand Down Expand Up @@ -141,7 +141,7 @@ promise_test(() => {
return delay(0);
}).then(() => {
assert_array_equals(ws.events, ['write', 'a'], 'first chunk should have been written');
controller.close();
controller.terminate();
return pipeToPromise;
}).then(() => {
assert_array_equals(ws.events, ['write', 'a', 'close'], 'target should have been closed');
Expand Down
@@ -0,0 +1,11 @@
<!DOCTYPE html>
<meta charset="utf-8">
<title>terminate.js browser context wrapper file</title>

<script src="/resources/testharness.js"></script>
<script src="/resources/testharnessreport.js"></script>

<script src="../resources/test-utils.js"></script>
<script src="../resources/recording-streams.js"></script>

<script src="terminate.js"></script>
@@ -0,0 +1,85 @@
'use strict';

if (self.importScripts) {
self.importScripts('/resources/testharness.js');
self.importScripts('../resources/recording-streams.js');
self.importScripts('../resources/test-utils.js');
}

promise_test(t => {
const ts = recordingTransformStream({}, undefined, { highWaterMark: 0 });
const rs = new ReadableStream({
start(controller) {
controller.enqueue(0);
}
});
let pipeToRejected = false;
const pipeToPromise = promise_rejects(t, new TypeError(), rs.pipeTo(ts.writable), 'pipeTo should reject').then(() => {
pipeToRejected = true;
});
return delay(0).then(() => {
assert_array_equals(ts.events, [], 'transform() should have seen no chunks');
assert_false(pipeToRejected, 'pipeTo() should not have rejected yet');
ts.controller.terminate();
return pipeToPromise;
}).then(() => {
assert_array_equals(ts.events, [], 'transform() should still have seen no chunks');
assert_true(pipeToRejected, 'pipeToRejected must be true');
});
}, 'controller.terminate() should error pipeTo()');

promise_test(t => {
const ts = recordingTransformStream({}, undefined, { highWaterMark: 1 });
const rs = new ReadableStream({
start(controller) {
controller.enqueue(0);
controller.enqueue(1);
}
});
const pipeToPromise = rs.pipeTo(ts.writable);
return delay(0).then(() => {
assert_array_equals(ts.events, ['transform', 0], 'transform() should have seen one chunk');
ts.controller.terminate();
return promise_rejects(t, new TypeError(), pipeToPromise, 'pipeTo() should reject');
}).then(() => {
assert_array_equals(ts.events, ['transform', 0], 'transform() should still have seen only one chunk');
});
}, 'controller.terminate() should prevent remaining chunks from being processed');

test(() => {
new TransformStream({
start(controller) {
controller.enqueue(0);
controller.terminate();
assert_throws(new TypeError(), () => controller.enqueue(1), 'enqueue should throw');
}
});
}, 'controller.enqueue() should throw after controller.terminate()');

const error1 = new Error('error1');
error1.name = 'error1';

promise_test(t => {
const ts = new TransformStream({
start(controller) {
controller.enqueue(0);
controller.terminate();
controller.error(error1);
}
});
return Promise.all([
promise_rejects(t, new TypeError(), ts.writable.abort(), 'abort() should reject with a TypeError'),
promise_rejects(t, error1, ts.readable.cancel(), 'cancel() should reject with error1')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we test more than .cancel() to test if it's errored properly?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.cancel() is actually pretty specific. I added reader.closed to the tests just to be sure. I don't want to get into testing Readablestream itself here.

]);
}, 'controller.error() after controller.terminate() with queued chunk should error the readable');

test(() => {
new TransformStream({
start(controller) {
controller.terminate();
assert_throws(new TypeError(), () => controller.error(error1), 'error() should throw');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opened #821.

In general, have we tested/are we OK with the behavior for when controller methods throw? I think a guiding principle is that you shouldn't throw if it's not "your fault", e.g. if someone using the public API of the stream can transition the state in some way as to cause an exception. Does that sound familiar from previous discussions? Are we following it here, e.g. if the writable side is aborted or the readable side canceled?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for bringing this up. I had forgotten about it actually.

I filed #822 to track the changes to transform stream.

}
});
}, 'controller.error() after controller.terminate() without queued chunk should throw');

done();