Skip to content

Commit

Permalink
Separate node shutdown and node shutdown at a particular slot concerns
Browse files Browse the repository at this point in the history
  • Loading branch information
Jimbo4350 committed Nov 30, 2021
1 parent 54119c8 commit bfd8ca3
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 185 deletions.
@@ -1,31 +1,29 @@
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Cardano.Benchmarking.GeneratorTx.LocalProtocolDefinition
( CliError (..)
, runBenchmarkScriptWith
, startProtocol
) where

import Prelude (error, show)
import Paths_tx_generator (version)
import Prelude (error, show)

import Data.Version (showVersion)
import Data.Text (pack)
import Data.Version (showVersion)

import Cardano.Prelude hiding (TypeError, show)
import Control.Monad.Trans.Except.Extra (firstExceptT)

import Ouroboros.Consensus.Config
( configBlock, configCodec)
import Ouroboros.Consensus.Config.SupportsNode
(ConfigSupportsNode(..), getNetworkMagic)
import Ouroboros.Consensus.Config (configBlock, configCodec)
import Ouroboros.Consensus.Config.SupportsNode (ConfigSupportsNode (..), getNetworkMagic)
import Ouroboros.Network.Block (MaxSlotNo (..))
import Ouroboros.Network.NodeToClient (IOManager)
import Ouroboros.Network.Block (MaxSlotNo(..))

import Cardano.Api

Expand All @@ -41,7 +39,8 @@ import Cardano.Benchmarking.DSL
import Cardano.Benchmarking.Tracer

import Cardano.Benchmarking.GeneratorTx.NodeToNode
import Cardano.Benchmarking.OuroborosImports (getGenesis, protocolToTopLevelConfig, protocolToNetworkId)
import Cardano.Benchmarking.OuroborosImports (getGenesis, protocolToNetworkId,
protocolToTopLevelConfig)

import qualified Cardano.Benchmarking.GeneratorTx as GeneratorTx
import qualified Cardano.Benchmarking.GeneratorTx.Tx as GeneratorTx
Expand Down Expand Up @@ -127,7 +126,7 @@ startProtocol logConfigFile = do
NodeProtocolConfigurationCardano byronConfig shelleyConfig alonzoConfig hardforkConfig -> do
ptcl :: SomeConsensusProtocol <- firstExceptT (ProtocolInstantiationError . pack . show) $
mkSomeConsensusProtocolCardano byronConfig shelleyConfig alonzoConfig hardforkConfig Nothing

loggingLayer <- mkLoggingLayer nc ptcl
return (loggingLayer, ptcl)
where
Expand All @@ -150,7 +149,7 @@ startProtocol logConfigFile = do
, shelleyBulkCredsFile = Just ""
}
, pncValidateDB = Last $ Just False
, pncShutdownIPC = Last $ Just Nothing
, pncShutdownIPC = Last Nothing
, pncShutdownOnSlotSynced = Last $ Just NoMaxSlotNo
, pncConfigFile = Last $ Just configFp
}
Expand Down
1 change: 0 additions & 1 deletion cardano-node/cardano-node.cabal
Expand Up @@ -138,7 +138,6 @@ library
, ouroboros-consensus-shelley
, ouroboros-network
, ouroboros-network-framework
, process
, psqueues
, safe-exceptions
, scientific
Expand Down
8 changes: 4 additions & 4 deletions cardano-node/src/Cardano/Node/Configuration/POM.hs
Expand Up @@ -41,7 +41,8 @@ import Cardano.Crypto (RequiresNetworkMagic (..))
import Cardano.Node.Protocol.Types (Protocol (..))
import Cardano.Node.Types
import Cardano.Tracing.Config
import Ouroboros.Consensus.Mempool.API (MempoolCapacityBytesOverride (..), MempoolCapacityBytes (..))
import Ouroboros.Consensus.Mempool.API (MempoolCapacityBytes (..),
MempoolCapacityBytesOverride (..))
import Ouroboros.Consensus.Storage.LedgerDB.DiskPolicy (SnapshotInterval (..))
import Ouroboros.Network.Block (MaxSlotNo (..))
import Ouroboros.Network.NodeToNode (DiffusionMode (..))
Expand Down Expand Up @@ -151,7 +152,7 @@ data PartialNodeConfiguration
, pncDatabaseFile :: !(Last DbFile)
, pncProtocolFiles :: !(Last ProtocolFilepaths)
, pncValidateDB :: !(Last Bool)
, pncShutdownIPC :: !(Last (Maybe Fd))
, pncShutdownIPC :: !(Last Fd)
, pncShutdownOnSlotSynced :: !(Last MaxSlotNo)

