Skip to content

Commit

Permalink
Tie the keep-alive protocol to the PeerGSV state
Browse files Browse the repository at this point in the history
  • Loading branch information
karknu committed Jul 1, 2020
1 parent ceafa31 commit d6b975c
Show file tree
Hide file tree
Showing 12 changed files with 269 additions and 47 deletions.
Expand Up @@ -63,6 +63,7 @@ import Ouroboros.Network.Point (WithOrigin (..))

import qualified Ouroboros.Network.BlockFetch.Client as BFClient
import Ouroboros.Network.NodeToNode (MiniProtocolParameters (..))
import Ouroboros.Network.Protocol.KeepAlive.Type
import Ouroboros.Network.Protocol.TxSubmission.Type
import qualified Ouroboros.Network.TxSubmission.Inbound as TxInbound
import qualified Ouroboros.Network.TxSubmission.Outbound as TxOutbound
Expand Down Expand Up @@ -901,6 +902,7 @@ runThreadNetwork systemTime ThreadNetworkArgs
Lazy.ByteString
Lazy.ByteString
(AnyMessage (TxSubmission (GenTxId blk) (GenTx blk)))
(AnyMessage KeepAlive)
customNodeToNodeCodecs cfg = NTN.Codecs
{ cChainSyncCodec =
mapFailureCodec (CodecBytesFailure "ChainSync") $
Expand All @@ -917,6 +919,9 @@ runThreadNetwork systemTime ThreadNetworkArgs
, cTxSubmissionCodec =
mapFailureCodec CodecIdFailure $
NTN.cTxSubmissionCodec NTN.identityCodecs
, cKeepAliveCodec =
mapFailureCodec CodecIdFailure $
NTN.cKeepAliveCodec NTN.identityCodecs
}
where
binaryProtocolCodecs =
Expand Down Expand Up @@ -1098,6 +1103,9 @@ directedEdgeInner edgeStatusVar
, miniProtocol
NTN.aTxSubmissionClient
NTN.aTxSubmissionServer
, miniProtocol
NTN.aKeepAliveClient
NTN.aKeepAliveServer
]
where
getApp v = readTVar v >>= \case
Expand Down Expand Up @@ -1347,6 +1355,7 @@ type LimitedApp' m peer blk =
Lazy.ByteString
Lazy.ByteString
(AnyMessage (TxSubmission (GenTxId blk) (GenTx blk)))
(AnyMessage KeepAlive)
()

{-------------------------------------------------------------------------------
Expand Down
93 changes: 83 additions & 10 deletions ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs
Expand Up @@ -34,6 +34,7 @@ import Codec.Serialise (Serialise)
import Control.Monad.Class.MonadTimer (MonadTimer)
import Control.Tracer
import Data.ByteString.Lazy (ByteString)
import qualified Data.Map.Strict as M
import Data.Proxy (Proxy (..))
import Data.Void (Void)

Expand All @@ -44,7 +45,9 @@ import Ouroboros.Network.BlockFetch.Client (BlockFetchClient,
blockFetchClient)
import Ouroboros.Network.Channel
import Ouroboros.Network.Codec
import Ouroboros.Network.DeltaQ
import Ouroboros.Network.Driver
import Ouroboros.Network.KeepAlive
import Ouroboros.Network.Mux
import Ouroboros.Network.NodeToNode
import Ouroboros.Network.Protocol.BlockFetch.Codec
Expand All @@ -56,6 +59,10 @@ import Ouroboros.Network.Protocol.ChainSync.Codec
import Ouroboros.Network.Protocol.ChainSync.PipelineDecision
import Ouroboros.Network.Protocol.ChainSync.Server
import Ouroboros.Network.Protocol.ChainSync.Type
import Ouroboros.Network.Protocol.KeepAlive.Client
import Ouroboros.Network.Protocol.KeepAlive.Codec
import Ouroboros.Network.Protocol.KeepAlive.Server
import Ouroboros.Network.Protocol.KeepAlive.Type
import Ouroboros.Network.Protocol.TxSubmission.Client
import Ouroboros.Network.Protocol.TxSubmission.Codec
import Ouroboros.Network.Protocol.TxSubmission.Server
Expand Down Expand Up @@ -120,17 +127,31 @@ data Handlers m peer blk = Handlers {
:: BlockNodeToNodeVersion blk
-> peer
-> TxSubmissionServerPipelined (GenTxId blk) (GenTx blk) m ()

, hKeepAliveClient
:: BlockNodeToNodeVersion blk
-> peer
-> (StrictTVar m (M.Map peer PeerGSV))
-> KeepAliveInterval
-> StrictTVar m (Maybe Time)
-> KeepAliveClient m ()
, hKeepAliveServer
:: BlockNodeToNodeVersion blk
-> peer
-> KeepAliveServer m ()
}

mkHandlers
:: forall m blk remotePeer localPeer.
( IOLike m
, MonadTimer m
, LedgerSupportsMempool blk
, HasTxId (GenTx blk)
, LedgerSupportsProtocol blk
, Serialise (HeaderHash blk)
, ReconstructNestedCtxt Header blk
, TranslateNetworkProtocolVersion blk
, Ord remotePeer
)
=> NodeArgs m remotePeer localPeer blk
-> NodeKernel m remotePeer localPeer blk
Expand Down Expand Up @@ -172,27 +193,30 @@ mkHandlers
(getMempoolReader getMempool)
(getMempoolWriter getMempool)
(nodeToNodeProtocolVersion (Proxy :: Proxy blk) version)
, hKeepAliveClient = \_version -> keepAliveClient (Node.keepAliveClientTracer tracers)
, hKeepAliveServer = \_version _peer -> keepAliveServer
}

