Skip to content

Commit

Permalink
trace-dispatcher, cardano-node: forwarding initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
jutaro authored and deepfire committed Nov 25, 2021
1 parent 4e3e4fd commit dec6fd5
Show file tree
Hide file tree
Showing 6 changed files with 257 additions and 237 deletions.
43 changes: 21 additions & 22 deletions cardano-node/src/Cardano/Node/Configuration/Logging.hs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import qualified Cardano.BM.Trace as Trace
import Cardano.BM.Tracing

import qualified Cardano.Chain.Genesis as Gen
import qualified Cardano.Ledger.Shelley.API as SL
import qualified Ouroboros.Consensus.BlockchainTime.WallClock.Types as WCT
import Ouroboros.Consensus.Byron.Ledger.Conversions
import Ouroboros.Consensus.Cardano.Block
Expand All @@ -76,11 +77,11 @@ import Ouroboros.Consensus.Config.SupportsNode
import Ouroboros.Consensus.HardFork.Combinator.Degenerate
import Ouroboros.Consensus.Node.ProtocolInfo
import Ouroboros.Consensus.Shelley.Ledger.Ledger
import qualified Cardano.Ledger.Shelley.API as SL

import Cardano.Api.Protocol.Types (BlockType (..), protocolInfo)
import Cardano.Config.Git.Rev (gitRev)
import Cardano.Node.Configuration.POM (NodeConfiguration (..), ncProtocol)
import Cardano.Node.Configuration.POM (NodeConfiguration (..),
ncProtocol)
import Cardano.Node.Protocol.Types (SomeConsensusProtocol (..))
import Cardano.Node.Types
import Cardano.Slotting.Slot (EpochSize (..))
Expand Down Expand Up @@ -120,7 +121,7 @@ data LoggingLayer = LoggingLayer
, llConfiguration :: Configuration
, llAddBackend :: Backend Text -> BackendKind -> IO ()
, llSwitchboard :: Switchboard Text
, llEKGDirect :: Maybe EKGDirect
, llEKGDirect :: EKGDirect
}

data EKGDirect = EKGDirect
Expand Down Expand Up @@ -182,22 +183,20 @@ createLoggingLayer topt ver nodeConfig' p = do
when loggingEnabled $ liftIO $
loggingPreInit nodeConfig' logConfig switchBoard trace

mEKGServer <- liftIO $ Switchboard.getSbEKGServer switchBoard

mbEkgDirect <- case mEKGServer of
Nothing -> pure Nothing
Just sv -> do
refGauge <- liftIO $ newMVar Map.empty
refLabel <- liftIO $ newMVar Map.empty
refCounter <- liftIO $ newMVar Map.empty
pure $ Just EKGDirect {
ekgServer = sv
, ekgGauges = refGauge
, ekgLabels = refLabel
, ekgCounters = refCounter
}

pure $ mkLogLayer logConfig switchBoard mbEkgDirect trace
mbEKGServer <- liftIO $ Switchboard.getSbEKGServer switchBoard
case mbEKGServer of
Nothing -> panic "Can't get EKGServer from Switchboard"
Just sv -> do
refGauge <- liftIO $ newMVar Map.empty
refLabel <- liftIO $ newMVar Map.empty
refCounter <- liftIO $ newMVar Map.empty
let ekgDirect = EKGDirect {
ekgServer = sv
, ekgGauges = refGauge
, ekgLabels = refLabel
, ekgCounters = refCounter
}
pure $ mkLogLayer logConfig switchBoard ekgDirect trace
where
loggingPreInit
:: NodeConfiguration
Expand Down Expand Up @@ -251,8 +250,8 @@ createLoggingLayer topt ver nodeConfig' p = do
-- Record node metrics, if configured
startCapturingMetrics topt trace

mkLogLayer :: Configuration -> Switchboard Text -> Maybe EKGDirect -> Trace IO Text -> LoggingLayer
mkLogLayer logConfig switchBoard mbEkgDirect trace =
mkLogLayer :: Configuration -> Switchboard Text -> EKGDirect -> Trace IO Text -> LoggingLayer
mkLogLayer logConfig switchBoard ekgDirect trace =
LoggingLayer
{ llBasicTrace = Trace.natTrace liftIO trace
, llLogDebug = Trace.logDebug
Expand All @@ -269,7 +268,7 @@ createLoggingLayer topt ver nodeConfig' p = do
, llConfiguration = logConfig
, llAddBackend = Switchboard.addExternalBackend switchBoard
, llSwitchboard = switchBoard
, llEKGDirect = mbEkgDirect
, llEKGDirect = ekgDirect
}

