Skip to content

Commit

Permalink
Refactored Node.hs to fit new Diffusion interface.
Browse files Browse the repository at this point in the history
- Refactored imports explicit in ouroboros-consensus
- Made run functions return () instead of Void
- Added enableP2P flag to Node.hs
  • Loading branch information
bolt12 committed Jun 10, 2021
1 parent 7a6d590 commit 2a987d6
Showing 1 changed file with 148 additions and 71 deletions.
219 changes: 148 additions & 71 deletions ouroboros-consensus/src/Ouroboros/Consensus/Node.hs
Expand Up @@ -23,25 +23,24 @@ module Ouroboros.Consensus.Node (
, stdVersionDataNTC
, stdVersionDataNTN
, stdWithCheckedDB
-- ** P2P Switch
, NetworkP2PMode (..)
-- * Exposed by 'run' et al
, ChainDB.RelativeMountPoint (..)
, LowLevelRunNodeArgs (..)
, RunNodeArgs (..)
, RunNode
, Tracers
, Tracers' (..)
, ChainDB.TraceEvent (..)
, ChainDbArgs (..)
, ConnectionId (..)
, DiffusionArguments (..)
, DiffusionTracers (..)
, HardForkBlockchainTimeArgs (..)
, LastShutDownWasClean (..)
, LowLevelRunNodeArgs (..)
, MaxTxCapacityOverride (..)
, MempoolCapacityBytesOverride (..)
, NodeKernel (..)
, NodeKernelArgs (..)
, ProtocolInfo (..)
, RunNode
, RunNodeArgs (..)
, Tracers
, Tracers' (..)
-- * Internal helpers
, mkChainDbArgs
, mkNodeKernelArgs
Expand All @@ -57,22 +56,17 @@ import Data.Hashable (Hashable)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.Typeable (Typeable)
import Data.Void (Void)
import System.FilePath ((</>))
import System.Random (StdGen, newStdGen, randomIO, randomRIO)

import Control.Monad.Class.MonadTime (MonadTime)
import Control.Monad.Class.MonadTimer (MonadTimer)

import Ouroboros.Network.PeerSelection.LedgerPeers
( LedgerPeersConsensusInterface(..)
)
import Ouroboros.Network.BlockFetch (BlockFetchConfiguration (..))
import Ouroboros.Network.Diffusion
import Ouroboros.Network.Magic
import Ouroboros.Network.NodeToClient (LocalAddress,
NodeToClientVersionData (..))
import Ouroboros.Network.NodeToNode (DiffusionMode,
MiniProtocolParameters (..), NodeToNodeVersionData (..),
RemoteAddress, combineVersions,
defaultMiniProtocolParameters)
import Ouroboros.Network.PeerSelection.PeerMetric (PeerMetrics (..),
newPeerMetric, reportMetric)
import Ouroboros.Network.Protocol.Limits (shortWait)
Expand All @@ -85,6 +79,31 @@ import Ouroboros.Consensus.Fragment.InFuture (CheckInFuture,
ClockSkew)
import qualified Ouroboros.Consensus.Fragment.InFuture as InFuture
import Ouroboros.Consensus.Ledger.Extended (ExtLedgerState (..))
import Ouroboros.Network.NodeToClient
( LocalAddress
, NodeToClientVersionData(..)
, ConnectionId
, combineVersions
, simpleSingletonVersions
)
import Ouroboros.Network.NodeToNode
( RemoteAddress
, MiniProtocolParameters
, blockFetchPipeliningMax
, DiffusionMode
, NodeToNodeVersionData(..)
, defaultMiniProtocolParameters
)
import Ouroboros.Network.Diffusion
( DiffusionApplications
, DiffusionTracers
, DiffusionArguments
, daDiffusionMode
, runDataDiffusion
, mkDiffusionApplicationsP2P
, mkDiffusionApplicationsNonP2P
, getDiffusionArguments
)
import qualified Ouroboros.Consensus.Network.NodeToClient as NTC
import qualified Ouroboros.Consensus.Network.NodeToNode as NTN
import Ouroboros.Consensus.Node.DbLock
Expand All @@ -94,6 +113,7 @@ import Ouroboros.Consensus.Node.NetworkProtocolVersion
import Ouroboros.Consensus.Node.ProtocolInfo
import Ouroboros.Consensus.Node.Recovery
import Ouroboros.Consensus.Node.RethrowPolicy
import Ouroboros.Consensus.Node.ErrorPolicy
import Ouroboros.Consensus.Node.Run
import Ouroboros.Consensus.Node.Tracers
import Ouroboros.Consensus.NodeKernel
Expand Down Expand Up @@ -184,10 +204,12 @@ data LowLevelRunNodeArgs m addrNTN addrNTC versionDataNTN versionDataNTC blk = L
, llrnRunDataDiffusion ::
ResourceRegistry m
-> DiffusionApplications
addrNTN addrNTC
versionDataNTN versionDataNTC
addrNTN
addrNTC
versionDataNTN
versionDataNTC
m
-> m Void
-> m ()

, llrnVersionDataNTC :: versionDataNTC

Expand All @@ -203,13 +225,21 @@ data LowLevelRunNodeArgs m addrNTN addrNTC versionDataNTN versionDataNTC blk = L
, llrnMaxClockSkew :: ClockSkew
}

