Skip to content

Commit

Permalink
diffusion: simplify data diffusion
Browse files Browse the repository at this point in the history
It is much nicer to use recodrs than functions with multiple (>10)
arguments.  This makes it easier to maintian the code base.

The modules:

* Ouroboros.Network.Diffusion
* Ouroboros.Network.Diffusion.P2P
* Ouroboros.Network.Diffusion.NonP2P

are now public, and should be imported qualified.
  • Loading branch information
coot committed Jul 20, 2021
1 parent f9ed189 commit be7fa1b
Show file tree
Hide file tree
Showing 8 changed files with 420 additions and 674 deletions.
170 changes: 104 additions & 66 deletions ouroboros-consensus/src/Ouroboros/Consensus/Node.hs
@@ -1,9 +1,13 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MonadComprehensions #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE Rank2Types #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeApplications #-}
-- | Run the whole Node
--
Expand Down Expand Up @@ -62,17 +66,17 @@ import System.Random (StdGen, newStdGen, randomIO, randomRIO)
import Control.Monad.Class.MonadTime (MonadTime)
import Control.Monad.Class.MonadTimer (MonadTimer)
import Ouroboros.Network.BlockFetch (BlockFetchConfiguration (..))
import Ouroboros.Network.Diffusion (DiffusionApplications,
DiffusionArguments, DiffusionTracers, daDiffusionMode,
getDiffusionArguments, mkDiffusionApplicationsNonP2P,
mkDiffusionApplicationsP2P, runDataDiffusion)
import qualified Ouroboros.Network.Diffusion as Diffusion
-- import qualified Ouroboros.Network.Diffusion.Common as Diffusion
import qualified Ouroboros.Network.Diffusion.NonP2P as NonP2P
import qualified Ouroboros.Network.Diffusion.P2P as P2P
import Ouroboros.Network.Magic
import Ouroboros.Network.NodeToClient (ConnectionId, LocalAddress,
NodeToClientVersionData (..), combineVersions,
LocalSocket, NodeToClientVersionData (..), combineVersions,
simpleSingletonVersions)
import Ouroboros.Network.NodeToNode (DiffusionMode,
MiniProtocolParameters, NodeToNodeVersionData (..),
RemoteAddress, blockFetchPipeliningMax,
RemoteAddress, Socket, blockFetchPipeliningMax,
defaultMiniProtocolParameters)
import Ouroboros.Network.PeerSelection.LedgerPeers
(LedgerPeersConsensusInterface (..))
Expand Down Expand Up @@ -122,7 +126,7 @@ import Ouroboros.Consensus.Storage.VolatileDB

