Skip to content

Commit

Permalink
Add TransformStream tests for re-entrant strategies
Browse files Browse the repository at this point in the history
Add tests to ensure that implementations of TransformStream behave correctly
when operations are triggered from within the readableStrategy.size function.

This is not really a "supported" use case as any code that does this is either
malicious or broken, but implementations need to avoid crashes or memory
corruption in these cases.

Also merge changes to recording-streams.js from web-platform-tests that are
required to make these tests work.
  • Loading branch information
ricea committed Sep 11, 2017
1 parent 8a39c6a commit 168e56c
Show file tree
Hide file tree
Showing 3 changed files with 328 additions and 5 deletions.
Expand Up @@ -54,18 +54,16 @@ self.recordingWritableStream = (extras = {}, strategy) => {

return undefined;
},
write(chunk) {
write(chunk, controller) {
stream.events.push('write', chunk);

if (extras.write) {
return extras.write(chunk);
return extras.write(chunk, controller);
}

return undefined;
},
close(...args) {
assert_array_equals(args, [controllerToCopyOver], 'close must always be called with the controller');

close() {
stream.events.push('close');

if (extras.close) {
Expand Down
@@ -0,0 +1,14 @@
<!DOCTYPE html>
<meta charset="utf-8">
<title>reentrant-strategies.js browser context wrapper file</title>

<!-- This is a placeholder file until the tests are upstreamed. -->

<script src="/resources/testharness.js"></script>
<script src="/resources/testharnessreport.js"></script>

<script src="../resources/recording-streams.js"></script>
<script src="../resources/rs-utils.js"></script>
<script src="../resources/test-utils.js"></script>

<script src="reentrant-strategies.js"></script>
@@ -0,0 +1,311 @@
'use strict';

// The size() function of readableStrategy can re-entrantly call back into the TransformStream implementation. This
// makes it risky to cache state across the call to ReadableStreamDefaultControllerEnqueue. These tests attempt to catch
// such errors. They are separated from the other strategy tests because no real user code should ever do anything like
// this.
//
// There is no such issue with writableStrategy size() because it is never called from within TransformStream
// algorithms.

if (self.importScripts) {
self.importScripts('/resources/testharness.js');
self.importScripts('../resources/recording-streams.js');
self.importScripts('../resources/rs-utils.js');
self.importScripts('../resources/test-utils.js');
}

const error1 = new Error('error1');
error1.name = 'error1';

promise_test(() => {
let controller;
let calls = 0;
const ts = new TransformStream({
start(c) {
controller = c;
}
}, undefined, {
size() {
++calls;
if (calls < 2) {
controller.enqueue('b');
}
return 1;
},
highWaterMark: Infinity
});
const writer = ts.writable.getWriter();
return Promise.all([writer.write('a'), writer.close()])
.then(() => readableStreamToArray(ts.readable))
.then(array => assert_array_equals(array, ['b', 'a'], 'array should contain two chunks'));
}, 'enqueue() inside size() should work');

// The behaviour in this test may seem strange, but it is logical. The call to controller.close() happens while the
// readable queue is still empty, so the readable transitions to the "closed" state immediately, and the chunk is left
// stranded in the readable's queue.
promise_test(() => {
let controller;
const ts = new TransformStream({
start(c) {
controller = c;
}
}, undefined, {
size() {
controller.close();
return 1;
},
highWaterMark: Infinity
});
const writer = ts.writable.getWriter();
return writer.write('a')
.then(() => readableStreamToArray(ts.readable))
.then(array => assert_array_equals(array, [], 'array should contain no chunks'));
}, 'close() inside size() should work');

promise_test(t => {
let controller;
const ts = new TransformStream({
start(c) {
controller = c;
}
}, undefined, {
size() {
controller.error(error1);
return 1;
},
highWaterMark: Infinity
});
const writer = ts.writable.getWriter();
return writer.write('a')
.then(() => promise_rejects(t, error1, ts.readable.getReader().read(), 'read() should reject'));
}, 'error() inside size() should work');

promise_test(() => {
let controller;
const ts = new TransformStream({
start(c) {
controller = c;
}
}, undefined, {
size() {
assert_equals(controller.desiredSize, 1, 'desiredSize should be 1');
return 1;
},
highWaterMark: 1
});
const writer = ts.writable.getWriter();
return Promise.all([writer.write('a'), writer.close()])
.then(() => readableStreamToArray(ts.readable))
.then(array => assert_array_equals(array, ['a'], 'array should contain one chunk'));
}, 'desiredSize inside size() should work');

promise_test(t => {
let cancelPromise;
const ts = new TransformStream({}, undefined, {
size() {
cancelPromise = ts.readable.cancel(error1);
return 1;
},
highWaterMark: Infinity
});
const writer = ts.writable.getWriter();
return writer.write('a')
.then(() => {
promise_rejects(t, error1, writer.closed, 'writer.closed should reject');
return cancelPromise;
});
}, 'readable cancel() inside size() should work');

promise_test(() => {
let controller;
let pipeToPromise;
const ws = recordingWritableStream();
const ts = new TransformStream({
start(c) {
controller = c;
}
}, undefined, {
size() {
pipeToPromise = ts.readable.pipeTo(ws);
return 1;
},
highWaterMark: 1
});
// Allow promise returned by start() to resolve so that enqueue() will happen synchronously.
return delay(0).then(() => {
controller.enqueue('a');
// Allow pipeTo() to move the 'a' to the writable.
return delay(0);
}).then(() => {
assert_array_equals(ws.events, ['write', 'a'], 'first chunk should have been written');
controller.close();
return pipeToPromise;
}).then(() => {
assert_array_equals(ws.events, ['write', 'a', 'close'], 'target should have been closed');
});
}, 'pipeTo() inside size() should work');

promise_test(() => {
let controller;
let readPromise;
let calls = 0;
let reader;
const ts = new TransformStream({
start(c) {
controller = c;
}
}, undefined, {
size() {
// This is triggered by controller.enqueue(). The queue is empty and there are no pending reads. pull() is called
// synchronously, allowing transform() to proceed asynchronously. This results in a second call to enqueue(),
// which resolves this pending read() without calling size() again.
readPromise = reader.read();
++calls;
return 1;
},
highWaterMark: 0
});
reader = ts.readable.getReader();
const writer = ts.writable.getWriter();
let writeCalled = false;
const writePromise = writer.write('b').then(() => {
writeCalled = true;
});
return flushAsyncEvents().then(() => {
assert_false(writeCalled);
controller.enqueue('a');
assert_equals(calls, 1, 'size() should have been called once');
return delay(0);
}).then(() => {
assert_true(writeCalled);
assert_equals(calls, 1, 'size() should only be called once');
return readPromises[0];
}).then(({ value, done }) => {
assert_false(done, 'done should be false');
// See https://github.com/whatwg/streams/issues/794 for why this chunk is not 'a'.
assert_equals(value, 'b', 'chunk should have been read');
return writePromise;
});
}, 'read() inside of size() should work');

promise_test(() => {
let writer;
let writePromise1;
let calls = 0;
const ts = new TransformStream({}, undefined, {
size() {
++calls;
if (calls < 2) {
writePromise1 = writer.write('a');
}
return 1;
},
highWaterMark: Infinity
});
writer = ts.writable.getWriter();
// Give pull() a chance to be called.
return delay(0).then(() => {
// This write results in a synchronous call to transform(), enqueue(), and size().
const writePromise2 = writer.write('b');
assert_equals(calls, 1, 'size() should have been called once');
return Promise.all([writePromise1, writePromise2, writer.close()]);
}).then(() => {
assert_equals(calls, 2, 'size() should have been called twice');
return readableStreamToArray(ts.readable);
}).then(array => {
assert_array_equals(array, ['b', 'a'], 'both chunks should have been enqueued');
});
}, 'writer.write() inside size() should work');

promise_test(() => {
let controller;
let writer;
let writePromise;
let calls = 0;
const ts = new TransformStream({
start(c) {
controller = c;
}
}, undefined, {
size() {
++calls;
if (calls < 2) {
writePromise = writer.write('a');
}
return 1;
},
highWaterMark: Infinity
});
writer = ts.writable.getWriter();
// Give pull() a chance to be called.
return delay(0).then(() => {
// This enqueue results in synchronous calls to size(), write(), transform() and enqueue().
controller.enqueue('b');
assert_equals(calls, 2, 'size() should have been called twice');
return Promise.all([writePromise, writer.close()]);
}).then(() => {
return readableStreamToArray(ts.readable);
}).then(array => {
// Because one call to enqueue() is nested inside the other, they finish in the opposite order that they were
// called, so the chunks end up reverse order.
assert_array_equals(array, ['a', 'b'], 'both chunks should have been enqueued');
});
}, 'synchronous writer.write() inside size() should work');

promise_test(() => {
let writer;
let closePromise;
let controller;
const ts = new TransformStream({
start(c) {
controller = c;
}
}, undefined, {
size() {
closePromise = writer.close();
return 1;
},
highWaterMark: 1
});
writer = ts.writable.getWriter();
const reader = ts.readable.getReader();
// Wait for the promise returned by start() to be resolved so that the call to close() will result in a synchronous
// call to TransformStreamDefaultSink.
return delay(0).then(() => {
controller.enqueue('a');
return reader.read();
}).then(({ value, done }) => {
assert_false(done, 'done should be false');
assert_equals(value, 'a', 'value should be correct');
return reader.read();
}).then(({ done }) => {
assert_true(done, 'done should be true');
return closePromise;
});
}, 'writer.close() inside size() should work');

promise_test(t => {
let abortPromise;
let controller;
const ts = new TransformStream({
start(c) {
controller = c;
}
}, undefined, {
size() {
abortPromise = ts.writable.abort(error1);
return 1;
},
highWaterMark: 1
});
const reader = ts.readable.getReader();
// Wait for the promise returned by start() to be resolved so that the call to abort() will result in a synchronous
// call to TransformStreamDefaultSink.
return delay(0).then(() => {
controller.enqueue('a');
return Promise.all([promise_rejects(t, new TypeError(), reader.read(), 'read() should reject'), abortPromise]);
});
}, 'writer.abort() inside size() should work');

done();

0 comments on commit 168e56c

Please sign in to comment.