-- | P2P Switch
--
data NetworkP2PMode = EnabledP2PMode
| DisabledP2PMode
deriving (Eq, Show)

-- | Combination of 'runWith' and 'stdLowLevelRunArgsIO'
run :: forall blk.
RunNode blk
=> RunNodeArgs IO RemoteAddress LocalAddress blk
=> NetworkP2PMode
-> RunNodeArgs IO RemoteAddress LocalAddress blk
-> StdRunNodeArgs IO blk
-> IO Void
run args stdArgs = stdLowLevelRunNodeArgsIO args stdArgs >>= runWith args
-> IO ()
run enableP2P args stdArgs =
stdLowLevelRunNodeArgsIO args stdArgs >>= runWith enableP2P args

-- | Start a node.
--
Expand All @@ -222,10 +252,11 @@ runWith :: forall m addrNTN addrNTC versionDataNTN versionDataNTC blk.
, IOLike m, MonadTime m, MonadTimer m
, Hashable addrNTN, Ord addrNTN, Typeable addrNTN
)
=> RunNodeArgs m addrNTN addrNTC blk
=> NetworkP2PMode
-> RunNodeArgs m addrNTN addrNTC blk
-> LowLevelRunNodeArgs m addrNTN addrNTC versionDataNTN versionDataNTC blk
-> m Void
runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} =
-> m ()
runWith enableP2P RunNodeArgs{..} LowLevelRunNodeArgs{..} =

llrnWithCheckedDB $ \(LastShutDownWasClean lastShutDownWasClean) ->
withRegistry $ \registry -> do
Expand Down Expand Up @@ -298,6 +329,7 @@ runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} =
let ntnApps = mkNodeToNodeApps nodeKernelArgs nodeKernel peerMetrics
ntcApps = mkNodeToClientApps nodeKernelArgs nodeKernel
diffusionApplications = mkDiffusionApplications
enableP2P
(miniProtocolParameters nodeKernelArgs)
ntnApps
ntcApps
Expand All @@ -321,7 +353,14 @@ runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} =
-> NodeKernel m (ConnectionId addrNTN) (ConnectionId addrNTC) blk
-> PeerMetrics m addrNTN
-> BlockNodeToNodeVersion blk
-> NTN.Apps m (ConnectionId addrNTN) ByteString ByteString ByteString ByteString ByteString ()
-> NTN.Apps m
(ConnectionId addrNTN)
ByteString
ByteString
ByteString
ByteString
ByteString
()
mkNodeToNodeApps nodeKernelArgs nodeKernel peerMetrics version =
NTN.mkApps
nodeKernel
Expand All @@ -345,51 +384,87 @@ runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} =
(NTC.mkHandlers nodeKernelArgs nodeKernel)

