From 3d97fdab8b350a641dd90284fd7edd750e890912 Mon Sep 17 00:00:00 2001 From: TseIan Date: Fri, 20 Mar 2026 23:28:21 +0800 Subject: [PATCH] child-process: watch pipe peer close event --- lib/internal/child_process.js | 9 +- src/pipe_wrap.cc | 99 +++++++++++++++++++ src/pipe_wrap.h | 12 +++ .../test-child-process-stdin-close-event.js | 27 +++++ 4 files changed, 146 insertions(+), 1 deletion(-) create mode 100644 test/parallel/test-child-process-stdin-close-event.js diff --git a/lib/internal/child_process.js b/lib/internal/child_process.js index 9eac06d1fdf145..91db80653c54a8 100644 --- a/lib/internal/child_process.js +++ b/lib/internal/child_process.js @@ -333,7 +333,14 @@ function flushStdio(subprocess) { function createSocket(pipe, readable) { - return net.Socket({ handle: pipe, readable }); + const socket = net.Socket({ handle: pipe, readable }); + if (!readable && + process.platform !== 'win32' && + typeof pipe?.watchPeerClose === 'function') { + pipe.watchPeerClose(() => socket.destroy()); + socket.once('close', () => pipe.unwatchPeerClose?.()); + } + return socket; } diff --git a/src/pipe_wrap.cc b/src/pipe_wrap.cc index 770f0847aec59f..806429d38f4d2b 100644 --- a/src/pipe_wrap.cc +++ b/src/pipe_wrap.cc @@ -28,6 +28,7 @@ #include "handle_wrap.h" #include "node.h" #include "node_buffer.h" +#include "node_errors.h" #include "node_external_reference.h" #include "stream_base-inl.h" #include "stream_wrap.h" @@ -80,6 +81,8 @@ void PipeWrap::Initialize(Local target, SetProtoMethod(isolate, t, "listen", Listen); SetProtoMethod(isolate, t, "connect", Connect); SetProtoMethod(isolate, t, "open", Open); + SetProtoMethod(isolate, t, "watchPeerClose", WatchPeerClose); + SetProtoMethod(isolate, t, "unwatchPeerClose", UnwatchPeerClose); #ifdef _WIN32 SetProtoMethod(isolate, t, "setPendingInstances", SetPendingInstances); @@ -110,6 +113,8 @@ void PipeWrap::RegisterExternalReferences(ExternalReferenceRegistry* registry) { registry->Register(Listen); registry->Register(Connect); registry->Register(Open); + registry->Register(WatchPeerClose); + registry->Register(UnwatchPeerClose); #ifdef _WIN32 registry->Register(SetPendingInstances); #endif @@ -159,6 +164,10 @@ PipeWrap::PipeWrap(Environment* env, // Suggestion: uv_pipe_init() returns void. } +PipeWrap::~PipeWrap() { + peer_close_watching_ = false; + peer_close_cb_.Reset(); + void PipeWrap::Bind(const FunctionCallbackInfo& args) { PipeWrap* wrap; ASSIGN_OR_RETURN_UNWRAP(&wrap, args.This()); @@ -213,6 +222,96 @@ void PipeWrap::Open(const FunctionCallbackInfo& args) { args.GetReturnValue().Set(err); } +void PipeWrap::WatchPeerClose(const FunctionCallbackInfo& args) { + PipeWrap* wrap; + ASSIGN_OR_RETURN_UNWRAP(&wrap, args.This()); + + if (!wrap->IsAlive()) { + return args.GetReturnValue().Set(UV_EBADF); + } + + if (wrap->peer_close_watching_) { + return args.GetReturnValue().Set(0); + } + + CHECK_GT(args.Length(), 0); + CHECK(args[0]->IsFunction()); + + Environment* env = wrap->env(); + Isolate* isolate = env->isolate(); + + // Store the JS callback securely so it isn't garbage collected. + wrap->peer_close_cb_.Reset(isolate, args[0].As()); + wrap->peer_close_watching_ = true; + + // Start reading to detect EOF/ECONNRESET from the peer. + // We use our custom allocator and reader, ignoring actual data. + int err = uv_read_start(wrap->stream(), PeerCloseAlloc, PeerCloseRead); + if (err != 0) { + wrap->peer_close_watching_ = false; + wrap->peer_close_cb_.Reset(); + } + args.GetReturnValue().Set(err); +} + +void PipeWrap::UnwatchPeerClose(const FunctionCallbackInfo& args) { + PipeWrap* wrap; + ASSIGN_OR_RETURN_UNWRAP(&wrap, args.This()); + + if (!wrap->peer_close_watching_) { + wrap->peer_close_cb_.Reset(); + return args.GetReturnValue().Set(0); + } + + // Stop listening and release the JS callback to prevent memory leaks. + wrap->peer_close_watching_ = false; + wrap->peer_close_cb_.Reset(); + args.GetReturnValue().Set(uv_read_stop(wrap->stream())); +} + +void PipeWrap::PeerCloseAlloc(uv_handle_t* handle, + size_t suggested_size, + uv_buf_t* buf) { + // We only care about EOF, not the actual data. + // Using a static 1-byte buffer avoids dynamic memory allocation overhead. + static char scratch; + *buf = uv_buf_init(&scratch, 1); +} + +void PipeWrap::PeerCloseRead(uv_stream_t* stream, + ssize_t nread, + const uv_buf_t* buf) { + PipeWrap* wrap = static_cast(stream->data); + if (wrap == nullptr || !wrap->peer_close_watching_) return; + + // Ignore actual data reads or EAGAIN (0). We only watch for disconnects. + if (nread > 0 || nread == 0) return; + + // Wait specifically for EOF or connection reset (peer closed). + if (nread != UV_EOF && nread != UV_ECONNRESET) return; + + // Peer has closed the connection. Stop reading immediately. + wrap->peer_close_watching_ = false; + uv_read_stop(stream); + + if (wrap->peer_close_cb_.IsEmpty()) return; + Environment* env = wrap->env(); + Isolate* isolate = env->isolate(); + + // Set up V8 context and handles to safely execute the JS callback. + v8::HandleScope handle_scope(isolate); + v8::Context::Scope context_scope(env->context()); + Local cb = wrap->peer_close_cb_.Get(isolate); + // Reset before calling to prevent re-entrancy issues + wrap->peer_close_cb_.Reset(); + + errors::TryCatchScope try_catch(env); + try_catch.SetVerbose(true); + + // MakeCallback properly tracks AsyncHooks context and flushes microtasks. + wrap->MakeCallback(cb, 0, nullptr); +} + void PipeWrap::Connect(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); diff --git a/src/pipe_wrap.h b/src/pipe_wrap.h index c0722b63d85372..a8f7cd33eee5d1 100644 --- a/src/pipe_wrap.h +++ b/src/pipe_wrap.h @@ -54,6 +54,7 @@ class PipeWrap : public ConnectionWrap { SET_SELF_SIZE(PipeWrap) private: + ~PipeWrap() override; PipeWrap(Environment* env, v8::Local object, ProviderType provider, @@ -64,12 +65,23 @@ class PipeWrap : public ConnectionWrap { static void Listen(const v8::FunctionCallbackInfo& args); static void Connect(const v8::FunctionCallbackInfo& args); static void Open(const v8::FunctionCallbackInfo& args); + static void WatchPeerClose(const v8::FunctionCallbackInfo& args); + static void UnwatchPeerClose(const v8::FunctionCallbackInfo& args); + static void PeerCloseAlloc(uv_handle_t* handle, + size_t suggested_size, + uv_buf_t* buf); + static void PeerCloseRead(uv_stream_t* stream, + ssize_t nread, + const uv_buf_t* buf); #ifdef _WIN32 static void SetPendingInstances( const v8::FunctionCallbackInfo& args); #endif static void Fchmod(const v8::FunctionCallbackInfo& args); + + bool peer_close_watching_ = false; + v8::Global peer_close_cb_; }; diff --git a/test/parallel/test-child-process-stdin-close-event.js b/test/parallel/test-child-process-stdin-close-event.js new file mode 100644 index 00000000000000..c734b48eff26e0 --- /dev/null +++ b/test/parallel/test-child-process-stdin-close-event.js @@ -0,0 +1,27 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const { spawn } = require('child_process'); + +if (common.isWindows) { + common.skip('Not applicable on Windows'); +} + +const child = spawn(process.execPath, [ + '-e', + 'require("fs").closeSync(0); setTimeout(() => {}, 2000)', +], { stdio: ['pipe', 'ignore', 'ignore'] }); + +const timeout = setTimeout(() => { + assert.fail('stdin close event was not emitted'); +}, 1000); + +child.stdin.on('close', common.mustCall(() => { + clearTimeout(timeout); + child.kill(); +})); + +child.on('exit', common.mustCall(() => { + clearTimeout(timeout); +})); \ No newline at end of file