Skip to content

Commit

Permalink
Peer Sharing protocol integration
Browse files Browse the repository at this point in the history
  • Loading branch information
bolt12 committed Feb 8, 2023
1 parent 4af901c commit e7f7a03
Show file tree
Hide file tree
Showing 28 changed files with 750 additions and 157 deletions.
Expand Up @@ -108,7 +108,23 @@ import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.Orphans ()
import Ouroboros.Consensus.Util.ResourceRegistry

import qualified Codec.CBOR.Decoding as CBOR
import qualified Codec.CBOR.Encoding as CBOR
import Control.Monad.Class.MonadMVar (MonadMVar)
import Ouroboros.Consensus.Storage.Serialisation (SerialisedHeader)
import qualified Ouroboros.Network.PeerSelection.PeerSharing.Type as PSTypes
import Ouroboros.Network.PeerSharing (PeerSharingController,
bracketPeerSharingClient, peerSharingClient,
peerSharingServer)
import Ouroboros.Network.Protocol.PeerSharing.Client
(PeerSharingClient, peerSharingClientPeer)
import Ouroboros.Network.Protocol.PeerSharing.Codec
(byteLimitsPeerSharing, codecPeerSharing,
codecPeerSharingId, timeLimitsPeerSharing)
import Ouroboros.Network.Protocol.PeerSharing.Server
(PeerSharingServer, peerSharingServerPeer)
import Ouroboros.Network.Protocol.PeerSharing.Type (PeerSharing,
PeerSharingAmount)