mkDiffusionApplications
:: MiniProtocolParameters
-> ( BlockNodeToNodeVersion blk
-> NTN.Apps m (ConnectionId addrNTN) ByteString ByteString ByteString ByteString ByteString ()
)
-> ( BlockNodeToClientVersion blk
:: NetworkP2PMode
-> MiniProtocolParameters
-> (BlockNodeToNodeVersion blk
-> NTN.Apps
m
(ConnectionId ntnAddr)
ByteString
ByteString
ByteString
ByteString
ByteString
())
-> (BlockNodeToClientVersion blk
-> NodeToClientVersion
-> NTC.Apps m (ConnectionId addrNTC) ByteString ByteString ByteString ()
)
-> NodeKernel m (ConnectionId addrNTN) (ConnectionId addrNTC) blk
-> PeerMetrics m addrNTN
-> NTC.Apps
m (ConnectionId ntcAddr) ByteString ByteString ByteString ())
-> NodeKernel m remotePeer localPeer blk
-> PeerMetrics m ntnAddr
-> BlockchainTime m
-> DiffusionApplications
addrNTN addrNTC
versionDataNTN versionDataNTC
ntnAddr
ntcAddr
versionDataNTN
versionDataNTC
m
mkDiffusionApplications miniProtocolParams ntnApps ntcApps kernel peerMetrics btime =
DiffusionApplications {
daApplicationInitiatorMode = combineVersions
[ simpleSingletonVersions
version
llrnVersionDataNTN
(NTN.initiator miniProtocolParams version $ ntnApps blockVersion)
| (version, blockVersion) <- Map.toList llrnNodeToNodeVersions
]
, daApplicationInitiatorResponderMode = combineVersions
[ simpleSingletonVersions
version
llrnVersionDataNTN
(NTN.initiatorAndResponder miniProtocolParams version $ ntnApps blockVersion)
| (version, blockVersion) <- Map.toList llrnNodeToNodeVersions
]
, daLocalResponderApplication = combineVersions [
simpleSingletonVersions
version
llrnVersionDataNTC
(NTC.responder version $ ntcApps blockVersion version)
| (version, blockVersion) <- Map.toList llrnNodeToClientVersions
]
, daMiniProtocolParameters = miniProtocolParams
, daRethrowPolicy = consensusRethrowPolicy (Proxy @blk)
, daLocalRethrowPolicy = mempty
, daLedgerPeersCtx = LedgerPeersConsensusInterface (getPeersFromCurrentLedgerAfterSlot kernel)
, daPeerMetrics = peerMetrics
, daBlockFetchMode = getFetchMode (getChainDB kernel) btime
}
mkDiffusionApplications
enP2P
miniProtocolParams
ntnApps
ntcApps
kernel
peerMetrics
btime =
case enP2P of
EnabledP2PMode ->
mkDiffusionApplicationsP2P
initiator
initiatorAndResponder
responder
ledgerPeersConsensusInterface
miniProtocolParams
(consensusRethrowPolicy (Proxy @blk))
mempty
peerMetrics
(getFetchMode (getChainDB kernel) btime)
DisabledP2PMode ->
mkDiffusionApplicationsNonP2P
initiator
initiatorAndResponder
responder
ledgerPeersConsensusInterface
(consensusErrorPolicy (Proxy @blk))
where
initiator =
combineVersions
[ simpleSingletonVersions
version
llrnVersionDataNTN
(NTN.initiator miniProtocolParams version $ ntnApps blockVersion)
| (version, blockVersion) <- Map.toList llrnNodeToNodeVersions
]
initiatorAndResponder =
combineVersions
[ simpleSingletonVersions
version
llrnVersionDataNTN
(NTN.initiatorAndResponder miniProtocolParams version
$ ntnApps blockVersion)
| (version, blockVersion) <- Map.toList llrnNodeToNodeVersions
]
responder =
combineVersions
[ simpleSingletonVersions
version
llrnVersionDataNTC
(NTC.responder version $ ntcApps blockVersion version)
| (version, blockVersion) <- Map.toList llrnNodeToClientVersions
]
ledgerPeersConsensusInterface =
LedgerPeersConsensusInterface (getPeersFromCurrentLedgerAfterSlot kernel)


-- | Did the ChainDB already have existing clean-shutdown marker on disk?
newtype LastShutDownWasClean = LastShutDownWasClean Bool
Expand Down Expand Up @@ -611,10 +686,12 @@ stdRunDataDiffusion ::
DiffusionTracers
-> DiffusionArguments IO
-> DiffusionApplications
RemoteAddress LocalAddress
NodeToNodeVersionData NodeToClientVersionData
RemoteAddress
LocalAddress
NodeToNodeVersionData
NodeToClientVersionData
IO
-> IO Void
-> IO ()
stdRunDataDiffusion = runDataDiffusion

-- | Higher-level arguments that can determine the 'LowLevelRunNodeArgs' under
Expand Down Expand Up @@ -672,7 +749,7 @@ stdLowLevelRunNodeArgsIO RunNodeArgs{ rnProtocolInfo } StdRunNodeArgs{..} = do
, llrnVersionDataNTN =
stdVersionDataNTN
networkMagic
(daDiffusionMode srnDiffusionArguments)
(daDiffusionMode (getDiffusionArguments srnDiffusionArguments))
, llrnNodeToNodeVersions =
limitToLatestReleasedVersion
fst
Expand Down

0 comments on commit 2a987d6

Please sign in to comment.