diff --git a/Makefile b/Makefile index 119f073542..ed5c8a330d 100644 --- a/Makefile +++ b/Makefile @@ -299,7 +299,8 @@ XML_TEST_BINARIES := \ # test suite TEST_BINARIES := \ block_sim \ - test_libnimbus_lc + test_libnimbus_lc \ + process_state .PHONY: $(TEST_BINARIES) $(XML_TEST_BINARIES) force_build_alone_all_tests # Preset-dependent tests @@ -384,6 +385,14 @@ block_sim: | build deps $(NIM_PARAMS) && \ echo -e $(BUILD_END_MSG) "build/$@" +process_state: | build deps + + echo -e $(BUILD_MSG) "build/$@" && \ + MAKE="$(MAKE)" V="$(V)" $(ENV_SCRIPT) scripts/compile_nim_program.sh \ + $@ \ + "beacon_chain/$@.nim" \ + $(NIM_PARAMS) && \ + echo -e $(BUILD_END_MSG) "build/$@" + DISABLE_TEST_FIXTURES_SCRIPT := 0 # This parameter passing scheme is ugly, but short. test: | $(XML_TEST_BINARIES) $(TEST_BINARIES) diff --git a/beacon_chain/beacon_node_status.nim b/beacon_chain/beacon_node_status.nim deleted file mode 100644 index 8abb5c575d..0000000000 --- a/beacon_chain/beacon_node_status.nim +++ /dev/null @@ -1,18 +0,0 @@ -# beacon_chain -# Copyright (c) 2018-2024 Status Research & Development GmbH -# Licensed and distributed under either of -# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). -# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). -# at your option. This file may not be copied, modified, or distributed except according to those terms. 
- -{.push raises: [].} - -type - # "state" is already taken by BeaconState - BeaconNodeStatus* = enum - Starting - Running - Stopping - -# this needs to be global, so it can be set in the Ctrl+C signal handler -var bnStatus* = BeaconNodeStatus.Starting diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 68d7a52139..efd3a7a8ff 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -23,9 +23,9 @@ import engine_authentication, weak_subjectivity, peerdas_helpers], ./sync/[sync_protocol, light_client_protocol, sync_overseer], ./validators/[keystore_management, beacon_validators], - "."/[ + ./[ beacon_node, beacon_node_light_client, deposits, - nimbus_binary_common, statusbar, trusted_node_sync, wallets] + nimbus_binary_common, process_state, statusbar, trusted_node_sync, wallets] when defined(posix): import system/ansi_c @@ -395,7 +395,7 @@ proc initFullNode( proc eventWaiter(): Future[void] {.async: (raises: [CancelledError]).} = await node.shutdownEvent.wait() - bnStatus = BeaconNodeStatus.Stopping + ProcessState.scheduleStop("shutdownEvent") asyncSpawn eventWaiter() @@ -741,14 +741,14 @@ proc init*(T: type BeaconNode, try: if config.numThreads < 0: fatal "The number of threads --num-threads cannot be negative." 
- quit 1 + quit QuitFailure elif config.numThreads == 0: Taskpool.new(numThreads = min(countProcessors(), 16)) else: Taskpool.new(numThreads = config.numThreads) except CatchableError as e: fatal "Cannot start taskpool", err = e.msg - quit 1 + quit QuitFailure info "Threadpool started", numThreads = taskpool.numThreads @@ -1931,7 +1931,7 @@ proc onSecond(node: BeaconNode, time: Moment) = if node.config.stopAtSyncedEpoch != 0 and node.dag.head.slot.epoch >= node.config.stopAtSyncedEpoch: notice "Shutting down after having reached the target synced epoch" - bnStatus = BeaconNodeStatus.Stopping + ProcessState.scheduleStop("stopAtSyncedEpoch") proc runOnSecondLoop(node: BeaconNode) {.async.} = const @@ -2159,8 +2159,6 @@ proc installMessageValidators(node: BeaconNode) = node.installLightClientMessageValidators() proc stop(node: BeaconNode) = - bnStatus = BeaconNodeStatus.Stopping - notice "Graceful shutdown" if not node.config.inProcessValidators: try: node.vcProcess.close() @@ -2179,7 +2177,7 @@ proc stop(node: BeaconNode) = notice "Databases closed" proc run(node: BeaconNode) {.raises: [CatchableError].} = - bnStatus = BeaconNodeStatus.Running + ProcessState.notifyRunning() if not isNil(node.restServer): node.restServer.installRestHandlers(node) @@ -2217,9 +2215,7 @@ proc run(node: BeaconNode) {.raises: [CatchableError].} = asyncSpawn runQueueProcessingLoop(node.blockProcessor) asyncSpawn runKeystoreCachePruningLoop(node.keystoreCache) - # main event loop - while bnStatus == BeaconNodeStatus.Running: - poll() # if poll fails, the network is broken + waitFor ProcessState.waitStopSignals() # time to say goodbye node.stop() @@ -2437,8 +2433,6 @@ proc doRunBeaconNode(config: var BeaconNodeConf, rng: ref HmacDrbgContext) {.rai ignoreDeprecatedOption web3ForcePolling ignoreDeprecatedOption finalizedDepositTreeSnapshot - createPidFile(config.dataDir.string / "beacon_node.pid") - config.createDumpDirs() # There are no managed event loops in here, to do a graceful shutdown, 
but @@ -2451,27 +2445,6 @@ proc doRunBeaconNode(config: var BeaconNodeConf, rng: ref HmacDrbgContext) {.rai for node in metadata.bootstrapNodes: config.bootstrapNodes.add node - ## Ctrl+C handling - proc controlCHandler() {.noconv.} = - when defined(windows): - # workaround for https://github.com/nim-lang/Nim/issues/4057 - try: - setupForeignThreadGc() - except Exception as exc: raiseAssert exc.msg # shouldn't happen - notice "Shutting down after having received SIGINT" - bnStatus = BeaconNodeStatus.Stopping - try: - setControlCHook(controlCHandler) - except Exception as exc: # TODO Exception - warn "Cannot set ctrl-c handler", msg = exc.msg - - # equivalent SIGTERM handler - when defined(posix): - proc SIGTERMHandler(signal: cint) {.noconv.} = - notice "Shutting down after having received SIGTERM" - bnStatus = BeaconNodeStatus.Stopping - c_signal(ansi_c.SIGTERM, SIGTERMHandler) - block: let res = if config.trustedSetupFile.isNone: @@ -2481,6 +2454,9 @@ proc doRunBeaconNode(config: var BeaconNodeConf, rng: ref HmacDrbgContext) {.rai if res.isErr(): raiseAssert res.error() + if ProcessState.stopping(): + return + let node = waitFor BeaconNode.init(rng, config, metadata) let metricsServer = (waitFor config.initMetricsServer()).valueOr: @@ -2492,7 +2468,7 @@ proc doRunBeaconNode(config: var BeaconNodeConf, rng: ref HmacDrbgContext) {.rai node.metricsServer = metricsServer - if bnStatus == BeaconNodeStatus.Stopping: + if ProcessState.stopping(): return when not defined(windows): @@ -2599,7 +2575,11 @@ proc handleStartUpCmd(config: var BeaconNodeConf) {.raises: [CatchableError].} = let rng = HmacDrbgContext.new() case config.cmd - of BNStartUpCmd.noCommand: doRunBeaconNode(config, rng) + of BNStartUpCmd.noCommand: + createPidFile(config.dataDir.string / "beacon_node.pid") + ProcessState.setupStopHandlers() + + doRunBeaconNode(config, rng) of BNStartUpCmd.deposits: doDeposits(config, rng[]) of BNStartUpCmd.wallets: doWallets(config, rng[]) of BNStartUpCmd.record: 
doRecord(config, rng[]) @@ -2665,7 +2645,7 @@ proc main() {.noinline, raises: [CatchableError].} = when defined(windows): if config.runAsService: proc exitService() = - bnStatus = BeaconNodeStatus.Stopping + ProcessState.scheduleStop("exitService") establishWindowsService(clientId, copyrights, nimBanner, SPEC_VERSION, "nimbus_beacon_node", BeaconNodeConf, handleStartUpCmd, exitService) diff --git a/beacon_chain/nimbus_binary_common.nim b/beacon_chain/nimbus_binary_common.nim index 989fde9cc2..e28599f9d9 100644 --- a/beacon_chain/nimbus_binary_common.nim +++ b/beacon_chain/nimbus_binary_common.nim @@ -21,7 +21,7 @@ import # Local modules ./spec/[helpers, keystore], ./spec/datatypes/base, - ./[beacon_clock, beacon_node_status, conf, version] + ./[beacon_clock, conf, process_state, version] when defaultChroniclesStream.outputs.type.arity == 2: from std/os import commandLineParams, getEnv, splitFile @@ -41,7 +41,7 @@ declareGauge nimVersionGauge, "Nim version info", ["version", "nim_commit"], nam nimVersionGauge.set(1, labelValues=[NimVersion, getNimGitHash()]) export - confutils, toml_serialization, beacon_clock, beacon_node_status, conf + confutils, toml_serialization, beacon_clock, conf type @@ -296,7 +296,7 @@ proc runSlotLoop*[T](node: T, startTime: BeaconTime, fatal "System time adjusted backwards significantly - clock may be inaccurate - shutting down", nextSlot = shortLog(nextSlot), wallSlot = shortLog(wallSlot) - bnStatus = BeaconNodeStatus.Stopping + ProcessState.scheduleStop("clock skew") return # Time moved back by a single slot - this could be a minor adjustment, diff --git a/beacon_chain/process_state.nim b/beacon_chain/process_state.nim new file mode 100644 index 0000000000..5718da0416 --- /dev/null +++ b/beacon_chain/process_state.nim @@ -0,0 +1,250 @@ +# beacon_chain +# Copyright (c) 2025 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at 
https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +## Process state helper using a global variable to coordinate multithreaded +## shutdown in the presence of C signals +## +## The high-level idea is the following: +## +## * The main thread monitors OS signals using either `signal` or `waitSignal` +## * All other threads block signal handling using `ignoreStopSignalsInThread` +## * When the main thread launches another thread, it passes a "stop event" to +## the thread - this can be a chronos ThreadSignalPtr, a condvar/lock or any +## other cross-thread "wake-up" mechanism that can tell the thread that it's +## time to go +## * When a signal is activated, a global flag is set indicating that the +## polling loop of the main thread should stop +## * The main thread wakes up any threads it started and notifies them of the +## imminent shutdown then waits for them to terminate +## +## In this way, the main thread is notified that _some_ thread or the user wants +## the process to shut down. The main thread stops whatever it's doing and +## notifies all threads it started that shutdown is imminent and then proceeds +## with the shutdown. 
+ +{.push raises: [].} + +import std/atomics, chronos, chronos/threadsync, chronicles + +type ProcessState* {.pure.} = enum + Starting + Running + Stopping + +var processState: Atomic[ProcessState] +var shutdownSource: Atomic[pointer] + +import system/ansi_c + +when defined(posix): + import posix + + when defined(linux): + var signalTarget = pthread_self() + + proc ignoreStopSignalsInThread*(_: type ProcessState): bool = + # Block signals in the current thread and all threads created from it + # (hopefully) + var signalMask, oldSignalMask: Sigset + + sigemptyset(signalMask) == 0 and sigaddset(signalMask, posix.SIGINT) == 0 and + sigaddset(signalMask, posix.SIGTERM) == 0 and + pthread_sigmask(SIG_BLOCK, signalMask, oldSignalMask) == 0 + + proc raiseStopSignal() = + # If the default signal handler is blocked and the app is polling, we still + # want the state updated - blocking signals on all threads is necessary for + # waitSignal to work, but the application might want to check _before_ the + # signal handler is invoked. + processState.store(ProcessState.Stopping) + + when defined(linux): + # On linux, we want to direct the signal to the thread that is currently + # listening on `waitSignal` - when there's no such thread, it doesn't + # really matter which thread it goes to + discard pthread_kill(signalTarget, posix.SIGTERM) + else: + # kqueue listens only to process-directed signals - for waitSignal to + # work as expected, we use kill + discard kill(getpid(), posix.SIGTERM) + +else: + proc ignoreStopSignalsInThread*(_: type ProcessState): bool = + true + + import chronos/osutils + + proc raiseStopSignal() = + discard c_raise(ansi_c.SIGINT) + # Chronos installs its own handlers that are incompatible with `raise` - + # when waitSignal is running we must also notify chronos + discard osutils.raiseSignal(chronos.SIGINT) + +proc scheduleStop*(_: type ProcessState, source: cstring) = + ## Schedule that the process should stop in a thread-safe way. 
This function + ## can be used from non-nim threads as well. + var nilptr: pointer + discard shutdownSource.compareExchange(nilptr, source) + + raiseStopSignal() + +proc notifyRunning*(_: type ProcessState) = + processState.store(ProcessState.Running, moRelaxed) + +proc setupStopHandlers*(_: type ProcessState) = + ## Install signal handlers for SIGINT/SIGTERM such that the application + ## updates `processState` on CTRL-C and similar, allowing it to gracefully + ## shut down by monitoring `ProcessState.running` at regular intervals. + ## + ## `async` applications should prefer to use + ## `await ProcessState.waitStopSignals()` since the CTRL-C handling provided + ## by `signal` does not wake the async polling loop and can therefore get + ## stuck if no events are happening. + ## + ## This function should be called early on from the main thread to prevent the + ## default Nim signal handlers from being used as these will crash or close + ## the application. + + proc controlCHandler(a: cint) {.noconv.} = + # Cannot log in here because that would imply memory allocations and system + # calls + let sourceName = + if a == ansi_c.SIGINT: + cstring("SIGINT") + else: + cstring("SIGTERM") + + var nilptr: pointer + discard shutdownSource.compareExchange(nilptr, sourceName) + # Should also provide synchronization for the shutdownSource write.. + processState.store(Stopping) + + # Nim sets signal handlers using `c_signal`, but unfortunately these are broken + # since they perform memory allocations and call unsafe system functions: + # https://github.com/nim-lang/Nim/blob/c6352ce0ab5fef061b43c8ca960ff7728541b30b/lib/system/excpt.nim#L622 + + # Avoid using `setControlCHook` since it has an exception effect + c_signal(ansi_c.SIGINT, controlCHandler) + + # equivalent SIGTERM handler - this is only set on posix systems since on + # windows, SIGTERM is not generated - however, chronos may generate them so + # below, in the chronos version, we do monitor it on all platforms. 
+ # https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/signal?view=msvc-170 + when defined(posix): + c_signal(ansi_c.SIGTERM, controlCHandler) + +proc waitStopSignals*(_: type ProcessState) {.async: (raises: [CancelledError]).} = + ## Monitor stop signals via chronos' event loop, masking other handlers. + ## + ## This approach ensures that the event loop wakes up on signal delivery + ## unlike `setupStopHandlers` which merely sets a flag that must be polled. + ## + ## Only one thread should ever listen for stop signals this way. + + # Ensure other threads don't cause a crash, in case the application did not + # already call it + ProcessState.setupStopHandlers() + + let + sigint = waitSignal(chronos.SIGINT) + sigterm = waitSignal(chronos.SIGTERM) + + debug "Waiting for signal", chroniclesThreadIds = true + when defined(linux): + signalTarget = pthread_self() + + try: + discard await race(sigint, sigterm) + + var source = cast[cstring](shutdownSource.load()) + if source == nil: + source = "Unknown" + + notice "Shutting down", chroniclesThreadIds = true, source + + processState.store(ProcessState.Stopping, moRelaxed) + finally: + # waitSignal sometimes overwrites signal handlers: + # https://github.com/status-im/nim-chronos/issues/581 + ProcessState.setupStopHandlers() + + # Might be finished already, which is fine.. + await noCancel sigint.cancelAndWait() + await noCancel sigterm.cancelAndWait() + + +proc running*(_: type ProcessState): bool = + processState.load(moRelaxed) == ProcessState.Running + +proc stopping*(_: type ProcessState): bool = + processState.load(moRelaxed) == ProcessState.Stopping + +when isMainModule: # Test case + import os + + proc threadWork() {.async.} = + var todo = 2 + while todo > 0: + echo "Terminating in ", todo + + await sleepAsync(1.seconds) + todo -= 1 + + # Sends signal from non-main thread + ProcessState.scheduleStop("thread") + + echo "Waiting for the end... 
" + await sleepAsync(10.seconds) + + raiseAssert "Should not reach here, ie stopping the thread should not take 10s" + + proc worker(p: ThreadSignalPtr) {.thread.} = + let + stop = p.wait() + work = threadWork() + discard waitFor noCancel race(stop, work) + + waitFor noCancel stop.cancelAndWait() + waitFor noCancel work.cancelAndWait() + + proc main() {.raises: [CatchableError].} = + let stopper = ThreadSignalPtr.new().expect("working thread signal") + + var workerThread: Thread[ThreadSignalPtr] + createThread(workerThread, worker, stopper) + + # Setup sync stop handlers - these are used whenever `waitSignal` is not + # used - whenever a `waitSignals` future is active, these signals should be + # masked - even if they are not masked, they are harmless in that they + # set the same flag as `waitStopSignals` does. + ProcessState.setupStopHandlers() + + # Wait for a stop signal - this can be either the user pressing ctrl-c or + # an out-of-band notification via kill/windows service command / some rest + # API etc + waitFor ProcessState.waitStopSignals() + + # Notify the thread should stop itself as well using a ThreadSignalPtr + # rather than an OS signal + waitFor stopper.fire() + + workerThread.joinThread() + + # Now let's reset and try the sync API + ProcessState.notifyRunning() + ProcessState.scheduleStop("done") + + # poll for 10s, this should be enough even on platforms with async signal + # delivery (like windows, presumably?) 
+ for i in 0 ..< 100: + if ProcessState.stopping(): + break + os.sleep(100) + + doAssert ProcessState.stopping() + + main() diff --git a/tests/test_keymanager_api.nim b/tests/test_keymanager_api.nim index 5a54c26e4f..74553c9192 100644 --- a/tests/test_keymanager_api.nim +++ b/tests/test_keymanager_api.nim @@ -26,7 +26,7 @@ import [keystore_management, slashing_protection_common, validator_pool], ../beacon_chain/networking/network_metadata, ../beacon_chain/rpc/rest_key_management_api, - ../beacon_chain/[conf, filepath, beacon_node, nimbus_beacon_node, beacon_node_status], + ../beacon_chain/[conf, filepath, beacon_node, nimbus_beacon_node, process_state], ../beacon_chain/validator_client/common, ../ncli/ncli_testnet, ./testutil @@ -2031,7 +2031,7 @@ proc delayedTests(basePort: int, pool: ref ValidatorPool, # validatorPool: pool, # keymanagerHost: host) - while bnStatus != BeaconNodeStatus.Running: + while not ProcessState.running: await sleepAsync(1.seconds) # asyncSpawn startValidatorClient(basePort) @@ -2046,9 +2046,12 @@ proc delayedTests(basePort: int, pool: ref ValidatorPool, # Re-enable it in a follow-up PR # await runTests(validatorClientKeymanager) - bnStatus = BeaconNodeStatus.Stopping + ProcessState.scheduleStop("stop") proc main(basePort: int) {.async.} = + # Overwrite the standard nim stop handlers + ProcessState.setupStopHandlers() + if dirExists(dataDir): os.removeDir dataDir