From e62d8e6f3150dd5a8d9e2b9b56e62365eb0d73e3 Mon Sep 17 00:00:00 2001 From: Keyhan Vakil <60900335+airtable-keyhanvakil@users.noreply.github.com> Date: Fri, 6 May 2022 15:01:04 -0700 Subject: [PATCH] worker: fix stream racing with terminate `OnStreamAfterReqFinished` uses `v8::Object::Has` to check if it needs to call `oncomplete`. `v8::Object::Has` needs to execute Javascript. However when worker threads are involved, `OnStreamAfterReqFinished` may be called after the worker thread termination has begun via `worker.terminate()`. This makes `v8::Object::Has` return `Nothing`, which triggers an assert. This diff fixes the issue by simply defaulting us to `false` in the case where `Nothing` is returned. This is sound because we can't execute `oncomplete` anyway as the isolate is terminating. Fixes: https://github.com/nodejs/node/issues/38418 PR-URL: https://github.com/nodejs/node/pull/42874 Reviewed-By: Darshan Sen Reviewed-By: Anna Henningsen Reviewed-By: Antoine du Hamel --- src/stream_base.cc | 1 + .../test-worker-http2-stream-terminate.js | 63 +++++++++++++++++++ 2 files changed, 64 insertions(+) create mode 100644 test/parallel/test-worker-http2-stream-terminate.js diff --git a/src/stream_base.cc b/src/stream_base.cc index a47fad970355dc..2b3fbe38ff2872 100644 --- a/src/stream_base.cc +++ b/src/stream_base.cc @@ -601,6 +601,7 @@ void ReportWritesToJSStreamListener::OnStreamAfterReqFinished( StreamReq* req_wrap, int status) { StreamBase* stream = static_cast(stream_); Environment* env = stream->stream_env(); + if (env->is_stopping()) return; AsyncWrap* async_wrap = req_wrap->GetAsyncWrap(); HandleScope handle_scope(env->isolate()); Context::Scope context_scope(env->context()); diff --git a/test/parallel/test-worker-http2-stream-terminate.js b/test/parallel/test-worker-http2-stream-terminate.js new file mode 100644 index 00000000000000..94e60e773c4b3e --- /dev/null +++ b/test/parallel/test-worker-http2-stream-terminate.js @@ -0,0 +1,63 @@ +'use strict'; +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +const assert = require('assert'); +const http2 = require('http2'); +const makeDuplexPair = require('../common/duplexpair'); +const { Worker, parentPort } = require('worker_threads'); + +// This test ensures that workers can be terminated without error while +// stream activity is ongoing, in particular the C++ function +// ReportWritesToJSStreamListener::OnStreamAfterReqFinished. + +const MAX_ITERATIONS = 20; +const MAX_THREADS = 10; + +// Do not use isMainThread so that this test itself can be run inside a Worker. +if (!process.env.HAS_STARTED_WORKER) { + process.env.HAS_STARTED_WORKER = 1; + + function spinWorker(iter) { + const w = new Worker(__filename); + w.on('message', common.mustCall((msg) => { + assert.strictEqual(msg, 'terminate'); + w.terminate(); + })); + + w.on('exit', common.mustCall(() => { + if (iter < MAX_ITERATIONS) + spinWorker(++iter); + })); + } + + for (let i = 0; i < MAX_THREADS; i++) { + spinWorker(0); + } +} else { + const server = http2.createServer(); + let i = 0; + server.on('stream', (stream, headers) => { + if (i === 1) { + parentPort.postMessage('terminate'); + } + i++; + + stream.end(''); + }); + + const { clientSide, serverSide } = makeDuplexPair(); + server.emit('connection', serverSide); + + const client = http2.connect('http://localhost:80', { + createConnection: () => clientSide, + }); + + function makeRequests() { + for (let i = 0; i < 3; i++) { + client.request().end(); + } + setImmediate(makeRequests); + } + makeRequests(); +}