-- Protocol-specific parameters:
Expand Down Expand Up @@ -440,7 +441,6 @@ makeNodeConfiguration pnc = do
databaseFile <- lastToEither "Missing DatabaseFile" $ pncDatabaseFile pnc
protocolFiles <- lastToEither "Missing ProtocolFiles" $ pncProtocolFiles pnc
validateDB <- lastToEither "Missing ValidateDB" $ pncValidateDB pnc
shutdownIPC <- lastToEither "Missing ShutdownIPC" $ pncShutdownIPC pnc
shutdownOnSlotSynced <- lastToEither "Missing ShutdownOnSlotSynced" $ pncShutdownOnSlotSynced pnc
protocolConfig <- lastToEither "Missing ProtocolConfig" $ pncProtocolConfig pnc
loggingSwitch <- lastToEither "Missing LoggingSwitch" $ pncLoggingSwitch pnc
Expand Down Expand Up @@ -483,7 +483,7 @@ makeNodeConfiguration pnc = do
, ncDatabaseFile = databaseFile
, ncProtocolFiles = protocolFiles
, ncValidateDB = validateDB
, ncShutdownIPC = shutdownIPC
, ncShutdownIPC = getLast $ pncShutdownIPC pnc
, ncShutdownOnSlotSynced = shutdownOnSlotSynced
, ncProtocolConfig = protocolConfig
, ncSocketPath = getLast $ pncSocketPath pnc
Expand Down
181 changes: 47 additions & 134 deletions cardano-node/src/Cardano/Node/Handlers/Shutdown.hs
Expand Up @@ -5,181 +5,94 @@
module Cardano.Node.Handlers.Shutdown
(
-- * Generalised shutdown handling
ShutdownFDs
, withShutdownHandling

-- * Requesting shutdown
, ShutdownDoorbell
, getShutdownDoorbell
, triggerShutdown
withShutdownHandling

-- * Watch ChainDB for passing a configured slot sync limit threshold,
-- translating it to a graceful shutdown.
, maybeSpawnOnSlotSyncedShutdownHandler
)
where

import Cardano.Prelude hiding (ByteString, atomically, take, trace)

import qualified Control.Concurrent.Async as Async
import Data.Text (pack)
import Prelude

import Control.Concurrent.Async (race_)
import Control.Exception
import Control.Monad
import Data.Text (Text, pack)
import qualified GHC.IO.Handle.FD as IO (fdToHandle)
import System.Exit
import qualified System.IO as IO
import qualified System.IO.Error as IO
import System.Posix.Types (Fd (Fd))
import qualified System.Process as IO (createPipeFd)

import Cardano.BM.Data.Tracer (TracingVerbosity (..), severityNotice, trTransformer)
import Cardano.BM.Trace
import Cardano.Slotting.Slot (WithOrigin (..))
import Control.Tracer
import qualified Ouroboros.Consensus.Storage.ChainDB as ChainDB
import Ouroboros.Consensus.Util.ResourceRegistry (ResourceRegistry)
import Ouroboros.Consensus.Util.STM (Watcher(..), forkLinkedWatcher)
import Ouroboros.Consensus.Util.STM (Watcher (..), forkLinkedWatcher)
import Ouroboros.Network.Block (MaxSlotNo (..), SlotNo, pointSlot)

import Cardano.Node.Configuration.POM (NodeConfiguration (..))

-- | 'ShutdownFDs' mediate the graceful shutdown requests,
-- either external or internal to the process.
--
-- In the external mediation case, the parent process passes us the file descriptor
-- number of the read end of a pipe, via the CLI with @--shutdown-ipc FD@.
-- In the internal mediation case, we create our own pipe.
-- In both cases we store the accessible ends in 'ShutdownFDs'.
--
-- In either case, if the write end gets closed, either deliberately:
-- - by internal call of 'triggerShutdown' on 'ShutdownFDs', or
-- - by parent process
-- ..or automatically, because the parent process itself terminated,
-- then we initiate a clean shutdown.
data ShutdownFDs
= NoShutdownFDs
| ExternalShutdown !ShutdownListener
-- ^ Shutdown to be provided by external process.
| InternalShutdown !ShutdownListener !ShutdownDoorbell
-- ^ Shutdown to be provided from within the process.

-- | FD used to send an EOF-based request for shutdown.
newtype ShutdownDoorbell = ShutdownDoorbell { _doorbellFd :: Fd }

-- | FD we're listening on for the EOF signalling the shutdown.
newtype ShutdownListener = ShutdownListener { _listenerFd :: Fd }