-- | Arguments expected from any invocation of 'runWith', whether by deployed
-- code, tests, etc.
data RunNodeArgs m addrNTN addrNTC blk = RunNodeArgs {
data RunNodeArgs m addrNTN addrNTC blk (p2p :: Diffusion.P2P) = RunNodeArgs {
-- | Consensus tracers
rnTraceConsensus :: Tracers m (ConnectionId addrNTN) (ConnectionId addrNTC) blk

Expand All @@ -144,7 +148,7 @@ data RunNodeArgs m addrNTN addrNTC blk = RunNodeArgs {
-> m ()

-- | Network P2P Mode switch
, rnEnableP2P :: NetworkP2PMode
, rnEnableP2P :: NetworkP2PMode p2p
}

-- | Arguments that usually only tests /directly/ specify.
Expand All @@ -153,7 +157,9 @@ data RunNodeArgs m addrNTN addrNTC blk = RunNodeArgs {
-- 'runWith'. The @cardano-node@, for example, instead calls the 'run'
-- abbreviation, which uses 'stdLowLevelRunNodeArgsIO' to indirectly specify
-- these low-level values from the higher-level 'StdRunNodeArgs'.
data LowLevelRunNodeArgs m addrNTN addrNTC versionDataNTN versionDataNTC blk = LowLevelRunNodeArgs {
data LowLevelRunNodeArgs m addrNTN addrNTC versionDataNTN versionDataNTC blk
(p2p :: Diffusion.P2P) =
LowLevelRunNodeArgs {

-- | How to manage the clean-shutdown marker on disk
llrnWithCheckedDB :: forall a. (LastShutDownWasClean -> m a) -> m a
Expand Down Expand Up @@ -190,12 +196,11 @@ data LowLevelRunNodeArgs m addrNTN addrNTC versionDataNTN versionDataNTC blk = L
-- 'run' will not return before this does.
, llrnRunDataDiffusion ::
ResourceRegistry m
-> DiffusionApplications
addrNTN
addrNTC
versionDataNTN
versionDataNTC
-> Diffusion.Applications
addrNTN NodeToNodeVersion versionDataNTN
addrNTC NodeToClientVersion versionDataNTC
m
-> Diffusion.ExtraApplications p2p addrNTN m
-> m ()

, llrnVersionDataNTC :: versionDataNTC
Expand All @@ -214,15 +219,18 @@ data LowLevelRunNodeArgs m addrNTN addrNTC versionDataNTN versionDataNTC blk = L

-- | P2P Switch
--
data NetworkP2PMode = EnabledP2PMode
| DisabledP2PMode
deriving (Eq, Show)
data NetworkP2PMode (p2p :: Diffusion.P2P) where
EnabledP2PMode :: NetworkP2PMode 'Diffusion.P2P
DisabledP2PMode :: NetworkP2PMode 'Diffusion.NonP2P

deriving instance Eq (NetworkP2PMode p2p)
deriving instance Show (NetworkP2PMode p2p)

-- | Combination of 'runWith' and 'stdLowLevelRunArgsIO'
run :: forall blk.
run :: forall blk p2p.
RunNode blk
=> RunNodeArgs IO RemoteAddress LocalAddress blk
-> StdRunNodeArgs IO blk
=> RunNodeArgs IO RemoteAddress LocalAddress blk p2p
-> StdRunNodeArgs IO blk p2p
-> IO ()
run args stdArgs = stdLowLevelRunNodeArgsIO args stdArgs >>= runWith args

Expand All @@ -232,13 +240,13 @@ run args stdArgs = stdLowLevelRunNodeArgsIO args stdArgs >>= runWith args
-- network layer.
--
-- This function runs forever unless an exception is thrown.
runWith :: forall m addrNTN addrNTC versionDataNTN versionDataNTC blk.
runWith :: forall m addrNTN addrNTC versionDataNTN versionDataNTC blk p2p.
( RunNode blk
, IOLike m, MonadTime m, MonadTimer m
, Hashable addrNTN, Ord addrNTN, Typeable addrNTN
)
=> RunNodeArgs m addrNTN addrNTC blk
-> LowLevelRunNodeArgs m addrNTN addrNTC versionDataNTN versionDataNTC blk
=> RunNodeArgs m addrNTN addrNTC blk p2p
-> LowLevelRunNodeArgs m addrNTN addrNTC versionDataNTN versionDataNTC blk p2p
-> m ()
runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} =

Expand Down Expand Up @@ -312,7 +320,7 @@ runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} =
peerMetrics <- newPeerMetric
let ntnApps = mkNodeToNodeApps nodeKernelArgs nodeKernel peerMetrics
ntcApps = mkNodeToClientApps nodeKernelArgs nodeKernel
diffusionApplications = mkDiffusionApplications
(apps, appsExtra) = mkDiffusionApplications
rnEnableP2P
(miniProtocolParameters nodeKernelArgs)
ntnApps
Expand All @@ -321,7 +329,7 @@ runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} =
peerMetrics
btime

llrnRunDataDiffusion registry diffusionApplications
llrnRunDataDiffusion registry apps appsExtra
where
ProtocolInfo
{ pInfoConfig = cfg
Expand Down Expand Up @@ -368,7 +376,7 @@ runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} =
(NTC.mkHandlers nodeKernelArgs nodeKernel)

mkDiffusionApplications
:: NetworkP2PMode
:: NetworkP2PMode p2p
-> MiniProtocolParameters
-> ( BlockNodeToNodeVersion blk
-> NTN.Apps
Expand All @@ -389,12 +397,12 @@ runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} =
-> NodeKernel m remotePeer localPeer blk
-> PeerMetrics m ntnAddr
-> BlockchainTime m
-> DiffusionApplications
ntnAddr
ntcAddr
versionDataNTN
versionDataNTC
m
-> ( Diffusion.Applications
ntnAddr NodeToNodeVersion versionDataNTN
ntcAddr NodeToClientVersion versionDataNTC
m
, Diffusion.ExtraApplications p2p ntnAddr m
)
mkDiffusionApplications
enP2P
miniProtocolParams
Expand All @@ -405,23 +413,33 @@ runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} =
btime =
case enP2P of
EnabledP2PMode ->
mkDiffusionApplicationsP2P
initiator
initiatorAndResponder
responder
ledgerPeersConsensusInterface
miniProtocolParams
(consensusRethrowPolicy (Proxy @blk))
localRethrowPolicy
peerMetrics
(getFetchMode (getChainDB kernel) btime)
( Diffusion.Applications {
Diffusion.daApplicationInitiatorMode = initiator,
Diffusion.daApplicationInitiatorResponderMode = initiatorAndResponder,
Diffusion.daLocalResponderApplication = responder,
Diffusion.daLedgerPeersCtx = ledgerPeersConsensusInterface
}
, Diffusion.P2PApplications
P2P.ApplicationsExtra {
P2P.daMiniProtocolParameters = miniProtocolParams,
P2P.daRethrowPolicy = consensusRethrowPolicy (Proxy @blk),
P2P.daLocalRethrowPolicy = localRethrowPolicy,
P2P.daPeerMetrics = peerMetrics,
P2P.daBlockFetchMode = getFetchMode (getChainDB kernel) btime
}
)
DisabledP2PMode ->
mkDiffusionApplicationsNonP2P
initiator
initiatorAndResponder
responder
ledgerPeersConsensusInterface
(consensusErrorPolicy (Proxy @blk))
( Diffusion.Applications {
Diffusion.daApplicationInitiatorMode = initiator,
Diffusion.daApplicationInitiatorResponderMode = initiatorAndResponder,
Diffusion.daLocalResponderApplication = responder,
Diffusion.daLedgerPeersCtx = ledgerPeersConsensusInterface
}
, Diffusion.NonP2PApplications
NonP2P.ApplicationsExtra {
NonP2P.daErrorPolicies = consensusErrorPolicy (Proxy @blk)
}
)
where
initiator =
combineVersions
Expand Down Expand Up @@ -449,6 +467,8 @@ runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} =
(NTC.responder version $ ntcApps blockVersion version)
| (version, blockVersion) <- Map.toList llrnNodeToClientVersions
]