startCapturingMetrics :: TraceOptions
Expand Down
53 changes: 21 additions & 32 deletions cardano-node/src/Cardano/Node/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import Network.Socket (AddrInfo, Socket)
import System.Directory (canonicalizePath, createDirectoryIfMissing,
makeAbsolute)
import System.Environment (lookupEnv)
import qualified System.Metrics as EKG

#ifdef UNIX
import System.Posix.Files
import System.Posix.Types (FileMode)
Expand Down Expand Up @@ -165,17 +167,18 @@ runNode cmdPc = do
Right res -> return res

-- New logging initialisation
let ekgServer' = ekgServer (llEKGDirect loggingLayer)
ekgStore <- EKG.newStore
loggerConfiguration <-
case getLast $ pncConfigFile cmdPc of
Just fileName -> NL.readConfiguration (unConfigPath fileName)
Nothing -> putTextLn "No configuration file name found!" >> exitFailure
baseTrace <- NL.standardTracer
nodeInfo <- prepareNodeInfo nc p loggerConfiguration now
forwardTrace <- withIOManager $ \iomgr -> NL.forwardTracer iomgr loggerConfiguration nodeInfo
mbEkgTrace <- case llEKGDirect loggingLayer of
Nothing -> pure Nothing
Just ekgDirect ->
liftM Just (NL.ekgTracer (Right (ekgServer ekgDirect)))
forwardSink <- withIOManager $ \iomgr ->
NL.initForwarding iomgr loggerConfiguration ekgStore nodeInfo
let forwardTrace = NL.forwardTracer forwardSink
ekgTrace <- NL.ekgTracer (Right ekgServer')
-- End new logging initialisation

!trace <- setupTrace loggingLayer
Expand Down Expand Up @@ -209,17 +212,19 @@ runNode cmdPc = do
(llEKGDirect loggingLayer)
p2pMode
-- Couldn't resolve it.
--tracers <- mkDispatchTracers
-- (Consensus.configBlock cfg)
-- (ncTraceConfig nc)
-- trace
-- nodeKernelData
-- (llEKGDirect loggingLayer)
-- baseTrace
-- forwardTrace
-- mbEkgTrace
-- loggerConfiguration
-- bi
{-
tracers <- mkDispatchTracers
(Consensus.configBlock cfg)
(ncTraceConfig nc)
trace
nodeKernelData
(Just (llEKGDirect loggingLayer))
baseTrace
forwardTrace
(Just ekgTrace)
loggerConfiguration
bi
-}
Async.withAsync (handlePeersListSimple trace nodeKernelData)
$ \_peerLogingThread ->
-- We ignore peer loging thread if it dies, but it will be killed
Expand All @@ -232,22 +237,6 @@ runNode cmdPc = do
case p of
SomeConsensusProtocol _ runP -> handleNodeWithTracers runP

initTraceDispatcher :: ... -> TraceDispatcher
initTraceDispatcher = do
forwardTrace <- withIOManager $ \iomgr -> NL.forwardTracer iomgr loggerConfiguration nodeInfo
mbEkgTrace <- case llEKGDirect loggingLayer of
Nothing -> pure Nothing
Just ekgDirect ->
liftM Just (NL.ekgTracer (Right (ekgServer ekgDirect)))
baseTrace <- NL.standardTracer

data TraceDispatcher
= TraceDispatcher
{ tdForwardTracer :: ForwardTracer
, tdEKGTracer :: Maybe EKGTracer
, tdStdoutTracer :: StdoutTracer
}

logTracingVerbosity :: NodeConfiguration -> Tracer IO String -> IO ()
logTracingVerbosity nc tracer =
case ncTraceConfig nc of
Expand Down
1 change: 1 addition & 0 deletions trace-dispatcher/src/Cardano/Logging.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import Cardano.Logging.FrequencyLimiter as X
import Cardano.Logging.Trace as X
import Cardano.Logging.Tracer.EKG as X
import Cardano.Logging.Tracer.Standard as X
import Cardano.Logging.Forwarding as X
import Cardano.Logging.Tracer.Forward as X
import Cardano.Logging.Types as X
import Cardano.Logging.Tracer.Composed as X
Expand Down
210 changes: 210 additions & 0 deletions trace-dispatcher/src/Cardano/Logging/Forwarding.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PackageImports #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Cardano.Logging.Forwarding
(
initForwarding
) where

import Codec.CBOR.Term (Term)
import Control.Concurrent.Async (race_, wait, withAsync)
import Control.Monad.IO.Class

import "contra-tracer" Control.Tracer (contramap, stdoutTracer)
import qualified Data.ByteString.Lazy as LBS
import Data.Void (Void)
import Data.Word (Word16)

import Ouroboros.Network.Driver.Limits (ProtocolTimeLimits)
import Ouroboros.Network.ErrorPolicy (nullErrorPolicies)
import Ouroboros.Network.IOManager (IOManager)
import Ouroboros.Network.Mux (MiniProtocol (..),
MiniProtocolLimits (..), MiniProtocolNum (..),
MuxMode (..), OuroborosApplication (..),
RunMiniProtocol (..), miniProtocolLimits, miniProtocolNum,
miniProtocolRun)
import Ouroboros.Network.Protocol.Handshake.Codec
(cborTermVersionDataCodec, noTimeLimitsHandshake)
import Ouroboros.Network.Protocol.Handshake.Type (Handshake)
import Ouroboros.Network.Protocol.Handshake.Unversioned
(UnversionedProtocol (..), UnversionedProtocolData (..),
unversionedHandshakeCodec, unversionedProtocolDataCodec)
import Ouroboros.Network.Protocol.Handshake.Version
(acceptableVersion, simpleSingletonVersions)
import Ouroboros.Network.Snocket (Snocket, localAddressFromPath,
localSnocket)
import Ouroboros.Network.Socket (AcceptedConnectionsLimit (..),
SomeResponderApplication (..), cleanNetworkMutableState,
connectToNode, newNetworkMutableState, nullNetworkConnectTracers,
nullNetworkServerTracers, withServerNode)

import qualified System.Metrics as EKG
import qualified System.Metrics.Configuration as EKGF
import System.Metrics.Network.Forwarder
import qualified Trace.Forward.Configuration as TF
import Trace.Forward.Network.Forwarder
import Trace.Forward.Protocol.Type (NodeInfo (..))
import Trace.Forward.Utils

import Cardano.Logging.Types


initForwarding :: forall m. (MonadIO m)
=> IOManager
-> TraceConfig
-> EKG.Store
-> NodeInfo
-> m (ForwardSink TraceObject)
initForwarding iomgr config ekgstore nodeInfo = liftIO $ do
forwardSink <- initForwardSink tfConfig
launchForwarders iomgr config ekgstore ekgConfig tfConfig forwardSink
pure forwardSink
where
LocalSocket p = tofAddress $ tcForwarder config

ekgConfig :: EKGF.ForwarderConfiguration
ekgConfig =
EKGF.ForwarderConfiguration
{ EKGF.forwarderTracer = contramap show stdoutTracer
, EKGF.acceptorEndpoint = EKGF.LocalPipe p
, EKGF.reConnectFrequency = 1.0
, EKGF.actionOnRequest = const $ pure ()
}

tfConfig :: TF.ForwarderConfiguration TraceObject
tfConfig =
TF.ForwarderConfiguration
{ TF.forwarderTracer = contramap show stdoutTracer
, TF.acceptorEndpoint = TF.LocalPipe p
, TF.getNodeInfo = pure nodeInfo
, TF.disconnectedQueueSize = 200000
, TF.connectedQueueSize = 2000
}

launchForwarders
:: IOManager
-> TraceConfig
-> EKG.Store
-> EKGF.ForwarderConfiguration
-> TF.ForwarderConfiguration TraceObject
-> ForwardSink TraceObject
-> IO ()
launchForwarders iomgr TraceConfig{tcForwarder} store ekgConfig tfConfig sink = flip
withAsync
wait
$ runActionInLoop
(launchForwardersViaLocalSocket iomgr tcForwarder (ekgConfig, tfConfig) sink store)
(TF.LocalPipe p)
1
where
LocalSocket p = tofAddress tcForwarder

launchForwardersViaLocalSocket
:: IOManager
-> TraceOptionForwarder
-> (EKGF.ForwarderConfiguration, TF.ForwarderConfiguration TraceObject)
-> ForwardSink TraceObject
-> EKG.Store
-> IO ()
launchForwardersViaLocalSocket iomgr
TraceOptionForwarder {tofAddress=(LocalSocket p), tofMode=Initiator}
configs sink store =
doConnectToAcceptor (localSnocket iomgr) (localAddressFromPath p)
noTimeLimitsHandshake configs sink store
launchForwardersViaLocalSocket iomgr
TraceOptionForwarder {tofAddress=(LocalSocket p), tofMode=Responder}
configs sink store =
doListenToAcceptor (localSnocket iomgr) (localAddressFromPath p)
noTimeLimitsHandshake configs sink store

doConnectToAcceptor
:: Snocket IO fd addr
-> addr
-> ProtocolTimeLimits (Handshake UnversionedProtocol Term)
-> (EKGF.ForwarderConfiguration, TF.ForwarderConfiguration TraceObject)
-> ForwardSink TraceObject
-> EKG.Store
-> IO ()
doConnectToAcceptor snocket address timeLimits (ekgConfig, tfConfig) sink store = do
connectToNode
snocket
unversionedHandshakeCodec
timeLimits
(cborTermVersionDataCodec unversionedProtocolDataCodec)
nullNetworkConnectTracers
acceptableVersion
(simpleSingletonVersions
UnversionedProtocol
UnversionedProtocolData
(forwarderApp [ (forwardEKGMetrics ekgConfig store, 1)
, (forwardTraceObjects tfConfig sink, 2)
]
)
)
Nothing
address
where
forwarderApp
:: [(RunMiniProtocol 'InitiatorMode LBS.ByteString IO () Void, Word16)]
-> OuroborosApplication 'InitiatorMode addr LBS.ByteString IO () Void
forwarderApp protocols =
OuroborosApplication $ \_connectionId _shouldStopSTM ->
[ MiniProtocol
{ miniProtocolNum = MiniProtocolNum num
, miniProtocolLimits = MiniProtocolLimits { maximumIngressQueue = maxBound }
, miniProtocolRun = prot
}
| (prot, num) <- protocols
]

doListenToAcceptor
:: Ord addr
=> Snocket IO fd addr
-> addr
-> ProtocolTimeLimits (Handshake UnversionedProtocol Term)
-> (EKGF.ForwarderConfiguration, TF.ForwarderConfiguration TraceObject)
-> ForwardSink TraceObject
-> EKG.Store
-> IO ()
doListenToAcceptor snocket address timeLimits (ekgConfig, tfConfig) sink store = do
networkState <- newNetworkMutableState
race_ (cleanNetworkMutableState networkState)
$ withServerNode
snocket
nullNetworkServerTracers
networkState
(AcceptedConnectionsLimit maxBound maxBound 0)
address
unversionedHandshakeCodec
timeLimits
(cborTermVersionDataCodec unversionedProtocolDataCodec)
acceptableVersion
(simpleSingletonVersions
UnversionedProtocol
UnversionedProtocolData
(SomeResponderApplication $
forwarderApp [ (forwardEKGMetricsResp ekgConfig store, 1)
, (forwardTraceObjectsResp tfConfig sink, 2)
]
)
)
nullErrorPolicies
$ \_ serverAsync ->
wait serverAsync -- Block until async exception.
where
forwarderApp
:: [(RunMiniProtocol 'ResponderMode LBS.ByteString IO Void (), Word16)]
-> OuroborosApplication 'ResponderMode addr LBS.ByteString IO Void ()
forwarderApp protocols =
OuroborosApplication $ \_connectionId _shouldStopSTM ->
[ MiniProtocol
{ miniProtocolNum = MiniProtocolNum num
, miniProtocolLimits = MiniProtocolLimits { maximumIngressQueue = maxBound }
, miniProtocolRun = prot
}
| (prot, num) <- protocols
]
Loading

0 comments on commit dec6fd5

Please sign in to comment.