From 3a27b37a60eef828398806e1ee3c2631c687f116 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Wed, 20 Aug 2025 11:03:48 +0200 Subject: [PATCH 01/13] Rework process shutdown (fixes #3235) The signal handler installed currently is prone to crashing as it logs from within the signal handler which may end up corrupting the memory allocator. Replace it with a process tracker: * shut down the application using chronos' waitSignal when available * avoid allocating memory in signal handler * track and print shutdown reason --- Makefile | 11 +- beacon_chain/beacon_node_status.nim | 18 --- beacon_chain/nimbus_beacon_node.nim | 56 +++---- beacon_chain/nimbus_binary_common.nim | 6 +- beacon_chain/process_state.nim | 222 ++++++++++++++++++++++++++ tests/test_keymanager_api.nim | 7 +- 6 files changed, 257 insertions(+), 63 deletions(-) delete mode 100644 beacon_chain/beacon_node_status.nim create mode 100644 beacon_chain/process_state.nim diff --git a/Makefile b/Makefile index 119f073542..ed5c8a330d 100644 --- a/Makefile +++ b/Makefile @@ -299,7 +299,8 @@ XML_TEST_BINARIES := \ # test suite TEST_BINARIES := \ block_sim \ - test_libnimbus_lc + test_libnimbus_lc \ + process_state .PHONY: $(TEST_BINARIES) $(XML_TEST_BINARIES) force_build_alone_all_tests # Preset-dependent tests @@ -384,6 +385,14 @@ block_sim: | build deps $(NIM_PARAMS) && \ echo -e $(BUILD_END_MSG) "build/$@" +process_state: | build deps + + echo -e $(BUILD_MSG) "build/$@" && \ + MAKE="$(MAKE)" V="$(V)" $(ENV_SCRIPT) scripts/compile_nim_program.sh \ + $@ \ + "beacon_chain/$@.nim" \ + $(NIM_PARAMS) && \ + echo -e $(BUILD_END_MSG) "build/$@" + DISABLE_TEST_FIXTURES_SCRIPT := 0 # This parameter passing scheme is ugly, but short. test: | $(XML_TEST_BINARIES) $(TEST_BINARIES) diff --git a/beacon_chain/beacon_node_status.nim b/beacon_chain/beacon_node_status.nim deleted file mode 100644 index 8abb5c575d..0000000000 --- a/beacon_chain/beacon_node_status.nim +++ /dev/null @@ -1,18 +0,0 @@ -# beacon_chain -# Copyright (c) 2018-2024 Status Research & Development GmbH -# Licensed and distributed under either of -# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). -# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). -# at your option. This file may not be copied, modified, or distributed except according to those terms. - -{.push raises: [].} - -type - # "state" is already taken by BeaconState - BeaconNodeStatus* = enum - Starting - Running - Stopping - -# this needs to be global, so it can be set in the Ctrl+C signal handler -var bnStatus* = BeaconNodeStatus.Starting diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 68d7a52139..6506bb512e 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -23,9 +23,9 @@ import engine_authentication, weak_subjectivity, peerdas_helpers], ./sync/[sync_protocol, light_client_protocol, sync_overseer], ./validators/[keystore_management, beacon_validators], - "."/[ + ./[ beacon_node, beacon_node_light_client, deposits, - nimbus_binary_common, statusbar, trusted_node_sync, wallets] + nimbus_binary_common, process_state, statusbar, trusted_node_sync, wallets] when defined(posix): import system/ansi_c @@ -395,7 +395,7 @@ proc initFullNode( proc eventWaiter(): Future[void] {.async: (raises: [CancelledError]).} = await node.shutdownEvent.wait() - bnStatus = BeaconNodeStatus.Stopping + ProcessState.scheduleStop("shutdownEvent") asyncSpawn eventWaiter() @@ -741,14 +741,14 @@ proc init*(T: type BeaconNode, try: if config.numThreads < 0: fatal "The number of threads --num-threads cannot be negative." - quit 1 + quit QuitFailure elif config.numThreads == 0: Taskpool.new(numThreads = min(countProcessors(), 16)) else: Taskpool.new(numThreads = config.numThreads) except CatchableError as e: fatal "Cannot start taskpool", err = e.msg - quit 1 + quit QuitFailure info "Threadpool started", numThreads = taskpool.numThreads @@ -1931,7 +1931,7 @@ proc onSecond(node: BeaconNode, time: Moment) = if node.config.stopAtSyncedEpoch != 0 and node.dag.head.slot.epoch >= node.config.stopAtSyncedEpoch: notice "Shutting down after having reached the target synced epoch" - bnStatus = BeaconNodeStatus.Stopping + ProcessState.scheduleStop("stopAtSyncedEpoch") proc runOnSecondLoop(node: BeaconNode) {.async.} = const @@ -2159,8 +2159,6 @@ proc installMessageValidators(node: BeaconNode) = node.installLightClientMessageValidators() proc stop(node: BeaconNode) = - bnStatus = BeaconNodeStatus.Stopping - notice "Graceful shutdown" if not node.config.inProcessValidators: try: node.vcProcess.close() @@ -2179,7 +2177,7 @@ proc stop(node: BeaconNode) = notice "Databases closed" proc run(node: BeaconNode) {.raises: [CatchableError].} = - bnStatus = BeaconNodeStatus.Running + ProcessState.notifyRunning() if not isNil(node.restServer): node.restServer.installRestHandlers(node) @@ -2216,10 +2214,10 @@ proc run(node: BeaconNode) {.raises: [CatchableError].} = asyncSpawn runOnSecondLoop(node) asyncSpawn runQueueProcessingLoop(node.blockProcessor) asyncSpawn runKeystoreCachePruningLoop(node.keystoreCache) + asyncSpawn ProcessState.waitStopSignals() # main event loop - while bnStatus == BeaconNodeStatus.Running: - poll() # if poll fails, the network is broken + ProcessState.pollUntilStopped() # time to say goodbye node.stop() @@ -2437,8 +2435,6 @@ proc doRunBeaconNode(config: var BeaconNodeConf, rng: ref HmacDrbgContext) {.rai ignoreDeprecatedOption web3ForcePolling ignoreDeprecatedOption finalizedDepositTreeSnapshot - createPidFile(config.dataDir.string / "beacon_node.pid") - config.createDumpDirs() # There are no managed event loops in here, to do a graceful shutdown, but @@ -2451,27 +2447,6 @@ proc doRunBeaconNode(config: var BeaconNodeConf, rng: ref HmacDrbgContext) {.rai for node in metadata.bootstrapNodes: config.bootstrapNodes.add node - ## Ctrl+C handling - proc controlCHandler() {.noconv.} = - when defined(windows): - # workaround for https://github.com/nim-lang/Nim/issues/4057 - try: - setupForeignThreadGc() - except Exception as exc: raiseAssert exc.msg # shouldn't happen - notice "Shutting down after having received SIGINT" - bnStatus = BeaconNodeStatus.Stopping - try: - setControlCHook(controlCHandler) - except Exception as exc: # TODO Exception - warn "Cannot set ctrl-c handler", msg = exc.msg - - # equivalent SIGTERM handler - when defined(posix): - proc SIGTERMHandler(signal: cint) {.noconv.} = - notice "Shutting down after having received SIGTERM" - bnStatus = BeaconNodeStatus.Stopping - c_signal(ansi_c.SIGTERM, SIGTERMHandler) - block: let res = if config.trustedSetupFile.isNone: @@ -2481,6 +2456,9 @@ proc doRunBeaconNode(config: var BeaconNodeConf, rng: ref HmacDrbgContext) {.rai if res.isErr(): raiseAssert res.error() + if ProcessState.stopping(): + return + let node = waitFor BeaconNode.init(rng, config, metadata) let metricsServer = (waitFor config.initMetricsServer()).valueOr: @@ -2492,7 +2470,7 @@ proc doRunBeaconNode(config: var BeaconNodeConf, rng: ref HmacDrbgContext) {.rai node.metricsServer = metricsServer - if bnStatus == BeaconNodeStatus.Stopping: + if ProcessState.stopping(): return when not defined(windows): @@ -2599,7 +2577,11 @@ proc handleStartUpCmd(config: var BeaconNodeConf) {.raises: [CatchableError].} = let rng = HmacDrbgContext.new() case config.cmd - of BNStartUpCmd.noCommand: doRunBeaconNode(config, rng) + of BNStartUpCmd.noCommand: + createPidFile(config.dataDir.string / "beacon_node.pid") + ProcessState.setupStopHandlers() + + doRunBeaconNode(config, rng) of BNStartUpCmd.deposits: doDeposits(config, rng[]) of BNStartUpCmd.wallets: doWallets(config, rng[]) of BNStartUpCmd.record: doRecord(config, rng[]) @@ -2665,7 +2647,7 @@ proc main() {.noinline, raises: [CatchableError].} = when defined(windows): if config.runAsService: proc exitService() = - bnStatus = BeaconNodeStatus.Stopping + ProcessState.scheduleStop("exitService") establishWindowsService(clientId, copyrights, nimBanner, SPEC_VERSION, "nimbus_beacon_node", BeaconNodeConf, handleStartUpCmd, exitService) diff --git a/beacon_chain/nimbus_binary_common.nim b/beacon_chain/nimbus_binary_common.nim index 989fde9cc2..e28599f9d9 100644 --- a/beacon_chain/nimbus_binary_common.nim +++ b/beacon_chain/nimbus_binary_common.nim @@ -21,7 +21,7 @@ import # Local modules ./spec/[helpers, keystore], ./spec/datatypes/base, - ./[beacon_clock, beacon_node_status, conf, version] + ./[beacon_clock, conf, process_state, version] when defaultChroniclesStream.outputs.type.arity == 2: from std/os import commandLineParams, getEnv, splitFile @@ -41,7 +41,7 @@ declareGauge nimVersionGauge, "Nim version info", ["version", "nim_commit"], nam nimVersionGauge.set(1, labelValues=[NimVersion, getNimGitHash()]) export - confutils, toml_serialization, beacon_clock, beacon_node_status, conf + confutils, toml_serialization, beacon_clock, conf type @@ -296,7 +296,7 @@ proc runSlotLoop*[T](node: T, startTime: BeaconTime, fatal "System time adjusted backwards significantly - clock may be inaccurate - shutting down", nextSlot = shortLog(nextSlot), wallSlot = shortLog(wallSlot) - bnStatus = BeaconNodeStatus.Stopping + ProcessState.scheduleStop("clock skew") return # Time moved back by a single slot - this could be a minor adjustment, diff --git a/beacon_chain/process_state.nim b/beacon_chain/process_state.nim new file mode 100644 index 0000000000..34f0fc914a --- /dev/null +++ b/beacon_chain/process_state.nim @@ -0,0 +1,222 @@ +# beacon_chain +# Copyright (c) 2025 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + + +## Process state helper using a global variable to coordinate multithreaded +## shutdown in the presence of C signals +## +## The high-level idea is the following: +## +## * The main thread monitors OS signals using either `signal` or `waitSignal` +## * All other threads block signal handling using `ignoreStopSignalsInThread` +## * When the main thread launches another thread, it passes a "stop event" to +## the thread - this can be a chronos ThreadSignalPtr, a condvar/lock or any +## other cross-thread "wake-up" mechanism that can tell the thread that it's +## time to go +## * When a signal is activated, a global flag is set indicating that the +## polling loop of the main thread should stop +## * The main thread wakes up any threads it started and notifies them of the +## imminent shutdown then waits for them to terminate +## +## In this way, the main thread is notified that _some_ thread or the user wants +## the process to shut down. The main thread stops whatever it's doing and +## notifies all threads it started that shutdown is imminent and then proceeds +## with the shutdown. + +{.push raises: [].} + +import std/atomics, chronos, chronos/threadsync, chronicles + +type + ProcessState* {.pure.} = enum + Starting + Running + Stopping + +var processState: Atomic[ProcessState] +var shutdownSource: Atomic[pointer] + +import system/ansi_c + +when defined(posix): + import posix + var signalTarget = pthread_self() + + proc ignoreStopSignalsInThread*(_: type ProcessState) = + # Block all signals in this thread, so we don't interfere with regular signal + # handling elsewhere. + var signalMask, oldSignalMask: Sigset + + if sigemptyset(signalMask) != 0: + fatal "Error creating signal mask", err = osErrorMsg(osLastError()) + quit(QuitFailure) + + if sigaddset(signalMask, posix.SIGINT) != 0: + fatal "Error updating signal mask", err = osErrorMsg(osLastError()) + quit(QuitFailure) + if sigaddset(signalMask, posix.SIGTERM) != 0: + fatal "Error updating signal mask", err = osErrorMsg(osLastError()) + quit(QuitFailure) + + if pthread_sigmask(SIG_BLOCK, signalMask, oldSignalMask) != 0: + fatal "Error setting signal mask", err = osErrorMsg(osLastError()) + quit(QuitFailure) + + debug "Ignoring signals in thread", chroniclesThreadIds = true + + proc raiseStopSignal() = + # Main thread that is monitoring the signals... + discard pthread_kill(signalTarget, posix.SIGTERM) + +else: + proc ignoreStopSignalsInThread*(_: type ProcessState) = + discard + + proc raiseStopSignal() = + discard c_raise(ansi_c.SIGINT) + +proc scheduleStop*(_: type ProcessState, source: cstring) = + debug "Scheduling shutdown", source + var nilptr: pointer + discard shutdownSource.compareExchange(nilptr, source) + raiseStopSignal() + +proc notifyRunning*(_: type ProcessState) = + processState.store(ProcessState.Running, moRelaxed) + +proc setupStopHandlers*(_: type ProcessState) = + ## Install signal handlers for SIGINT/SIGTERM such that the application + ## updates `processState` on CTRL-C and similar, allowing it to gracefully + ## shut down. + ## + ## The CTRL-C handling provided by `signal` does not wake the async polling + ## loop and can therefore get stuck if no events are happening - see + ## `waitStopSignals` for a version that works with the chronos poll loop. + ## + ## This function should be called early on from the main thread to avoid the + ## default Nim signal handlers from being used as these will crash or close + ## the application. + ## + ## Non-main threads should instead call `ignoreStopSignalsInThread` + + proc controlCHandler(a: cint) {.noconv.} = + # Cannot log in here because that would imply memory allocations and system + # calls + let sourceName = case a + of ansi_c.SIGINT: + cstring("SIGINT") + of ansi_c.SIGTERM: + cstring("SIGTERM") + else: + cstring("SIGNAL") + + var nilptr: pointer + discard shutdownSource.compareExchange(nilptr, sourceName) + # Should also provide synchronization for the shutdownSource write.. + processState.store(Stopping) + + # Avoid using `setControlCHook` since it has an exception effect + c_signal(ansi_c.SIGINT, controlCHandler) + + # equivalent SIGTERM handler + when declared(ansi_c.SIGTERM): + c_signal(ansi_c.SIGTERM, controlCHandler) + +proc waitStopSignals*(_: type ProcessState) {.async: (raises: [CancelledError]).} = + ## Monitor stop signals via chronos' event loop, masking other handlers. + ## + ## This approach ensures that the event loop wakes up on signal delivery + ## unlike `setupStopHandlers` which merely sets a flag that must be polled. + let + sigint = waitSignal(chronos.SIGINT) + sigterm = waitSignal(chronos.SIGTERM) + + debug "Waiting for signal", chroniclesThreadIds = true + + try: + discard await race(sigint, sigterm) + processState.store(ProcessState.Stopping, moRelaxed) + finally: + # Might be finished already, which is fine.. + await noCancel sigint.cancelAndWait() + await noCancel sigterm.cancelAndWait() + +proc stopping*(_: type ProcessState): bool = + processState.load(moRelaxed) == ProcessState.Stopping + +proc pollUntilStopped*(_: type ProcessState) = + while processState.load(moRelaxed) != ProcessState.Stopping: + poll() + + var source = cast[cstring](shutdownSource.load()) + if source == nil: + source = "Unknown" + notice "Shutting down", chroniclesThreadIds = true, source + +when isMainModule: # Test case + import os + + proc threadWork() {.async.} = + var todo = 2 + while todo > 0: + echo "Terminating in ", todo + + await sleepAsync(1.seconds) + todo -= 1 + + ProcessState.scheduleStop("thread") + + echo "Waiting for the end... " + await sleepAsync(10.seconds) + + raiseAssert "Should not reach here, ie stopping the thread should not take 10s" + + proc worker(p: ThreadSignalPtr) {.thread.} = + ProcessState.ignoreStopSignalsInThread() + let + stop = p.wait() + work = threadWork() + discard waitFor noCancel race(stop, work) + + waitFor noCancel stop.cancelAndWait() + waitFor noCancel work.cancelAndWait() + + proc main() {.raises: [CatchableError].} = + let stopper = ThreadSignalPtr.new().expect("working thread signal") + + var workerThread: Thread[ThreadSignalPtr] + createThread(workerThread, worker, stopper) + + # Setup sync stop handlers - these are used whenever `waitSignal` is not + # used - whenever a `waitSignals` future is active, these signals should be + # masked - even if they are not masked, they are harmless in that they + # set the same flag as `waitStopSignals` does. + ProcessState.setupStopHandlers() + + let stop = ProcessState.waitStopSignals() + + ProcessState.pollUntilStopped() + waitFor stopper.fire() + + waitFor stop.cancelAndWait() + + workerThread.joinThread() + + ProcessState.notifyRunning() + + # The async waiting has finished - let's try the sync waiting + ProcessState.scheduleStop("done") + + # poll for 10s, this should be enough + for i in 0..<100: + if ProcessState.stopping(): + break + os.sleep(100) + + doAssert ProcessState.stopping() + + main() diff --git a/tests/test_keymanager_api.nim b/tests/test_keymanager_api.nim index 5a54c26e4f..6844d6bd9e 100644 --- a/tests/test_keymanager_api.nim +++ b/tests/test_keymanager_api.nim @@ -26,7 +26,7 @@ import [keystore_management, slashing_protection_common, validator_pool], ../beacon_chain/networking/network_metadata, ../beacon_chain/rpc/rest_key_management_api, - ../beacon_chain/[conf, filepath, beacon_node, nimbus_beacon_node, beacon_node_status], + ../beacon_chain/[conf, filepath, beacon_node, nimbus_beacon_node, process_state], ../beacon_chain/validator_client/common, ../ncli/ncli_testnet, ./testutil @@ -2031,8 +2031,7 @@ proc delayedTests(basePort: int, pool: ref ValidatorPool, # validatorPool: pool, # keymanagerHost: host) - while bnStatus != BeaconNodeStatus.Running: - await sleepAsync(1.seconds) + ProcessState.pollUntilStopped() # asyncSpawn startValidatorClient(basePort) @@ -2046,7 +2045,7 @@ proc delayedTests(basePort: int, pool: ref ValidatorPool, # Re-enable it in a follow-up PR # await runTests(validatorClientKeymanager) - bnStatus = BeaconNodeStatus.Stopping + ProcessState.scheduleStop("stop") proc main(basePort: int) {.async.} = if dirExists(dataDir): From 69d23b82f5043de02ed137f062d3ee9574fe783c Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Wed, 20 Aug 2025 15:43:28 +0200 Subject: [PATCH 02/13] no need to handle sigterm on windows, normally --- beacon_chain/process_state.nim | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/beacon_chain/process_state.nim b/beacon_chain/process_state.nim index 34f0fc914a..f47accf00d 100644 --- a/beacon_chain/process_state.nim +++ b/beacon_chain/process_state.nim @@ -5,7 +5,6 @@ # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # at your option. This file may not be copied, modified, or distributed except according to those terms. - ## Process state helper using a global variable to coordinate multithreaded ## shutdown in the presence of C signals ## @@ -31,11 +30,10 @@ import std/atomics, chronos, chronos/threadsync, chronicles -type - ProcessState* {.pure.} = enum - Starting - Running - Stopping +type ProcessState* {.pure.} = enum + Starting + Running + Stopping var processState: Atomic[ProcessState] var shutdownSource: Atomic[pointer] @@ -106,13 +104,11 @@ proc setupStopHandlers*(_: type ProcessState) = proc controlCHandler(a: cint) {.noconv.} = # Cannot log in here because that would imply memory allocations and system # calls - let sourceName = case a - of ansi_c.SIGINT: - cstring("SIGINT") - of ansi_c.SIGTERM: - cstring("SIGTERM") - else: - cstring("SIGNAL") + let sourceName = + if a == ansi_c.SIGINT: + cstring("SIGINT") + else: + cstring("SIGTERM") var nilptr: pointer discard shutdownSource.compareExchange(nilptr, sourceName) @@ -133,7 +129,11 @@ proc waitStopSignals*(_: type ProcessState) {.async: (raises: [CancelledError]). ## unlike `setupStopHandlers` which merely sets a flag that must be polled. let sigint = waitSignal(chronos.SIGINT) - sigterm = waitSignal(chronos.SIGTERM) + sigterm = + when defined(windows): + default(array[0, FutureBase]) + else: + [waitSignal(chronos.SIGTERM)] debug "Waiting for signal", chroniclesThreadIds = true @@ -143,7 +143,8 @@ proc waitStopSignals*(_: type ProcessState) {.async: (raises: [CancelledError]). finally: # Might be finished already, which is fine.. await noCancel sigint.cancelAndWait() - await noCancel sigterm.cancelAndWait() + for f in sigterm: + await noCancel f.cancelAndWait() proc stopping*(_: type ProcessState): bool = processState.load(moRelaxed) == ProcessState.Stopping @@ -212,7 +213,7 @@ when isMainModule: # Test case ProcessState.scheduleStop("done") # poll for 10s, this should be enough - for i in 0..<100: + for i in 0 ..< 100: if ProcessState.stopping(): break os.sleep(100) From 83da573c5446e2be1417d4d36cb114fc306e2a81 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Wed, 20 Aug 2025 17:19:28 +0200 Subject: [PATCH 03/13] oops --- beacon_chain/process_state.nim | 42 +++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/beacon_chain/process_state.nim b/beacon_chain/process_state.nim index f47accf00d..e0962fef55 100644 --- a/beacon_chain/process_state.nim +++ b/beacon_chain/process_state.nim @@ -127,24 +127,33 @@ proc waitStopSignals*(_: type ProcessState) {.async: (raises: [CancelledError]). ## ## This approach ensures that the event loop wakes up on signal delivery ## unlike `setupStopHandlers` which merely sets a flag that must be polled. - let - sigint = waitSignal(chronos.SIGINT) - sigterm = - when defined(windows): - default(array[0, FutureBase]) - else: - [waitSignal(chronos.SIGTERM)] - debug "Waiting for signal", chroniclesThreadIds = true + when defined(posix): + let + sigint = waitSignal(chronos.SIGINT) + sigterm = waitSignal(chronos.SIGTERM) + + debug "Waiting for signal", chroniclesThreadIds = true + + try: + discard await race(sigint, sigterm) + processState.store(ProcessState.Stopping, moRelaxed) + finally: + # Might be finished already, which is fine.. + await noCancel sigint.cancelAndWait() + await noCancel sigterm.cancelAndWait() + + else: + let sigint = waitSignal(chronos.SIGINT) + + debug "Waiting for signal", chroniclesThreadIds = true - try: - discard await race(sigint, sigterm) - processState.store(ProcessState.Stopping, moRelaxed) - finally: - # Might be finished already, which is fine.. - await noCancel sigint.cancelAndWait() - for f in sigterm: - await noCancel f.cancelAndWait() + try: + discard await race(sigint) + processState.store(ProcessState.Stopping, moRelaxed) + finally: + # Might be finished already, which is fine.. + await noCancel sigint.cancelAndWait() proc stopping*(_: type ProcessState): bool = processState.load(moRelaxed) == ProcessState.Stopping @@ -169,6 +178,7 @@ when isMainModule: # Test case await sleepAsync(1.seconds) todo -= 1 + # Sends signal from non-main thread ProcessState.scheduleStop("thread") echo "Waiting for the end... " From 54140c73efe03d23d64b20f4e4ec8b87d6b5ddef Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Thu, 21 Aug 2025 15:04:39 +0200 Subject: [PATCH 04/13] rm `pollUntilStopped`, capture sigterm in chronos/windows --- beacon_chain/nimbus_beacon_node.nim | 4 +- beacon_chain/process_state.nim | 117 +++++++++++++--------------- tests/test_keymanager_api.nim | 3 +- 3 files changed, 59 insertions(+), 65 deletions(-) diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 6506bb512e..efd3a7a8ff 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -2214,10 +2214,8 @@ proc run(node: BeaconNode) {.raises: [CatchableError].} = asyncSpawn runOnSecondLoop(node) asyncSpawn runQueueProcessingLoop(node.blockProcessor) asyncSpawn runKeystoreCachePruningLoop(node.keystoreCache) - asyncSpawn ProcessState.waitStopSignals() - # main event loop - ProcessState.pollUntilStopped() + waitFor ProcessState.waitStopSignals() # time to say goodbye node.stop() diff --git a/beacon_chain/process_state.nim b/beacon_chain/process_state.nim index e0962fef55..7187f1fe81 100644 --- a/beacon_chain/process_state.nim +++ b/beacon_chain/process_state.nim @@ -44,41 +44,38 @@ when defined(posix): import posix var signalTarget = pthread_self() - proc ignoreStopSignalsInThread*(_: type ProcessState) = - # Block all signals in this thread, so we don't interfere with regular signal - # handling elsewhere. + proc ignoreStopSignalsInThread*(_: type ProcessState): bool = + # Block stop signals in the calling thread - this can be used to avoid + # having certain threads be interrupted by process-directed signals var signalMask, oldSignalMask: Sigset if sigemptyset(signalMask) != 0: - fatal "Error creating signal mask", err = osErrorMsg(osLastError()) - quit(QuitFailure) + return false if sigaddset(signalMask, posix.SIGINT) != 0: - fatal "Error updating signal mask", err = osErrorMsg(osLastError()) - quit(QuitFailure) + return false if sigaddset(signalMask, posix.SIGTERM) != 0: - fatal "Error updating signal mask", err = osErrorMsg(osLastError()) - quit(QuitFailure) + return false if pthread_sigmask(SIG_BLOCK, signalMask, oldSignalMask) != 0: - fatal "Error setting signal mask", err = osErrorMsg(osLastError()) - quit(QuitFailure) + return false - debug "Ignoring signals in thread", chroniclesThreadIds = true + true proc raiseStopSignal() = # Main thread that is monitoring the signals... discard pthread_kill(signalTarget, posix.SIGTERM) else: - proc ignoreStopSignalsInThread*(_: type ProcessState) = - discard + proc ignoreStopSignalsInThread*(_: type ProcessState): bool = + true proc raiseStopSignal() = discard c_raise(ansi_c.SIGINT) proc scheduleStop*(_: type ProcessState, source: cstring) = - debug "Scheduling shutdown", source + ## Schedule that the process should stop in a thread-safe way. This function + ## can be used from non-nim threads as well. var nilptr: pointer discard shutdownSource.compareExchange(nilptr, source) raiseStopSignal() @@ -89,17 +86,16 @@ proc notifyRunning*(_: type ProcessState) = proc setupStopHandlers*(_: type ProcessState) = ## Install signal handlers for SIGINT/SIGTERM such that the application ## updates `processState` on CTRL-C and similar, allowing it to gracefully - ## shut down. + ## shut down by monitoring `ProcessState.running` at regular intervals. ## - ## The CTRL-C handling provided by `signal` does not wake the async polling - ## loop and can therefore get stuck if no events are happening - see - ## `waitStopSignals` for a version that works with the chronos poll loop. + ## `async` applications should prefer to use + ## `await ProcessState.waitStopsignals()` since the CTRL-C handling provided + ## by `signal` does not wake the async polling loop and can therefore get + ## stuck if no events are happening. ## ## This function should be called early on from the main thread to avoid the ## default Nim signal handlers from being used as these will crash or close ## the application. - ## - ## Non-main threads should instead call `ignoreStopSignalsInThread` proc controlCHandler(a: cint) {.noconv.} = # Cannot log in here because that would imply memory allocations and system @@ -115,11 +111,18 @@ proc setupStopHandlers*(_: type ProcessState) = # Should also provide synchronization for the shutdownSource write.. processState.store(Stopping) + # Nim sets signal handlers using `c_signal`, but unfortunately these are broken + # since they perform memory allocations and call unsafe system functions: + # https://github.com/nim-lang/Nim/blob/c6352ce0ab5fef061b43c8ca960ff7728541b30b/lib/system/excpt.nim#L622 + # Avoid using `setControlCHook` since it has an exception effect c_signal(ansi_c.SIGINT, controlCHandler) - # equivalent SIGTERM handler - when declared(ansi_c.SIGTERM): + # equivalent SIGTERM handler - this is only set on posix systems since on + # windows, SIGTERM is not generated - however, chronos may generate them so + # below, in the chronos version, we do monitor it on all platforms. + # https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/signal?view=msvc-170 + when defined(posix): c_signal(ansi_c.SIGTERM, controlCHandler) proc waitStopSignals*(_: type ProcessState) {.async: (raises: [CancelledError]).} = @@ -127,46 +130,36 @@ proc waitStopSignals*(_: type ProcessState) {.async: (raises: [CancelledError]). ## ## This approach ensures that the event loop wakes up on signal delivery ## unlike `setupStopHandlers` which merely sets a flag that must be polled. + ## + ## Make sure to call `ignoreStopSignalsInThread` - when defined(posix): - let - sigint = waitSignal(chronos.SIGINT) - sigterm = waitSignal(chronos.SIGTERM) + let + sigint = waitSignal(chronos.SIGINT) + sigterm = waitSignal(chronos.SIGTERM) - debug "Waiting for signal", chroniclesThreadIds = true + debug "Waiting for signal", chroniclesThreadIds = true - try: - discard await race(sigint, sigterm) - processState.store(ProcessState.Stopping, moRelaxed) - finally: - # Might be finished already, which is fine.. - await noCancel sigint.cancelAndWait() - await noCancel sigterm.cancelAndWait() + try: + discard await race(sigint, sigterm) - else: - let sigint = waitSignal(chronos.SIGINT) + var source = cast[cstring](shutdownSource.load()) + if source == nil: + source = "Unknown" - debug "Waiting for signal", chroniclesThreadIds = true + notice "Shutting down", chroniclesThreadIds = true, source - try: - discard await race(sigint) - processState.store(ProcessState.Stopping, moRelaxed) - finally: - # Might be finished already, which is fine.. - await noCancel sigint.cancelAndWait() + processState.store(ProcessState.Stopping, moRelaxed) + finally: + # Might be finished already, which is fine.. + await noCancel sigint.cancelAndWait() + await noCancel sigterm.cancelAndWait() + +proc running*(_: type ProcessState): bool = + processState.load(moRelaxed) == ProcessState.Running proc stopping*(_: type ProcessState): bool = processState.load(moRelaxed) == ProcessState.Stopping -proc pollUntilStopped*(_: type ProcessState) = - while processState.load(moRelaxed) != ProcessState.Stopping: - poll() - - var source = cast[cstring](shutdownSource.load()) - if source == nil: - source = "Unknown" - notice "Shutting down", chroniclesThreadIds = true, source - when isMainModule: # Test case import os @@ -187,7 +180,7 @@ when isMainModule: # Test case raiseAssert "Should not reach here, ie stopping the thread should not take 10s" proc worker(p: ThreadSignalPtr) {.thread.} = - ProcessState.ignoreStopSignalsInThread() + doAssert ProcessState.ignoreStopSignalsInThread() let stop = p.wait() work = threadWork() @@ -208,21 +201,23 @@ when isMainModule: # Test case # set the same flag as `waitStopSignals` does. ProcessState.setupStopHandlers() - let stop = ProcessState.waitStopSignals() + # Wait for a stop signal - this can be either the user pressing ctrl-c or + # an out-of-band notification via kill/windows service command / some rest + # API etc + waitFor ProcessState.waitStopSignals() - ProcessState.pollUntilStopped() + # Notify the thread should stop itself as well using a ThreadSignalPtr + # rather than an OS signal waitFor stopper.fire() - waitFor stop.cancelAndWait() - workerThread.joinThread() + # Now let's reset and try the sync API ProcessState.notifyRunning() - - # The async waiting has finished - let's try the sync waiting ProcessState.scheduleStop("done") - # poll for 10s, this should be enough + # poll for 10s, this should be enough even on platforms with async signal + # delivery (like windows, presumably?) for i in 0 ..< 100: if ProcessState.stopping(): break diff --git a/tests/test_keymanager_api.nim b/tests/test_keymanager_api.nim index 6844d6bd9e..d14d25a0b0 100644 --- a/tests/test_keymanager_api.nim +++ b/tests/test_keymanager_api.nim @@ -2031,7 +2031,8 @@ proc delayedTests(basePort: int, pool: ref ValidatorPool, # validatorPool: pool, # keymanagerHost: host) - ProcessState.pollUntilStopped() + while not ProcessState.running: + await sleepAsync(1.seconds) # asyncSpawn startValidatorClient(basePort) From 21815629fcf9a75e709f1f519a60f46b5be23883 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Fri, 22 Aug 2025 07:39:37 +0200 Subject: [PATCH 05/13] logging --- beacon_chain/process_state.nim | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/beacon_chain/process_state.nim b/beacon_chain/process_state.nim index 7187f1fe81..ec9fab1d98 100644 --- a/beacon_chain/process_state.nim +++ b/beacon_chain/process_state.nim @@ -78,6 +78,9 @@ proc scheduleStop*(_: type ProcessState, source: cstring) = ## can be used from non-nim threads as well. var nilptr: pointer discard shutdownSource.compareExchange(nilptr, source) + + c_printf("XXXXXX: schedule %d\n") + raiseStopSignal() proc notifyRunning*(_: type ProcessState) = @@ -105,6 +108,7 @@ proc setupStopHandlers*(_: type ProcessState) = cstring("SIGINT") else: cstring("SIGTERM") + c_printf("XXXXXX: handler %d\n") var nilptr: pointer discard shutdownSource.compareExchange(nilptr, sourceName) @@ -180,7 +184,6 @@ when isMainModule: # Test case raiseAssert "Should not reach here, ie stopping the thread should not take 10s" proc worker(p: ThreadSignalPtr) {.thread.} = - doAssert ProcessState.ignoreStopSignalsInThread() let stop = p.wait() work = threadWork() From 0c68a5f3fa2f893b23178cbcd12786a860daf7c2 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Fri, 22 Aug 2025 10:40:24 +0200 Subject: [PATCH 06/13] raise chronos signal also --- beacon_chain/process_state.nim | 2 ++ 1 file changed, 2 insertions(+) diff --git a/beacon_chain/process_state.nim b/beacon_chain/process_state.nim index ec9fab1d98..1b80044e7e 100644 --- a/beacon_chain/process_state.nim +++ b/beacon_chain/process_state.nim @@ -72,6 +72,8 @@ else: proc raiseStopSignal() = discard c_raise(ansi_c.SIGINT) + # Chronos installs its own handlers that are incompatible with `raise`: + discard os.raiseSignal(chronos.SIGINT) proc scheduleStop*(_: type ProcessState, source: cstring) = ## Schedule that the process should stop in a thread-safe way. This function From 16c9e9d815ed1bf36598d0838d0ba703db05d142 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Fri, 22 Aug 2025 10:50:12 +0200 Subject: [PATCH 07/13] raise instead of pthread_kill --- beacon_chain/process_state.nim | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/beacon_chain/process_state.nim b/beacon_chain/process_state.nim index 1b80044e7e..03e7ab7189 100644 --- a/beacon_chain/process_state.nim +++ b/beacon_chain/process_state.nim @@ -42,8 +42,6 @@ import system/ansi_c when defined(posix): import posix - var signalTarget = pthread_self() - proc ignoreStopSignalsInThread*(_: type ProcessState): bool = # Block stop signals in the calling thread - this can be used to avoid # having certain threads be interrupted by process-directed signals @@ -63,8 +61,7 @@ when defined(posix): true proc raiseStopSignal() = - # Main thread that is monitoring the signals... - discard pthread_kill(signalTarget, posix.SIGTERM) + c_raise(posix.SIGTERM) else: proc ignoreStopSignalsInThread*(_: type ProcessState): bool = From ea7707e0ea98ecc2df3d6d117d6e7c0eea008bc8 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Fri, 22 Aug 2025 13:03:15 +0200 Subject: [PATCH 08/13] oops --- beacon_chain/process_state.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_chain/process_state.nim b/beacon_chain/process_state.nim index 03e7ab7189..3ffd602e33 100644 --- a/beacon_chain/process_state.nim +++ b/beacon_chain/process_state.nim @@ -61,7 +61,7 @@ when defined(posix): true proc raiseStopSignal() = - c_raise(posix.SIGTERM) + discard c_raise(posix.SIGTERM) else: proc ignoreStopSignalsInThread*(_: type ProcessState): bool = From 156b830053ee91eee65a251f8b433ef1aef0cb6f Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Fri, 22 Aug 2025 13:38:25 +0200 Subject: [PATCH 09/13] more oops --- beacon_chain/process_state.nim | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/beacon_chain/process_state.nim b/beacon_chain/process_state.nim index 3ffd602e33..812e845626 100644 --- a/beacon_chain/process_state.nim +++ b/beacon_chain/process_state.nim @@ -67,10 +67,13 @@ else: proc ignoreStopSignalsInThread*(_: type ProcessState): bool = true + import chronos/osutils + proc raiseStopSignal() = discard c_raise(ansi_c.SIGINT) - # Chronos installs its own handlers that are incompatible with `raise`: - discard os.raiseSignal(chronos.SIGINT) + # Chronos installs its own handlers that are incompatible with `raise` - + # when waitSignal is running we must also notify chronos + discard osutils.raiseSignal(chronos.SIGINT) proc scheduleStop*(_: type ProcessState, source: cstring) = ## Schedule that the process should stop in a thread-safe way. This function From ecf94d4e3b925d4c3a4ddb043d39ab388222aebc Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Fri, 22 Aug 2025 15:23:24 +0200 Subject: [PATCH 10/13] kqueue --- beacon_chain/process_state.nim | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/beacon_chain/process_state.nim b/beacon_chain/process_state.nim index 812e845626..715f85affd 100644 --- a/beacon_chain/process_state.nim +++ b/beacon_chain/process_state.nim @@ -42,6 +42,10 @@ import system/ansi_c when defined(posix): import posix + + when defined(linux): + var signalTarget = pthread_self() + proc ignoreStopSignalsInThread*(_: type ProcessState): bool = # Block stop signals in the calling thread - this can be used to avoid # having certain threads be interrupted by process-directed signals @@ -61,7 +65,15 @@ when defined(posix): true proc raiseStopSignal() = - discard c_raise(posix.SIGTERM) + when defined(linux): + # On linux, we want to direct the signal to the thread that is currently + # listening on `waitSignal` - when there's no such thread, it doesn't + # really matter which thread it goes to + discard pthread_kill(signalTarget, posix.SIGTERM) + else: + # kqueue listens only to process-directed signals - for waitSignal to + # work as expected, we use kill + discard kill(getpid(), posix.SIGTERM) else: proc ignoreStopSignalsInThread*(_: type ProcessState): bool = @@ -81,8 +93,6 @@ proc scheduleStop*(_: type ProcessState, source: cstring) = var nilptr: pointer discard shutdownSource.compareExchange(nilptr, source) - c_printf("XXXXXX: schedule %d\n") - raiseStopSignal() proc notifyRunning*(_: type ProcessState) = @@ -137,13 +147,15 @@ proc waitStopSignals*(_: type ProcessState) {.async: (raises: [CancelledError]). ## This approach ensures that the event loop wakes up on signal delivery ## unlike `setupStopHandlers` which merely sets a flag that must be polled. ## - ## Make sure to call `ignoreStopSignalsInThread` + ## Only one thread should ever listen for stop signals this way. let sigint = waitSignal(chronos.SIGINT) sigterm = waitSignal(chronos.SIGTERM) debug "Waiting for signal", chroniclesThreadIds = true + when defined(linux): + signalTarget = pthread_self() try: discard await race(sigint, sigterm) From 0d858612181b85b30137fcac54965b98dfb1062f Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Fri, 22 Aug 2025 15:47:42 +0200 Subject: [PATCH 11/13] more hacks --- beacon_chain/process_state.nim | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/beacon_chain/process_state.nim b/beacon_chain/process_state.nim index 715f85affd..7db179a7f2 100644 --- a/beacon_chain/process_state.nim +++ b/beacon_chain/process_state.nim @@ -47,24 +47,21 @@ when defined(posix): var signalTarget = pthread_self() proc ignoreStopSignalsInThread*(_: type ProcessState): bool = - # Block stop signals in the calling thread - this can be used to avoid - # having certain threads be interrupted by process-directed signals + # Block signals in the current thread and all threads created from it + # (hopefully) var signalMask, oldSignalMask: Sigset - if sigemptyset(signalMask) != 0: - return false - - if sigaddset(signalMask, posix.SIGINT) != 0: - return false - if sigaddset(signalMask, posix.SIGTERM) != 0: - return false - - if pthread_sigmask(SIG_BLOCK, signalMask, oldSignalMask) != 0: - return false - - true + sigemptyset(signalMask) == 0 and sigaddset(signalMask, posix.SIGINT) == 0 and + sigaddset(signalMask, posix.SIGTERM) == 0 and + pthread_sigmask(SIG_BLOCK, signalMask, oldSignalMask) == 0 proc raiseStopSignal() = + # If the default signal handler is blocked and the app is polling, we still + # want the state updated - blocking signals on all threads is necessary for + # waitSignal to work, but the application might want to check _before_ the + # signal handler is invoked. + processState.store(ProcessState.Stopping) + when defined(linux): # On linux, we want to direct the signal to the thread that is currently # listening on `waitSignal` - when there's no such thread, it doesn't @@ -120,7 +117,6 @@ proc setupStopHandlers*(_: type ProcessState) = cstring("SIGINT") else: cstring("SIGTERM") - c_printf("XXXXXX: handler %d\n") var nilptr: pointer discard shutdownSource.compareExchange(nilptr, sourceName) From 341495e60db408df41fac8b5e45834f558fdec3d Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Fri, 22 Aug 2025 20:33:38 +0200 Subject: [PATCH 12/13] test --- tests/test_keymanager_api.nim | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/test_keymanager_api.nim b/tests/test_keymanager_api.nim index d14d25a0b0..74553c9192 100644 --- a/tests/test_keymanager_api.nim +++ b/tests/test_keymanager_api.nim @@ -2049,6 +2049,9 @@ proc delayedTests(basePort: int, pool: ref ValidatorPool, ProcessState.scheduleStop("stop") proc main(basePort: int) {.async.} = + # Overwrite the standard nim stop handlers + ProcessState.setupStopHandlers() + if dirExists(dataDir): os.removeDir dataDir From 7d5f45ccc8e9767667d5f94b34d8c06b7a914d66 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Sat, 23 Aug 2025 09:18:48 +0200 Subject: [PATCH 13/13] restore signal state after waitSignal --- beacon_chain/process_state.nim | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/beacon_chain/process_state.nim b/beacon_chain/process_state.nim index 7db179a7f2..5718da0416 100644 --- a/beacon_chain/process_state.nim +++ b/beacon_chain/process_state.nim @@ -145,6 +145,10 @@ proc waitStopSignals*(_: type ProcessState) {.async: (raises: [CancelledError]). ## ## Only one thread should ever listen for stop signals this way. + # Ensure other threads don't cause a crash, in case the application did not + # already call it + ProcessState.setupStopHandlers() + let sigint = waitSignal(chronos.SIGINT) sigterm = waitSignal(chronos.SIGTERM) @@ -164,10 +168,15 @@ proc waitStopSignals*(_: type ProcessState) {.async: (raises: [CancelledError]). processState.store(ProcessState.Stopping, moRelaxed) finally: + # waitSignal sometimes overwrites signal handlers: + # https://github.com/status-im/nim-chronos/issues/581 + ProcessState.setupStopHandlers() + # Might be finished already, which is fine.. await noCancel sigint.cancelAndWait() await noCancel sigterm.cancelAndWait() + proc running*(_: type ProcessState): bool = processState.load(moRelaxed) == ProcessState.Running