-- | Gracefully handle shutdown requests, if requested by 'ShutdownFDs'.
--
-- The file descriptor wrapped in a 'ShutdownListener' designates the
-- receiving end of the shutdown signalling communication channel.
-- The opposite end might be either internal or external to the node process.
withShutdownHandler :: Maybe ShutdownListener -> Trace IO Text -> IO () -> IO ()
withShutdownHandler listener trace action
| Just (ShutdownListener fd) <- listener =
Async.race_ (wrapUninterruptableIO $ waitForEOF fd) action
| otherwise = action
where
waitForEOF :: Fd -> IO ()
waitForEOF (Fd fd) = do
hnd <- IO.fdToHandle fd
r <- try $ IO.hGetChar hnd
case r of
Left e
| IO.isEOFError e -> traceWith tracer "received shutdown request"
| otherwise -> throwIO e

Right _ ->
throwIO $ IO.userError "--shutdown-ipc FD does not expect input"

tracer :: Tracer IO Text
tracer = trTransformer MaximalVerbosity (severityNotice trace)

sfdsListener :: ShutdownFDs -> Maybe ShutdownListener
sfdsListener = \case
ExternalShutdown r -> Just r
InternalShutdown r _w -> Just r
_ -> Nothing

-- | Windows blocking file IO calls like 'hGetChar' are not interruptable by
-- asynchronous exceptions, as used by async 'cancel' (as of base-4.12).
--
-- This wrapper works around that problem by running the blocking IO in a
-- separate thread. If the parent thread receives an async cancel then it
-- will return. Note however that in this circumstance the child thread may
-- continue and remain blocked, leading to a leak of the thread. As such this
-- is only reasonable to use a fixed number of times for the whole process.
--
wrapUninterruptableIO :: IO a -> IO a
wrapUninterruptableIO action = async action >>= wait

-- | If 'ShutdownFDs' supports internal shutdown requests,
-- return its shutdown doorbell.
getShutdownDoorbell :: ShutdownFDs -> Maybe ShutdownDoorbell
getShutdownDoorbell (InternalShutdown _l doorbell) = Just doorbell
getShutdownDoorbell _ = Nothing

-- | Given the 'ShutdownDoorbell' component of 'ShutdownFDs',
-- and an explanation of the reason, request a graceful shutdown.
triggerShutdown :: ShutdownDoorbell -> Trace IO Text -> Text -> IO ()
triggerShutdown (ShutdownDoorbell (Fd shutFd)) trace reason = do
traceWith (trTransformer MaximalVerbosity $ severityNotice trace)
("Ringing the node shutdown doorbell: " <> reason)
IO.hClose =<< IO.fdToHandle shutFd

-- | We provide an optional cross-platform method to politely request shut down.
--
-- For the duration of 'action', we gracefully handle shutdown requests,
-- external or internal, as requested by configuration in 'NodeCLI',
-- while allocating corresponding 'ShutdownFDs', and providing them to the 'action'.
-- The parent process passes us the file descriptor number of the read end of a pipe,
-- via the CLI with @--shutdown-ipc FD@
withShutdownHandling
:: NodeConfiguration
:: Maybe Fd
-> Trace IO Text
-> (ShutdownFDs -> IO ())
-> IO ()
withShutdownHandling nc trace action = do
sfds <- decideShutdownFds nc
withShutdownHandler (sfdsListener sfds) trace (action sfds)
-- ^ Action to potentially shutdown via file descriptor
-> IO ()
withShutdownHandling Nothing _ action = action
withShutdownHandling (Just fileDescriptor) trace action = do
race_ (waitForEOF fileDescriptor) action
where
decideShutdownFds :: NodeConfiguration -> IO ShutdownFDs
decideShutdownFds NodeConfiguration{ncShutdownIPC = Just fd} =
pure $ ExternalShutdown (ShutdownListener fd)
decideShutdownFds NodeConfiguration{ncShutdownOnSlotSynced = MaxSlotNo{}} =
mkInternalShutdown
decideShutdownFds _ = pure NoShutdownFDs

mkInternalShutdown :: IO ShutdownFDs
mkInternalShutdown = do
(r, w) <- IO.createPipeFd
pure $ InternalShutdown (ShutdownListener $ Fd r) (ShutdownDoorbell $ Fd w)

-- | If configuration in 'NodeCLI' and 'ShutdownFDs' agree,
-- spawn a thread that would cause node to shutdown upon ChainDB reaching the
tracer :: Tracer IO Text
tracer = trTransformer MaximalVerbosity (severityNotice trace)

