Skip to content

Commit

Permalink
control-message: updated ouroboros-consensus
Browse files Browse the repository at this point in the history
Only the client sides need to have access to `ControlMessageSTM`.
  • Loading branch information
coot committed Sep 21, 2020
1 parent e014695 commit 267a90a
Showing 1 changed file with 34 additions and 28 deletions.
62 changes: 34 additions & 28 deletions ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs
Expand Up @@ -21,7 +21,8 @@ module Ouroboros.Consensus.Network.NodeToNode (
, nullTracers
, showTracers
-- * Applications
, App
, ClientApp
, ServerApp
, Apps (..)
, mkApps
-- ** Projections
Expand Down Expand Up @@ -115,6 +116,7 @@ data Handlers m peer blk = Handlers {
-- TODO block fetch client does not have GADT view of the handlers.
, hBlockFetchClient
:: NodeToNodeVersion
-> ControlMessageSTM m
-> BlockFetchClient (Header blk) blk m ()

, hBlockFetchServer
Expand All @@ -124,6 +126,7 @@ data Handlers m peer blk = Handlers {

, hTxSubmissionClient
:: NodeToNodeVersion
-> ControlMessageSTM m
-> peer
-> TxSubmissionClient (GenTxId blk) (GenTx blk) m ()

Expand Down Expand Up @@ -181,12 +184,13 @@ mkHandlers
(Node.blockFetchServerTracer tracers)
getChainDB
version
, hTxSubmissionClient = \version peer ->
, hTxSubmissionClient = \version controlMessageSTM peer ->
txSubmissionOutbound
(contramap (TraceLabelPeer peer) (Node.txOutboundTracer tracers))
(txSubmissionMaxUnacked miniProtocolParameters)
(getMempoolReader getMempool)
version
controlMessageSTM
, hTxSubmissionServer = \version peer ->
txSubmissionInbound
(contramap (TraceLabelPeer peer) (Node.txInboundTracer tracers))
Expand Down Expand Up @@ -351,43 +355,49 @@ showTracers tr = Tracers {
-------------------------------------------------------------------------------}

-- | A node-to-node application
type App m peer bytes a =
type ClientApp m peer bytes a =
NodeToNodeVersion
-> ControlMessageSTM m
-> peer
-> Channel m bytes
-> m (a, Maybe bytes)

type ServerApp m peer bytes a =
NodeToNodeVersion
-> peer
-> Channel m bytes
-> m (a, Maybe bytes)

-- | Applications for the node-to-node protocols
--
-- See 'Network.Mux.Types.MuxApplication'
data Apps m peer bCS bBF bTX bKA a = Apps {
-- | Start a chain sync client that communicates with the given upstream
-- node.
aChainSyncClient :: App m peer bCS a
aChainSyncClient :: ClientApp m peer bCS a

-- | Start a chain sync server.
, aChainSyncServer :: App m peer bCS a
, aChainSyncServer :: ServerApp m peer bCS a

-- | Start a block fetch client that communicates with the given
-- upstream node.
, aBlockFetchClient :: App m peer bBF a
, aBlockFetchClient :: ClientApp m peer bBF a

-- | Start a block fetch server.
, aBlockFetchServer :: App m peer bBF a
, aBlockFetchServer :: ServerApp m peer bBF a

-- | Start a transaction submission client that communicates with the
-- given upstream node.
, aTxSubmissionClient :: App m peer bTX a
, aTxSubmissionClient :: ClientApp m peer bTX a

-- | Start a transaction submission server.
, aTxSubmissionServer :: App m peer bTX a
, aTxSubmissionServer :: ServerApp m peer bTX a

-- | Start a keep-alive client.
, aKeepAliveClient :: App m peer bKA a
, aKeepAliveClient :: ClientApp m peer bKA a

-- | Start a keep-alive server.
, aKeepAliveServer :: App m peer bKA a
, aKeepAliveServer :: ServerApp m peer bKA a
}

-- | Construct the 'NetworkApplication' for the node-to-node protocols
Expand Down Expand Up @@ -446,11 +456,10 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =

aChainSyncServer
:: NodeToNodeVersion
-> ControlMessageSTM m
-> remotePeer
-> Channel m bCS
-> m ((), Maybe bCS)
aChainSyncServer version _controlMessageSTM them channel = do
aChainSyncServer version them channel = do
labelThisThread "ChainSyncServer"
withRegistry $ \registry -> do
chainSyncTimeout <- genChainSyncTimeout
Expand All @@ -469,7 +478,7 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =
-> remotePeer
-> Channel m bBF
-> m ((), Maybe bBF)
aBlockFetchClient version _controlMessageSTM them channel = do
aBlockFetchClient version controlMessageSTM them channel = do
labelThisThread "BlockFetchClient"
bracketFetchClient (getFetchClientRegistry kernel) them $ \clientCtx ->
runPipelinedPeerWithLimits
Expand All @@ -478,15 +487,14 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =
(byteLimitsBlockFetch (const 0)) -- TODO: Real Bytelimits, see #1727
timeLimitsBlockFetch
channel
$ hBlockFetchClient version clientCtx
$ hBlockFetchClient version controlMessageSTM clientCtx

aBlockFetchServer
:: NodeToNodeVersion
-> ControlMessageSTM m
-> remotePeer
-> Channel m bBF
-> m ((), Maybe bBF)
aBlockFetchServer version _controlMessageSTM them channel = do
aBlockFetchServer version them channel = do
labelThisThread "BlockFetchServer"
withRegistry $ \registry ->
runPeerWithLimits
Expand All @@ -504,23 +512,22 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =
-> remotePeer
-> Channel m bTX
-> m ((), Maybe bTX)
aTxSubmissionClient version _controlMessageSTM them channel = do
aTxSubmissionClient version controlMessageSTM them channel = do
labelThisThread "TxSubmissionClient"
runPeerWithLimits
(contramap (TraceLabelPeer them) tTxSubmissionTracer)
cTxSubmissionCodec
(byteLimitsTxSubmission (const 0)) -- TODO: Real Bytelimits, see #1727
timeLimitsTxSubmission
channel
(txSubmissionClientPeer (hTxSubmissionClient version them))
(txSubmissionClientPeer (hTxSubmissionClient version controlMessageSTM them))

aTxSubmissionServer
:: NodeToNodeVersion
-> ControlMessageSTM m
-> remotePeer
-> Channel m bTX
-> m ((), Maybe bTX)
aTxSubmissionServer version _controlMessageSTM them channel = do
aTxSubmissionServer version them channel = do
labelThisThread "TxSubmissionServer"
runPipelinedPeerWithLimits
(contramap (TraceLabelPeer them) tTxSubmissionTracer)
Expand Down Expand Up @@ -558,11 +565,10 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =

aKeepAliveServer
:: NodeToNodeVersion
-> ControlMessageSTM m
-> remotePeer
-> Channel m bKA
-> m ((), Maybe bKA)
aKeepAliveServer _version _controlMessageSTM _them channel = do
aKeepAliveServer _version _them channel = do
labelThisThread "KeepAliveServer"
runPeerWithLimits
nullTracer
Expand Down Expand Up @@ -621,14 +627,14 @@ responder
responder miniProtocolParameters version Apps {..} =
nodeToNodeProtocols
miniProtocolParameters
(\them controlMessageSTM -> NodeToNodeProtocols {
(\them _controlMessageSTM -> NodeToNodeProtocols {
chainSyncProtocol =
(ResponderProtocolOnly (MuxPeerRaw (aChainSyncServer version controlMessageSTM them))),
(ResponderProtocolOnly (MuxPeerRaw (aChainSyncServer version them))),
blockFetchProtocol =
(ResponderProtocolOnly (MuxPeerRaw (aBlockFetchServer version controlMessageSTM them))),
(ResponderProtocolOnly (MuxPeerRaw (aBlockFetchServer version them))),
txSubmissionProtocol =
(ResponderProtocolOnly (MuxPeerRaw (aTxSubmissionServer version controlMessageSTM them))),
(ResponderProtocolOnly (MuxPeerRaw (aTxSubmissionServer version them))),
keepAliveProtocol =
(ResponderProtocolOnly (MuxPeerRaw (aKeepAliveServer version controlMessageSTM them)))
(ResponderProtocolOnly (MuxPeerRaw (aKeepAliveServer version them)))
})
version

0 comments on commit 267a90a

Please sign in to comment.