ledgerPeersConsensusInterface :: LedgerPeersConsensusInterface m
ledgerPeersConsensusInterface =
LedgerPeersConsensusInterface
(getPeersFromCurrentLedgerAfterSlot kernel)
Expand Down Expand Up @@ -673,31 +693,44 @@ stdVersionDataNTC networkMagic = NodeToClientVersionData
}

stdRunDataDiffusion ::
DiffusionTracers
-> DiffusionArguments IO
-> DiffusionApplications
RemoteAddress
LocalAddress
NodeToNodeVersionData
NodeToClientVersionData
Diffusion.Tracers
RemoteAddress NodeToNodeVersion
LocalAddress NodeToClientVersion
IO
-> Diffusion.ExtraTracers p2p
-> Diffusion.Arguments
Socket RemoteAddress
LocalSocket LocalAddress
-> Diffusion.ExtraArguments p2p IO
-> Diffusion.Applications
RemoteAddress NodeToNodeVersion NodeToNodeVersionData
LocalAddress NodeToClientVersion NodeToClientVersionData
IO
-> Diffusion.ExtraApplications p2p RemoteAddress IO
-> IO ()
stdRunDataDiffusion = runDataDiffusion
stdRunDataDiffusion = Diffusion.run

-- | Higher-level arguments that can determine the 'LowLevelRunNodeArgs' under
-- some usual assumptions for realistic use cases such as in @cardano-node@.
--
-- See 'stdLowLevelRunNodeArgsIO'.
data StdRunNodeArgs m blk = StdRunNodeArgs
data StdRunNodeArgs m blk (p2p :: Diffusion.P2P) = StdRunNodeArgs
{ srnBfcMaxConcurrencyBulkSync :: Maybe Word
, srnBfcMaxConcurrencyDeadline :: Maybe Word
, srnChainDbValidateOverride :: Bool
-- ^ If @True@, validate the ChainDB on init no matter what
, srnSnapshotInterval :: SnapshotInterval
, srnDatabasePath :: FilePath
-- ^ Location of the DBs
, srnDiffusionArguments :: DiffusionArguments m
, srnDiffusionTracers :: DiffusionTracers
, srnDiffusionArguments :: Diffusion.Arguments
Socket RemoteAddress
LocalSocket LocalAddress
, srnDiffusionArgumentsExtra :: Diffusion.ExtraArguments p2p m
, srnDiffusionTracers :: Diffusion.Tracers
RemoteAddress NodeToNodeVersion
LocalAddress NodeToClientVersion
IO
, srnDiffusionTracersExtra :: Diffusion.ExtraTracers p2p
, srnEnableInDevelopmentVersions :: Bool
-- ^ If @False@, then the node will limit the negotiated NTN and NTC
-- versions to the latest " official " release (as chosen by Network and
Expand All @@ -709,16 +742,17 @@ data StdRunNodeArgs m blk = StdRunNodeArgs
-- | Conveniently packaged 'LowLevelRunNodeArgs' arguments from a standard
-- non-testing invocation.
stdLowLevelRunNodeArgsIO ::
forall blk. RunNode blk
=> RunNodeArgs IO RemoteAddress LocalAddress blk
-> StdRunNodeArgs IO blk
forall blk p2p. RunNode blk
=> RunNodeArgs IO RemoteAddress LocalAddress blk p2p
-> StdRunNodeArgs IO blk p2p
-> IO (LowLevelRunNodeArgs
IO
RemoteAddress
LocalAddress
NodeToNodeVersionData
NodeToClientVersionData
blk)
blk
p2p)
stdLowLevelRunNodeArgsIO RunNodeArgs{ rnProtocolInfo } StdRunNodeArgs{..} = do
llrnBfcSalt <- stdBfcSaltIO
llrnKeepAliveRng <- stdKeepAliveRngIO
Expand All @@ -732,14 +766,18 @@ stdLowLevelRunNodeArgsIO RunNodeArgs{ rnProtocolInfo } StdRunNodeArgs{..} = do
, llrnCustomiseChainDbArgs = id
, llrnCustomiseNodeKernelArgs
, llrnRunDataDiffusion =
\_reg apps ->
stdRunDataDiffusion srnDiffusionTracers srnDiffusionArguments apps
\_reg apps extraApps ->
stdRunDataDiffusion srnDiffusionTracers
srnDiffusionTracersExtra
srnDiffusionArguments
srnDiffusionArgumentsExtra
apps extraApps
, llrnVersionDataNTC =
stdVersionDataNTC networkMagic
, llrnVersionDataNTN =
stdVersionDataNTN
networkMagic
(daDiffusionMode (getDiffusionArguments srnDiffusionArguments))
(Diffusion.daDiffusionMode srnDiffusionArguments)
, llrnNodeToNodeVersions =
limitToLatestReleasedVersion
fst
Expand Down
Expand Up @@ -641,7 +641,7 @@ backfillChunk chunkInfo chunk (NextRelativeSlot nextExpected) offset =
------------------------------------------------------------------------------}

