Skip to content

Commit

Permalink
Pass IsBigLedgerPeer to mini-protocol callbacks.
Browse files Browse the repository at this point in the history
The core change is adding a lambda in the initiator part of
`RunMiniProtocol`.  From there it is propagated up to
ouroboros-consensus-diffusion.
  • Loading branch information
coot committed Jun 2, 2023
1 parent fa015e0 commit 7a87c90
Show file tree
Hide file tree
Showing 17 changed files with 139 additions and 96 deletions.
7 changes: 4 additions & 3 deletions ouroboros-network-framework/demo/connection-manager.hs
Expand Up @@ -65,6 +65,7 @@ import qualified Ouroboros.Network.InboundGovernor.ControlChannel as Server
import Ouroboros.Network.IOManager
import Ouroboros.Network.Mux
import Ouroboros.Network.MuxMode
import Ouroboros.Network.PeerSelection.LedgerPeers.Type (IsBigLedgerPeer (..))
import Ouroboros.Network.Protocol.Handshake
import Ouroboros.Network.Protocol.Handshake.Codec
(timeLimitsHandshake)
Expand Down Expand Up @@ -338,7 +339,7 @@ withBidirectionalConnectionManager snocket makeBearer socket
-> RunMiniProtocol InitiatorResponderMode ByteString m () ()
reqRespInitiatorAndResponder protocolNum requestsVar =
InitiatorAndResponderProtocol
(MuxPeer
(const $ MuxPeer
(("Initiator",protocolNum,) `contramap` debugTracer) -- TraceSendRecv
(codecReqResp @Int @Int)
(Effect $ do
Expand Down Expand Up @@ -423,8 +424,8 @@ runInitiatorProtocols
Mux.StartEagerly
(runMuxPeer
(case miniProtocolRun ptcl of
InitiatorProtocolOnly initiator -> initiator
InitiatorAndResponderProtocol initiator _ -> initiator)
InitiatorProtocolOnly initiator -> initiator IsNotBigLedgerPeer
InitiatorAndResponderProtocol initiator _ -> initiator IsNotBigLedgerPeer)
. fromChannel)


Expand Down
8 changes: 4 additions & 4 deletions ouroboros-network-framework/demo/ping-pong.hs
Expand Up @@ -123,14 +123,14 @@ clientPingPong pipelined =
app = demoProtocol0 pingPongInitiator

pingPongInitiator | pipelined =
InitiatorProtocolOnly $
InitiatorProtocolOnly $ const $
MuxPeerPipelined
(contramap show stdoutTracer)
codecPingPong
(pingPongClientPeerPipelined (pingPongClientPipelinedMax 5))

| otherwise =
InitiatorProtocolOnly $
InitiatorProtocolOnly $ const $
MuxPeer
(contramap show stdoutTracer)
codecPingPong
Expand Down Expand Up @@ -225,14 +225,14 @@ clientPingPong2 =
app = demoProtocol1 pingpong pingpong'

pingpong =
InitiatorProtocolOnly $
InitiatorProtocolOnly $ const $
MuxPeer
(contramap (show . (,) (1 :: Int)) stdoutTracer)
codecPingPong
(pingPongClientPeer (pingPongClientCount 5))

pingpong'=
InitiatorProtocolOnly $
InitiatorProtocolOnly $ const $
MuxPeer
(contramap (show . (,) (2 :: Int)) stdoutTracer)
codecPingPong
Expand Down
63 changes: 40 additions & 23 deletions ouroboros-network-framework/src/Ouroboros/Network/Mux.hs
Expand Up @@ -68,6 +68,8 @@ import Ouroboros.Network.ControlMessage
import Ouroboros.Network.Driver
import Ouroboros.Network.Util.ShowProxy (ShowProxy)

import Ouroboros.Network.PeerSelection.LedgerPeers.Type (IsBigLedgerPeer (..))




Expand Down Expand Up @@ -206,6 +208,11 @@ instance Applicative TemperatureBundle where
-- Useful type synonyms
--

-- | 'MuxProtocolBundle' type alias captures context which is passed when
-- a connection was created: `ConnectionId` and `ControlMessageSTM`. Note that
-- `ControlMessageSTM` is shared between all mini-protocols of the same
-- temperature.
--
type MuxProtocolBundle (mode :: MuxMode) addr bytes m a b
= ConnectionId addr
-> ControlMessageSTM m
Expand All @@ -225,20 +232,25 @@ type MuxBundle (mode :: MuxMode) bytes m a b =
TemperatureBundle [MiniProtocol mode bytes m a b]


-- | 'RunMiniProtocol'. It also capture context (the `IsBigLedgerPeer`) which
-- is passed to the mini-protocol when a mini-protocol is started.
--
data RunMiniProtocol (mode :: MuxMode) bytes m a b where
InitiatorProtocolOnly
:: MuxPeer bytes m a
:: (IsBigLedgerPeer -> MuxPeer bytes m a)
-> RunMiniProtocol InitiatorMode bytes m a Void

ResponderProtocolOnly
:: MuxPeer bytes m b
-> RunMiniProtocol ResponderMode bytes m Void b

InitiatorAndResponderProtocol
:: MuxPeer bytes m a
:: (IsBigLedgerPeer -> MuxPeer bytes m a)
-> MuxPeer bytes m b
-> RunMiniProtocol InitiatorResponderMode bytes m a b

-- TODO: do we need this type? Don't we only use 'MuxPeerRaw'.
--
data MuxPeer bytes m a where
MuxPeer :: forall (pr :: PeerRole) ps (st :: ps) failure bytes m a.
( Show failure
Expand Down Expand Up @@ -267,19 +279,37 @@ data MuxPeer bytes m a where
:: (Channel m bytes -> m (a, Maybe bytes))
-> MuxPeer bytes m a

-- | Create non p2p mux application.
--
-- Note that callbacks will always receive `IsNotBigLedgerPeer`.
toApplication :: (MonadCatch m, MonadAsync m)
=> ConnectionId addr
-> ControlMessageSTM m
-> OuroborosApplication mode addr LBS.ByteString m a b
-> Mux.Compat.MuxApplication mode m a b
toApplication connectionId controlMessageSTM (OuroborosApplication ptcls) =
Mux.Compat.MuxApplication
[ Mux.Compat.MuxMiniProtocol {
Mux.Compat.miniProtocolNum = miniProtocolNum ptcl,
Mux.Compat.miniProtocolLimits = miniProtocolLimits ptcl,
Mux.Compat.miniProtocolRun = toMuxRunMiniProtocol (miniProtocolRun ptcl)
}
| ptcl <- ptcls connectionId controlMessageSTM ]
Mux.Compat.MuxApplication
[ Mux.Compat.MuxMiniProtocol {
Mux.Compat.miniProtocolNum = miniProtocolNum ptcl,
Mux.Compat.miniProtocolLimits = miniProtocolLimits ptcl,
Mux.Compat.miniProtocolRun = toMuxRunMiniProtocol (miniProtocolRun ptcl)
}
| ptcl <- ptcls connectionId controlMessageSTM ]
where
toMuxRunMiniProtocol :: forall mode m a b.
(MonadCatch m, MonadAsync m)
=> RunMiniProtocol mode LBS.ByteString m a b
-> Mux.Compat.RunMiniProtocol mode m a b
toMuxRunMiniProtocol (InitiatorProtocolOnly i) =
Mux.Compat.InitiatorProtocolOnly
(runMuxPeer (i IsNotBigLedgerPeer) . fromChannel)
toMuxRunMiniProtocol (ResponderProtocolOnly r) =
Mux.Compat.ResponderProtocolOnly
(runMuxPeer r . fromChannel)
toMuxRunMiniProtocol (InitiatorAndResponderProtocol i r) =
Mux.Compat.InitiatorAndResponderProtocol
(runMuxPeer (i IsNotBigLedgerPeer) . fromChannel)
(runMuxPeer r . fromChannel)


mkMuxApplicationBundle
Expand Down Expand Up @@ -322,20 +352,7 @@ mkMiniProtocolBundle = MiniProtocolBundle . foldMap fn
, Mux.ResponderDirection ]
]

toMuxRunMiniProtocol :: forall mode m a b.
(MonadCatch m, MonadAsync m)
=> RunMiniProtocol mode LBS.ByteString m a b
-> Mux.Compat.RunMiniProtocol mode m a b
toMuxRunMiniProtocol (InitiatorProtocolOnly i) =
Mux.Compat.InitiatorProtocolOnly (runMuxPeer i . fromChannel)
toMuxRunMiniProtocol (ResponderProtocolOnly r) =
Mux.Compat.ResponderProtocolOnly (runMuxPeer r . fromChannel)
toMuxRunMiniProtocol (InitiatorAndResponderProtocol i r) =
Mux.Compat.InitiatorAndResponderProtocol (runMuxPeer i . fromChannel)
(runMuxPeer r . fromChannel)

-- |
-- Run a @'MuxPeer'@ using either @'runPeer'@ or @'runPipelinedPeer'@.
-- | Run a @'MuxPeer'@ using either @'runPeer'@ or @'runPipelinedPeer'@.
--
runMuxPeer
:: ( MonadCatch m
Expand Down
Expand Up @@ -94,6 +94,7 @@ import Ouroboros.Network.InboundGovernor.State
import Ouroboros.Network.IOManager
import Ouroboros.Network.Mux
import Ouroboros.Network.MuxMode
import Ouroboros.Network.PeerSelection.LedgerPeers.Type (IsBigLedgerPeer (..))
import Ouroboros.Network.Protocol.Handshake
import Ouroboros.Network.Protocol.Handshake.Codec
(cborTermVersionDataCodec, noTimeLimitsHandshake,
Expand Down Expand Up @@ -414,7 +415,7 @@ withInitiatorOnlyConnectionManager name timeouts trTracer cmTracer snocket makeB
-> RunMiniProtocol InitiatorMode ByteString m [resp] Void
reqRespInitiator protocolNum nextRequest =
InitiatorProtocolOnly
(MuxPeerRaw $ \channel ->
(const $ MuxPeerRaw $ \channel ->
runPeerWithLimits
(WithName (name,"Initiator",protocolNum) `contramap` nullTracer)
-- TraceSendRecv
Expand Down Expand Up @@ -619,7 +620,7 @@ withBidirectionalConnectionManager name timeouts
-> RunMiniProtocol InitiatorResponderMode ByteString m [resp] acc
reqRespInitiatorAndResponder protocolNum accInit nextRequest =
InitiatorAndResponderProtocol
(MuxPeerRaw $ \channel ->
(const $ MuxPeerRaw $ \channel ->
runPeerWithLimits
(WithName (name,"Initiator",protocolNum) `contramap` nullTracer)
-- TraceSendRecv
Expand Down Expand Up @@ -718,8 +719,8 @@ runInitiatorProtocols
Mux.StartEagerly
(runMuxPeer
(case miniProtocolRun ptcl of
InitiatorProtocolOnly initiator -> initiator
InitiatorAndResponderProtocol initiator _ -> initiator)
InitiatorProtocolOnly initiator -> initiator IsNotBigLedgerPeer
InitiatorAndResponderProtocol initiator _ -> initiator IsNotBigLedgerPeer)
. fromChannel)

--
Expand Down
Expand Up @@ -233,7 +233,7 @@ prop_socket_send_recv initiatorAddr responderAddr configureSock f xs =
initiatorApp = testProtocols2 reqRespInitiator

reqRespInitiator =
InitiatorProtocolOnly $
InitiatorProtocolOnly $ const $
-- TODO: For the moment this needs MuxPeerRaw because it has to
-- do something with the result after the protocol is run.
-- This should be replaced with use of the handles.
Expand Down Expand Up @@ -494,7 +494,7 @@ prop_socket_client_connect_error _ xs =
app = testProtocols2 reqRespInitiator

reqRespInitiator =
InitiatorProtocolOnly $
InitiatorProtocolOnly $ const $
-- TODO: For the moment this needs MuxPeerRaw because it has to
-- do something with the result after the protocol is run.
-- This should be replaced with use of the handles.
Expand Down
Expand Up @@ -577,7 +577,7 @@ prop_send_recv f xs _first = ioProperty $ withIOManager $ \iocp -> do
initiatorApp = testProtocols2 reqRespInitiator

reqRespInitiator =
InitiatorProtocolOnly $
InitiatorProtocolOnly $ const $
MuxPeerRaw $ \channel -> do
(r, trailing) <- runPeer (tagTrace "Initiator" activeTracer)
ReqResp.codecReqResp
Expand Down Expand Up @@ -715,7 +715,7 @@ prop_send_recv_init_and_rsp f xs = ioProperty $ withIOManager $ \iocp -> do
reqResp ReqRspCfg {rrcTag, rrcServerVar, rrcClientVar, rrcSiblingVar} =
InitiatorAndResponderProtocol
-- Initiator
(MuxPeerRaw $ \channel -> do
(const $ MuxPeerRaw $ \channel -> do
(r, trailing) <- runPeer (tagTrace (rrcTag ++ " Initiator") activeTracer)
ReqResp.codecReqResp
channel
Expand Down Expand Up @@ -904,7 +904,7 @@ _demo = ioProperty $ withIOManager $ \iocp -> do

appReq =
testProtocols1 $
InitiatorProtocolOnly $
InitiatorProtocolOnly $ const $
MuxPeerRaw $ \_ -> error "req fail"

appRsp =
Expand Down
6 changes: 3 additions & 3 deletions ouroboros-network/demo/chain-sync.hs
Expand Up @@ -176,7 +176,7 @@ clientChainSync sockPaths = withIOManager $ \iocp ->
app = demoProtocol2 chainSync

chainSync =
InitiatorProtocolOnly $
InitiatorProtocolOnly $ const $
MuxPeer
(contramap show stdoutTracer)
codecChainSync
Expand Down Expand Up @@ -271,7 +271,7 @@ clientBlockFetch sockAddrs = withIOManager $ \iocp -> do
chainSync :: LocalConnectionId
-> RunMiniProtocol InitiatorMode LBS.ByteString IO () Void
chainSync connectionId =
InitiatorProtocolOnly $
InitiatorProtocolOnly $ const $
-- TODO: this currently needs MuxPeerRaw because of the resource
-- bracket
MuxPeerRaw $ \channel ->
Expand All @@ -295,7 +295,7 @@ clientBlockFetch sockAddrs = withIOManager $ \iocp -> do
blockFetch :: LocalConnectionId
-> RunMiniProtocol InitiatorMode LBS.ByteString IO () Void
blockFetch connectionId =
InitiatorProtocolOnly $
InitiatorProtocolOnly $ const $
-- TODO: this currently needs MuxPeerRaw because of the resource
-- bracket
MuxPeerRaw $ \channel ->
Expand Down
Expand Up @@ -23,6 +23,7 @@ import Control.Monad.Class.MonadTimer.SI
import System.Random (randomR)

import Ouroboros.Network.PeerSelection.Governor.Types
import Ouroboros.Network.PeerSelection.LedgerPeers (IsBigLedgerPeer (..))
import qualified Ouroboros.Network.PeerSelection.State.EstablishedPeers as EstablishedPeers
import Ouroboros.Network.PeerSelection.State.KnownPeers (setTepidFlag)
import qualified Ouroboros.Network.PeerSelection.State.LocalRootPeers as LocalRootPeers
Expand Down Expand Up @@ -102,7 +103,7 @@ belowTargetBigLedgerPeers actions
inProgressPromoteWarm = inProgressPromoteWarm
<> selectedToPromote
},
decisionJobs = [ jobPromoteWarmPeer actions policy peeraddr peerconn
decisionJobs = [ jobPromoteWarmPeer actions policy peeraddr IsBigLedgerPeer peerconn
| (peeraddr, peerconn) <- Map.assocs selectedToPromote' ]
}

Expand Down Expand Up @@ -183,7 +184,7 @@ belowTargetLocal actions
inProgressPromoteWarm = inProgressPromoteWarm
<> selectedToPromote
},
decisionJobs = [ jobPromoteWarmPeer actions policy peeraddr peerconn
decisionJobs = [ jobPromoteWarmPeer actions policy peeraddr IsNotBigLedgerPeer peerconn
| (peeraddr, peerconn) <- Map.assocs selectedToPromote' ]
}

Expand Down Expand Up @@ -263,7 +264,7 @@ belowTargetOther actions
inProgressPromoteWarm = inProgressPromoteWarm
<> selectedToPromote
},
decisionJobs = [ jobPromoteWarmPeer actions policy peeraddr peerconn
decisionJobs = [ jobPromoteWarmPeer actions policy peeraddr IsNotBigLedgerPeer peerconn
| (peeraddr, peerconn) <- Map.assocs selectedToPromote' ]
}

Expand All @@ -286,11 +287,12 @@ jobPromoteWarmPeer :: forall peeraddr peerconn m.
=> PeerSelectionActions peeraddr peerconn m
-> PeerSelectionPolicy peeraddr m
-> peeraddr
-> IsBigLedgerPeer
-> peerconn
-> Job () m (Completion m peeraddr peerconn)
jobPromoteWarmPeer PeerSelectionActions{peerStateActions = PeerStateActions {activatePeerConnection}}
PeerSelectionPolicy { policyErrorDelay }
peeraddr peerconn =
peeraddr isBigLedgerPeer peerconn =
Job job handler () "promoteWarmPeer"
where
handler :: SomeException -> m (Completion m peeraddr peerconn)
Expand Down Expand Up @@ -381,7 +383,7 @@ jobPromoteWarmPeer PeerSelectionActions{peerStateActions = PeerStateActions {act

job :: m (Completion m peeraddr peerconn)
job = do
activatePeerConnection peerconn
activatePeerConnection isBigLedgerPeer peerconn
return $ Completion $ \st@PeerSelectionState {
bigLedgerPeers,
activePeers,
Expand Down

0 comments on commit 7a87c90

Please sign in to comment.