{-------------------------------------------------------------------------------
Handlers
Expand Down Expand Up @@ -168,24 +184,40 @@ data Handlers m addr blk = Handlers {
:: NodeToNodeVersion
-> ConnectionId addr
-> KeepAliveServer m ()

, hPeerSharingClient
:: NodeToNodeVersion
-> ControlMessageSTM m
-> addr
-> PeerSharingController addr m
-> m (PeerSharingClient addr m ())

, hPeerSharingServer
:: NodeToNodeVersion
-> addr
-> PeerSharingServer addr m
}

mkHandlers
:: forall m blk addrNTN addrNTC.
( IOLike m
, MonadTime m
, MonadTimer m
, MonadMVar m
, LedgerSupportsMempool blk
, HasTxId (GenTx blk)
, LedgerSupportsProtocol blk
, Ord addrNTN
)
=> NodeKernelArgs m addrNTN addrNTC blk
-> NodeKernel m addrNTN addrNTC blk
-> (PeerSharingAmount -> m [addrNTN])
-- ^ Peer Sharing result computation callback
-> Handlers m addrNTN blk
mkHandlers
NodeKernelArgs {keepAliveRng, miniProtocolParameters}
NodeKernel {getChainDB, getMempool, getTopLevelConfig, getTracers = tracers} =
NodeKernel {getChainDB, getMempool, getTopLevelConfig, getTracers = tracers}
computePeers =
Handlers {
hChainSyncClient = \peer ->
chainSyncClient
Expand Down Expand Up @@ -222,30 +254,38 @@ mkHandlers
version
, hKeepAliveClient = \_version -> keepAliveClient (Node.keepAliveClientTracer tracers) keepAliveRng
, hKeepAliveServer = \_version _peer -> keepAliveServer
, hPeerSharingClient = \_version controlMessageSTM _peer -> peerSharingClient controlMessageSTM
, hPeerSharingServer = \_version _peer -> peerSharingServer computePeers
}

{-------------------------------------------------------------------------------
Codecs
-------------------------------------------------------------------------------}

-- | Node-to-node protocol codecs needed to run 'Handlers'.
data Codecs blk e m bCS bSCS bBF bSBF bTX bKA = Codecs {
data Codecs blk addr e m bCS bSCS bBF bSBF bTX bKA bPS = Codecs {
cChainSyncCodec :: Codec (ChainSync (Header blk) (Point blk) (Tip blk)) e m bCS
, cChainSyncCodecSerialised :: Codec (ChainSync (SerialisedHeader blk) (Point blk) (Tip blk)) e m bSCS
, cBlockFetchCodec :: Codec (BlockFetch blk (Point blk)) e m bBF
, cBlockFetchCodecSerialised :: Codec (BlockFetch (Serialised blk) (Point blk)) e m bSBF
, cTxSubmission2Codec :: Codec (TxSubmission2 (GenTxId blk) (GenTx blk)) e m bTX
, cKeepAliveCodec :: Codec KeepAlive e m bKA
, cPeerSharingCodec :: Codec (PeerSharing addr) e m bPS
}

-- | Protocol codecs for the node-to-node protocols
defaultCodecs :: forall m blk. (IOLike m, SerialiseNodeToNodeConstraints blk)
defaultCodecs :: forall m blk addr.
( IOLike m
, SerialiseNodeToNodeConstraints blk
)
=> CodecConfig blk
-> BlockNodeToNodeVersion blk
-> (addr -> CBOR.Encoding)
-> (forall s . CBOR.Decoder s addr)
-> NodeToNodeVersion
-> Codecs blk DeserialiseFailure m
ByteString ByteString ByteString ByteString ByteString ByteString
defaultCodecs ccfg version _nodeToNodeVersion = Codecs {
-> Codecs blk addr DeserialiseFailure m
ByteString ByteString ByteString ByteString ByteString ByteString ByteString
defaultCodecs ccfg version encAddr decAddr _nodeToNodeVersion = Codecs {
cChainSyncCodec =
codecChainSync
enc
Expand Down Expand Up @@ -286,6 +326,8 @@ defaultCodecs ccfg version _nodeToNodeVersion = Codecs {
dec

, cKeepAliveCodec = codecKeepAlive_v2

, cPeerSharingCodec = codecPeerSharing encAddr decAddr
}
where
p :: Proxy blk
Expand All @@ -299,20 +341,22 @@ defaultCodecs ccfg version _nodeToNodeVersion = Codecs {

-- | Identity codecs used in tests.
identityCodecs :: Monad m
=> Codecs blk CodecFailure m
=> Codecs blk addr CodecFailure m
(AnyMessage (ChainSync (Header blk) (Point blk) (Tip blk)))
(AnyMessage (ChainSync (SerialisedHeader blk) (Point blk) (Tip blk)))
(AnyMessage (BlockFetch blk (Point blk)))
(AnyMessage (BlockFetch (Serialised blk) (Point blk)))
(AnyMessage (TxSubmission2 (GenTxId blk) (GenTx blk)))
(AnyMessage KeepAlive)
(AnyMessage (PeerSharing addr))
identityCodecs = Codecs {
cChainSyncCodec = codecChainSyncId
, cChainSyncCodecSerialised = codecChainSyncId
, cBlockFetchCodec = codecBlockFetchId
, cBlockFetchCodecSerialised = codecBlockFetchId
, cTxSubmission2Codec = codecTxSubmission2Id
, cKeepAliveCodec = codecKeepAliveId
, cPeerSharingCodec = codecPeerSharingId
}

{-------------------------------------------------------------------------------
Expand Down Expand Up @@ -393,7 +437,7 @@ type ServerApp m peer bytes a =
-- | Applications for the node-to-node protocols
--
-- See 'Network.Mux.Types.MuxApplication'
data Apps m addr bCS bBF bTX bKA a b = Apps {
data Apps m addr bCS bBF bTX bKA bPS a b = Apps {
-- | Start a chain sync client that communicates with the given upstream
-- node.
aChainSyncClient :: ClientApp m (ConnectionId addr) bCS a
Expand All @@ -420,6 +464,12 @@ data Apps m addr bCS bBF bTX bKA a b = Apps {

-- | Start a keep-alive server.
, aKeepAliveServer :: ServerApp m (ConnectionId addr) bKA b

-- | Start a peer-sharing client.
, aPeerSharingClient :: ClientApp m addr bPS a

-- | Start a peer-sharing server.
, aPeerSharingServer :: ServerApp m addr bPS b
}


Expand Down Expand Up @@ -473,7 +523,7 @@ byteLimits = ByteLimits {

-- | Construct the 'NetworkApplication' for the node-to-node protocols
mkApps
:: forall m addrNTN addrNTC blk e bCS bBF bTX bKA.
:: forall m addrNTN addrNTC blk e bCS bBF bTX bKA bPS.
( IOLike m
, MonadTimer m
, Ord addrNTN
Expand All @@ -486,12 +536,12 @@ mkApps
)
=> NodeKernel m addrNTN addrNTC blk -- ^ Needed for bracketing only
-> Tracers m (ConnectionId addrNTN) blk e
-> (NodeToNodeVersion -> Codecs blk e m bCS bCS bBF bBF bTX bKA)
-> (NodeToNodeVersion -> Codecs blk addrNTN e m bCS bCS bBF bBF bTX bKA bPS)
-> ByteLimits bCS bBF bTX bKA
-> m ChainSyncTimeout
-> ReportPeerMetrics m (ConnectionId addrNTN)
-> Handlers m addrNTN blk
-> Apps m addrNTN bCS bBF bTX bKA NodeToNodeInitiatorResult ()
-> Apps m addrNTN bCS bBF bTX bKA bPS NodeToNodeInitiatorResult ()
mkApps kernel Tracers {..} mkCodecs ByteLimits {..} genChainSyncTimeout ReportPeerMetrics {..} Handlers {..} =
Apps {..}
where
Expand Down Expand Up @@ -664,6 +714,43 @@ mkApps kernel Tracers {..} mkCodecs ByteLimits {..} genChainSyncTimeout ReportPe
$ keepAliveServerPeer
$ keepAliveServer


aPeerSharingClient
:: NodeToNodeVersion
-> ControlMessageSTM m
-> addrNTN
-> Channel m bPS
-> m (NodeToNodeInitiatorResult, Maybe bPS)
aPeerSharingClient version controlMessageSTM them channel = do
labelThisThread "PeerSharingClient"
bracketPeerSharingClient (getPeerSharingRegistry kernel) them
$ \controller -> do
psClient <- hPeerSharingClient version controlMessageSTM them controller
((), trailing) <- runPeerWithLimits
nullTracer
(cPeerSharingCodec (mkCodecs version))
(byteLimitsPeerSharing (const 0))
timeLimitsPeerSharing
channel
(peerSharingClientPeer psClient)
return (NoInitiatorResult, trailing)

aPeerSharingServer
:: NodeToNodeVersion
-> addrNTN
-> Channel m bPS
-> m ((), Maybe bPS)
aPeerSharingServer version them channel = do
labelThisThread "PeerSharingServer"
runPeerWithLimits
nullTracer
(cPeerSharingCodec (mkCodecs version))
(byteLimitsPeerSharing (const 0))
timeLimitsPeerSharing
channel
$ peerSharingServerPeer
$ hPeerSharingServer version them

{-------------------------------------------------------------------------------
Projections from 'Apps'
-------------------------------------------------------------------------------}
Expand All @@ -677,9 +764,10 @@ mkApps kernel Tracers {..} mkCodecs ByteLimits {..} genChainSyncTimeout ReportPe
initiator
:: MiniProtocolParameters
-> NodeToNodeVersion
-> Apps m addr b b b b a c
-> PSTypes.PeerSharing
-> Apps m addr b b b b b a c
-> OuroborosBundle 'InitiatorMode addr b m a Void
initiator miniProtocolParameters version Apps {..} =
initiator miniProtocolParameters version ownPeerSharing Apps {..} =
nodeToNodeProtocols
miniProtocolParameters
-- TODO: currently consensus is using 'ConnectionId' for its 'peer' type.
Expand All @@ -696,9 +784,12 @@ initiator miniProtocolParameters version Apps {..} =
txSubmissionProtocol =
(InitiatorProtocolOnly (MuxPeerRaw (aTxSubmission2Client version controlMessageSTM them))),
keepAliveProtocol =
(InitiatorProtocolOnly (MuxPeerRaw (aKeepAliveClient version controlMessageSTM them)))
(InitiatorProtocolOnly (MuxPeerRaw (aKeepAliveClient version controlMessageSTM them))),
peerSharingProtocol =
(InitiatorProtocolOnly (MuxPeerRaw (aPeerSharingClient version controlMessageSTM (remoteAddress them))))
})
version
ownPeerSharing

-- | A bi-directional network applicaiton.
--
Expand All @@ -708,9 +799,10 @@ initiator miniProtocolParameters version Apps {..} =
initiatorAndResponder
:: MiniProtocolParameters
-> NodeToNodeVersion
-> Apps m addr b b b b a c
-> PSTypes.PeerSharing
-> Apps m addr b b b b b a c
-> OuroborosBundle 'InitiatorResponderMode addr b m a c
initiatorAndResponder miniProtocolParameters version Apps {..} =
initiatorAndResponder miniProtocolParameters version ownPeerSharing Apps {..} =
nodeToNodeProtocols
miniProtocolParameters
(\them controlMessageSTM -> NodeToNodeProtocols {
Expand All @@ -729,6 +821,12 @@ initiatorAndResponder miniProtocolParameters version Apps {..} =
keepAliveProtocol =
(InitiatorAndResponderProtocol
(MuxPeerRaw (aKeepAliveClient version controlMessageSTM them))
(MuxPeerRaw (aKeepAliveServer version them)))
(MuxPeerRaw (aKeepAliveServer version them))),

peerSharingProtocol =
(InitiatorAndResponderProtocol
(MuxPeerRaw (aPeerSharingClient version controlMessageSTM (remoteAddress them)))
(MuxPeerRaw (aPeerSharingServer version (remoteAddress them))))
})
version
ownPeerSharing

0 comments on commit e7f7a03

Please sign in to comment.