{-------------------------------------------------------------------------------
Codecs
-------------------------------------------------------------------------------}

-- | Node-to-node protocol codecs needed to run 'Handlers'.
data Codecs blk e m bCS bSCS bBF bSBF bTX = Codecs {
data Codecs blk e m bCS bSCS bBF bSBF bTX bKA = Codecs {
cChainSyncCodec :: Codec (ChainSync (Header blk) (Tip blk)) e m bCS
, cChainSyncCodecSerialised :: Codec (ChainSync (SerialisedHeader blk) (Tip blk)) e m bSCS
, cBlockFetchCodec :: Codec (BlockFetch blk) e m bBF
, cBlockFetchCodecSerialised :: Codec (BlockFetch (Serialised blk)) e m bSBF
, cTxSubmissionCodec :: Codec (TxSubmission (GenTxId blk) (GenTx blk)) e m bTX
, cKeepAliveCodec :: Codec KeepAlive e m bKA
}

-- | Protocol codecs for the node-to-node protocols
defaultCodecs :: forall m blk. (IOLike m, SerialiseNodeToNodeConstraints blk)
=> CodecConfig blk
-> BlockNodeToNodeVersion blk
-> Codecs blk DeserialiseFailure m
ByteString ByteString ByteString ByteString ByteString
ByteString ByteString ByteString ByteString ByteString ByteString
defaultCodecs ccfg version = Codecs {
cChainSyncCodec =
codecChainSync
Expand Down Expand Up @@ -232,6 +256,9 @@ defaultCodecs ccfg version = Codecs {
dec
enc
dec
, cKeepAliveCodec =
codecKeepAlive

}
where
p :: Proxy blk
Expand All @@ -251,12 +278,14 @@ identityCodecs :: Monad m
(AnyMessage (BlockFetch blk))
(AnyMessage (BlockFetch (Serialised blk)))
(AnyMessage (TxSubmission (GenTxId blk) (GenTx blk)))
(AnyMessage KeepAlive)
identityCodecs = Codecs {
cChainSyncCodec = codecChainSyncId
, cChainSyncCodecSerialised = codecChainSyncId
, cBlockFetchCodec = codecBlockFetchId
, cBlockFetchCodecSerialised = codecBlockFetchId
, cTxSubmissionCodec = codecTxSubmissionId
, cKeepAliveCodec = codecKeepAliveId
}

{-------------------------------------------------------------------------------
Expand Down Expand Up @@ -323,7 +352,7 @@ showTracers tr = Tracers {
-- | Applications for the node-to-node protocols
--
-- See 'Network.Mux.Types.MuxApplication'
data Apps m peer blk bCS bBF bTX a = Apps {
data Apps m peer blk bCS bBF bTX bKA a = Apps {
-- | Start a chain sync client that communicates with the given upstream
-- node.
aChainSyncClient :: BlockNodeToNodeVersion blk -> peer -> Channel m bCS -> m (a, Maybe bCS)
Expand All @@ -344,11 +373,17 @@ data Apps m peer blk bCS bBF bTX a = Apps {

-- | Start a transaction submission server.
, aTxSubmissionServer :: BlockNodeToNodeVersion blk -> peer -> Channel m bTX -> m (a, Maybe bTX)

-- | Start a keep-alive client.
, aKeepAliveClient :: BlockNodeToNodeVersion blk -> peer -> Channel m bKA -> m a

-- | Start a keep-alive server.
, aKeepAliveServer :: BlockNodeToNodeVersion blk -> peer -> Channel m bKA -> m a
}

-- | Construct the 'NetworkApplication' for the node-to-node protocols
mkApps
:: forall m remotePeer localPeer blk e bCS bBF bTX.
:: forall m remotePeer localPeer blk e bCS bBF bTX bKA.
( IOLike m
, MonadTimer m
, Ord remotePeer
Expand All @@ -357,10 +392,10 @@ mkApps
)
=> NodeKernel m remotePeer localPeer blk -- ^ Needed for bracketing only
-> Tracers m remotePeer blk e
-> Codecs blk e m bCS bCS bBF bBF bTX
-> Codecs blk e m bCS bCS bBF bBF bTX bKA
-> m (Maybe DiffTime)
-> Handlers m remotePeer blk
-> Apps m remotePeer blk bCS bBF bTX ()
-> Apps m remotePeer blk bCS bBF bTX bKA ()
mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =
Apps {..}
where
Expand Down Expand Up @@ -476,6 +511,40 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =
channel
(txSubmissionServerPeerPipelined (hTxSubmissionServer version them))

aKeepAliveClient
:: BlockNodeToNodeVersion blk
-> remotePeer
-> Channel m bKA
-> m ()
aKeepAliveClient version them channel = do
labelThisThread "KeepAliveClient"
startTs <- newTVarM Nothing
bracketKeepAliveClient (getFetchClientRegistry kernel) them $ \dqCtx -> do
runPeerWithLimits
nullTracer
cKeepAliveCodec
(byteLimitsKeepAlive (const 0)) -- TODO: Real Bytelimits, see #1727
timeLimitsKeepAlive
channel
$ keepAliveClientPeer
$ hKeepAliveClient version them dqCtx (KeepAliveInterval 10) startTs

aKeepAliveServer
:: BlockNodeToNodeVersion blk
-> remotePeer
-> Channel m bKA
-> m ()
aKeepAliveServer _version _them channel = do
labelThisThread "KeepAliveServer"
runPeerWithLimits
nullTracer
cKeepAliveCodec
(byteLimitsKeepAlive (const 0)) -- TODO: Real Bytelimits, see #1727
timeLimitsKeepAlive
channel
$ keepAliveServerPeer
$ keepAliveServer

{-------------------------------------------------------------------------------
Projections from 'Apps'
-------------------------------------------------------------------------------}
Expand All @@ -489,7 +558,7 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =
initiator
:: MiniProtocolParameters
-> BlockNodeToNodeVersion blk
-> Apps m (ConnectionId peer) blk b b b a
-> Apps m (ConnectionId peer) blk b b b b a
-> OuroborosApplication 'InitiatorMode peer b m a Void
initiator miniProtocolParameters version Apps {..} =
nodeToNodeProtocols
Expand All @@ -506,7 +575,9 @@ initiator miniProtocolParameters version Apps {..} =
blockFetchProtocol =
(InitiatorProtocolOnly (MuxPeerRaw (aBlockFetchClient version them))),
txSubmissionProtocol =
(InitiatorProtocolOnly (MuxPeerRaw (aTxSubmissionClient version them)))
(InitiatorProtocolOnly (MuxPeerRaw (aTxSubmissionClient version them))),
keepAliveProtocol =
(InitiatorProtocolOnly (MuxPeerRaw (aKeepAliveClient version them)))
})

-- | A projection from 'NetworkApplication' to a server-side
Expand All @@ -516,7 +587,7 @@ initiator miniProtocolParameters version Apps {..} =
responder
:: MiniProtocolParameters
-> BlockNodeToNodeVersion blk
-> Apps m (ConnectionId peer) blk b b b a
-> Apps m (ConnectionId peer) blk b b b b a
-> OuroborosApplication 'ResponderMode peer b m Void a
responder miniProtocolParameters version Apps {..} =
nodeToNodeProtocols
Expand All @@ -527,5 +598,7 @@ responder miniProtocolParameters version Apps {..} =
blockFetchProtocol =
(ResponderProtocolOnly (MuxPeerRaw (aBlockFetchServer version them))),
txSubmissionProtocol =
(ResponderProtocolOnly (MuxPeerRaw (aTxSubmissionServer version them)))
(ResponderProtocolOnly (MuxPeerRaw (aTxSubmissionServer version them))),
keepAliveProtocol =
(ResponderProtocolOnly (MuxPeerRaw (aKeepAliveServer version them)))
})
4 changes: 2 additions & 2 deletions ouroboros-consensus/src/Ouroboros/Consensus/Node.hs
Expand Up @@ -249,7 +249,7 @@ run runargs@RunNodeArgs{..} =
:: NodeArgs IO RemoteConnectionId LocalConnectionId blk
-> NodeKernel IO RemoteConnectionId LocalConnectionId blk
-> BlockNodeToNodeVersion blk
-> NTN.Apps IO RemoteConnectionId blk ByteString ByteString ByteString ()
-> NTN.Apps IO RemoteConnectionId blk ByteString ByteString ByteString ByteString ()
mkNodeToNodeApps nodeArgs nodeKernel version =
NTN.mkApps
nodeKernel
Expand Down Expand Up @@ -282,7 +282,7 @@ run runargs@RunNodeArgs{..} =
mkDiffusionApplications
:: MiniProtocolParameters
-> ( BlockNodeToNodeVersion blk
-> NTN.Apps IO RemoteConnectionId blk ByteString ByteString ByteString ()
-> NTN.Apps IO RemoteConnectionId blk ByteString ByteString ByteString ByteString ()
)
-> ( BlockNodeToClientVersion blk
-> NTC.Apps IO LocalConnectionId ByteString ByteString ByteString ()
Expand Down
6 changes: 6 additions & 0 deletions ouroboros-consensus/src/Ouroboros/Consensus/Node/Tracers.hs
Expand Up @@ -19,6 +19,7 @@ import Control.Tracer (Tracer, nullTracer, showTracing)
import Ouroboros.Network.Block (BlockNo, Point, SlotNo)
import Ouroboros.Network.BlockFetch (FetchDecision,
TraceFetchClientState, TraceLabelPeer)
import Ouroboros.Network.KeepAlive (TraceKeepAliveClient)
import Ouroboros.Network.TxSubmission.Inbound
(TraceTxSubmissionInbound)
import Ouroboros.Network.TxSubmission.Outbound
Expand All @@ -41,6 +42,7 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Server
import Ouroboros.Consensus.MiniProtocol.LocalTxSubmission.Server
(TraceLocalTxSubmissionServerEvent (..))


{-------------------------------------------------------------------------------
All tracers of a node bundled together
-------------------------------------------------------------------------------}
Expand All @@ -58,6 +60,7 @@ data Tracers' remotePeer localPeer blk f = Tracers
, mempoolTracer :: f (TraceEventMempool blk)
, forgeTracer :: f (TraceForgeEvent blk)
, blockchainTimeTracer :: f TraceBlockchainTimeEvent
, keepAliveClientTracer :: f TraceKeepAliveClient

-- | Called on every slot with the possibly updated 'ForgeState'
--
Expand All @@ -82,6 +85,7 @@ instance (forall a. Semigroup (f a))
, forgeTracer = f forgeTracer
, forgeStateTracer = f forgeStateTracer
, blockchainTimeTracer = f blockchainTimeTracer
, keepAliveClientTracer = f keepAliveClientTracer
}
where
f :: forall a. Semigroup a
Expand All @@ -108,6 +112,7 @@ nullTracers = Tracers
, forgeTracer = nullTracer
, forgeStateTracer = nullTracer
, blockchainTimeTracer = nullTracer
, keepAliveClientTracer = nullTracer
}

showTracers :: ( Show blk
Expand All @@ -134,6 +139,7 @@ showTracers tr = Tracers
, forgeTracer = showTracing tr
, forgeStateTracer = showTracing tr
, blockchainTimeTracer = showTracing tr
, keepAliveClientTracer = showTracing tr
}

{-------------------------------------------------------------------------------
Expand Down
Expand Up @@ -17,6 +17,7 @@ import Control.Tracer (nullTracer)
import qualified Codec.CBOR.Read as CBOR
import Data.Functor.Identity (Identity (..))
import Data.ByteString.Lazy (ByteString)
import qualified Data.ByteString.Lazy as BL

import Network.TypedProtocol.Core
import Network.TypedProtocol.Proofs
Expand Down Expand Up @@ -164,4 +165,4 @@ prop_byteLimits (AnyMessageAndAgency agency msg) =
<= sizeLimitForState agency
where
Codec { encode } = (codecKeepAlive :: Codec KeepAlive CBOR.DeserialiseFailure IO ByteString)
ProtocolSizeLimits { sizeLimitForState, dataSize } = byteLimitsKeepAlive
ProtocolSizeLimits { sizeLimitForState, dataSize } = byteLimitsKeepAlive (fromIntegral . BL.length)
5 changes: 3 additions & 2 deletions ouroboros-network/src/Ouroboros/Network/BlockFetch.hs
Expand Up @@ -92,6 +92,7 @@ module Ouroboros.Network.BlockFetch (
newFetchClientRegistry,
bracketFetchClient,
bracketSyncWithFetchClient,
bracketKeepAliveClient,

-- * Re-export types used by 'BlockFetchConsensusInterface'
FetchMode (..),
Expand All @@ -116,8 +117,8 @@ import Ouroboros.Network.BlockFetch.ClientRegistry
( FetchClientPolicy(..)
, FetchClientRegistry, newFetchClientRegistry
, readFetchClientsStatus, readFetchClientsStateVars
, bracketFetchClient, bracketSyncWithFetchClient
, setFetchClientContext )
, bracketFetchClient, bracketKeepAliveClient
, bracketSyncWithFetchClient, setFetchClientContext )


-- | The consensus layer functionality that the block fetch logic requires.
Expand Down

0 comments on commit d6b975c

Please sign in to comment.