waitForEOF :: Fd -> IO ()
waitForEOF (Fd fd) = do
hnd <- IO.fdToHandle fd
r <- try $ IO.hGetChar hnd
case r of
Left e
| IO.isEOFError e -> do
traceWith tracer "Received shutdown request and shutting node down..."
| otherwise -> do
traceWith tracer "Received shutdown request but did not encounter EOL in --shutdown-ipc FD"
throwIO e
Right inp -> do
traceWith tracer
$ "Received shutdown request but found unexpected input in --shutdown-ipc FD: " <> pack (show inp)

-- | Spawn a thread that would cause node to shutdown upon ChainDB reaching the
-- configuration-defined slot.
maybeSpawnOnSlotSyncedShutdownHandler
:: NodeConfiguration
-> ShutdownFDs
-> Trace IO Text
-> ResourceRegistry IO
-> ChainDB.ChainDB IO blk
-> IO ()
maybeSpawnOnSlotSyncedShutdownHandler nc sfds trace registry chaindb =
case (ncShutdownOnSlotSynced nc, sfds) of
(MaxSlotNo maxSlot, InternalShutdown _sl sd) -> do
maybeSpawnOnSlotSyncedShutdownHandler nc trace registry chaindb =
case ncShutdownOnSlotSynced nc of
NoMaxSlotNo -> return ()
MaxSlotNo maxSlot -> do
traceWith (trTransformer MaximalVerbosity $ severityNotice trace)
("will terminate upon reaching " <> pack (show maxSlot))
spawnSlotLimitTerminator maxSlot sd
(MaxSlotNo{}, _) -> panic
"internal error: slot-limited shutdown requested, but no proper ShutdownFDs passed."
_ -> pure ()
spawnSlotLimitTerminator maxSlot
where
spawnSlotLimitTerminator :: SlotNo -> ShutdownDoorbell -> IO ()
spawnSlotLimitTerminator maxSlot sd =
spawnSlotLimitTerminator :: SlotNo -> IO ()
spawnSlotLimitTerminator maxSlot =
void $ forkLinkedWatcher registry "slotLimitTerminator" Watcher {
wFingerprint = identity
wFingerprint = id
, wInitial = Nothing
, wNotify = \case
Origin -> pure ()
At cur -> when (cur >= maxSlot) $
triggerShutdown sd trace
("spawnSlotLimitTerminator: reached target " <> show cur)
At cur -> when (cur >= maxSlot) $ do
traceWith (trTransformer MaximalVerbosity $ severityNotice trace)
(("spawnSlotLimitTerminator: reached target " :: String) <> show cur)
throwIO ExitSuccess
, wReader = pointSlot <$> ChainDB.getTipPoint chaindb
}
19 changes: 10 additions & 9 deletions cardano-node/src/Cardano/Node/Parsers.hs
Expand Up @@ -20,7 +20,8 @@ import System.Posix.Types (Fd (..))

import Ouroboros.Network.Block (MaxSlotNo (..), SlotNo (..))

import Ouroboros.Consensus.Mempool.API (MempoolCapacityBytesOverride (..), MempoolCapacityBytes (..))
import Ouroboros.Consensus.Mempool.API (MempoolCapacityBytes (..),
MempoolCapacityBytesOverride (..))
import Ouroboros.Consensus.Storage.LedgerDB.DiskPolicy (SnapshotInterval (..))

import Cardano.Node.Configuration.POM (PartialNodeConfiguration (..), lastOption)
Expand All @@ -40,7 +41,7 @@ nodeRunParser = do
-- Filepaths
topFp <- lastOption parseTopologyFile
dbFp <- lastOption parseDbPath
socketFp <- lastOption $ parseSocketPath "Path to a cardano-node socket"
socketFp <- lastOption $ parseSocketPath "Path to a cardano-node socket"

-- Protocol files
byronCertFile <- optional parseByronDelegationCert
Expand Down Expand Up @@ -199,14 +200,14 @@ parseValidateDB =
<> help "Validate all on-disk database files"
)

parseShutdownIPC :: Parser (Maybe Fd)
parseShutdownIPC :: Parser Fd
parseShutdownIPC =
optional $ option (Fd <$> auto) (
long "shutdown-ipc"
<> metavar "FD"
<> help "Shut down the process when this inherited FD reaches EOF"
<> hidden
)
option (Fd <$> auto) (
long "shutdown-ipc"
<> metavar "FD"
<> help "Shut down the process when this inherited FD reaches EOF"
<> hidden
)

parseShutdownOnSlotSynced :: Parser MaxSlotNo
parseShutdownOnSlotSynced =
Expand Down

0 comments on commit bfd8ca3

Please sign in to comment.