(!) :: (HasCallStack, V.Unbox a) => Vector a -> Int -> a
v ! i
(!) v i
| 0 <= i, i < V.length v
= V.unsafeIndex v i
| otherwise
Expand Down
4 changes: 2 additions & 2 deletions ouroboros-network/ouroboros-network.cabal
Expand Up @@ -64,6 +64,8 @@ library
Ouroboros.Network.Counter
Ouroboros.Network.DeltaQ
Ouroboros.Network.Diffusion
Ouroboros.Network.Diffusion.P2P
Ouroboros.Network.Diffusion.NonP2P
Ouroboros.Network.Diffusion.Policies
Ouroboros.Network.KeepAlive
Ouroboros.Network.Magic
Expand Down Expand Up @@ -129,8 +131,6 @@ library
Ouroboros.Network.TxSubmission.Mempool.Reader
Ouroboros.Network.TxSubmission.Outbound
other-modules: Ouroboros.Network.Diffusion.Common
Ouroboros.Network.Diffusion.P2P
Ouroboros.Network.Diffusion.NonP2P
Ouroboros.Network.PeerSelection.Governor.ActivePeers
Ouroboros.Network.PeerSelection.Governor.EstablishedPeers
Ouroboros.Network.PeerSelection.Governor.KnownPeers
Expand Down

0 comments on commit be7fa1b

Please sign in to comment.