Skip to content

Commit

Permalink
WIP: further refactor MuxPeer
Browse files Browse the repository at this point in the history
TODO: Squash onto previous commit.
  • Loading branch information
coot committed Mar 17, 2023
1 parent 80daed5 commit 2ba25a1
Show file tree
Hide file tree
Showing 15 changed files with 421 additions and 383 deletions.
Expand Up @@ -469,12 +469,12 @@ responder version Apps {..} =
nodeToClientProtocols
(NodeToClientProtocols {
localChainSyncProtocol =
ResponderProtocolOnly $ \ctx -> MuxPeerRaw (aChainSyncServer (rcConnectionId ctx)),
ResponderProtocolOnly $ MuxPeerRaw $ \ctx -> aChainSyncServer (rcConnectionId ctx),
localTxSubmissionProtocol =
ResponderProtocolOnly $ \ctx -> MuxPeerRaw (aTxSubmissionServer (rcConnectionId ctx)),
ResponderProtocolOnly $ MuxPeerRaw $ \ctx -> aTxSubmissionServer (rcConnectionId ctx),
localStateQueryProtocol =
ResponderProtocolOnly $ \ctx -> MuxPeerRaw (aStateQueryServer (rcConnectionId ctx)),
ResponderProtocolOnly $ MuxPeerRaw $ \ctx -> aStateQueryServer (rcConnectionId ctx),
localTxMonitorProtocol =
ResponderProtocolOnly $ \ctx -> MuxPeerRaw (aTxMonitorServer (rcConnectionId ctx))
ResponderProtocolOnly $ MuxPeerRaw $ \ctx -> aTxMonitorServer (rcConnectionId ctx)
})
version
Expand Up @@ -795,15 +795,15 @@ initiator miniProtocolParameters version ownPeerSharing Apps {..} =
-- a quadruple uniquely determining a connection).
(NodeToNodeProtocols {
chainSyncProtocol =
(InitiatorProtocolOnly (\ctx -> MuxPeerRaw (aChainSyncClient version ctx))),
(InitiatorProtocolOnly (MuxPeerRaw (\ctx -> aChainSyncClient version ctx))),
blockFetchProtocol =
(InitiatorProtocolOnly (\ctx -> MuxPeerRaw (aBlockFetchClient version ctx))),
(InitiatorProtocolOnly (MuxPeerRaw (\ctx -> aBlockFetchClient version ctx))),
txSubmissionProtocol =
(InitiatorProtocolOnly (\ctx -> MuxPeerRaw (aTxSubmission2Client version ctx))),
(InitiatorProtocolOnly (MuxPeerRaw (\ctx -> aTxSubmission2Client version ctx))),
keepAliveProtocol =
(InitiatorProtocolOnly (\ctx -> MuxPeerRaw (aKeepAliveClient version ctx))),
(InitiatorProtocolOnly (MuxPeerRaw (\ctx -> aKeepAliveClient version ctx))),
peerSharingProtocol =
(InitiatorProtocolOnly (\ctx -> MuxPeerRaw (aPeerSharingClient version ctx)))
(InitiatorProtocolOnly (MuxPeerRaw (\ctx -> aPeerSharingClient version ctx)))
})
version
ownPeerSharing
Expand All @@ -825,25 +825,25 @@ initiatorAndResponder miniProtocolParameters version ownPeerSharing Apps {..} =
(NodeToNodeProtocols {
chainSyncProtocol =
(InitiatorAndResponderProtocol
(\initiatorCtx -> MuxPeerRaw (aChainSyncClient version initiatorCtx))
(\responderCtx -> MuxPeerRaw (aChainSyncServer version responderCtx))),
(MuxPeerRaw (\initiatorCtx -> aChainSyncClient version initiatorCtx))
(MuxPeerRaw (\responderCtx -> aChainSyncServer version responderCtx))),
blockFetchProtocol =
(InitiatorAndResponderProtocol
(\initiatorCtx -> MuxPeerRaw (aBlockFetchClient version initiatorCtx))
(\responderCtx -> MuxPeerRaw (aBlockFetchServer version responderCtx))),
(MuxPeerRaw (\initiatorCtx -> aBlockFetchClient version initiatorCtx))
(MuxPeerRaw (\responderCtx -> aBlockFetchServer version responderCtx))),
txSubmissionProtocol =
(InitiatorAndResponderProtocol
(\initiatorCtx -> MuxPeerRaw (aTxSubmission2Client version initiatorCtx))
(\responderCtx -> MuxPeerRaw (aTxSubmission2Server version responderCtx))),
(MuxPeerRaw (\initiatorCtx -> aTxSubmission2Client version initiatorCtx))
(MuxPeerRaw (\responderCtx -> aTxSubmission2Server version responderCtx))),
keepAliveProtocol =
(InitiatorAndResponderProtocol
(\initiatorCtx -> MuxPeerRaw (aKeepAliveClient version initiatorCtx))
(\responderCtx -> MuxPeerRaw (aKeepAliveServer version responderCtx))),
(MuxPeerRaw (\initiatorCtx -> aKeepAliveClient version initiatorCtx))
(MuxPeerRaw (\responderCtx -> aKeepAliveServer version responderCtx))),

peerSharingProtocol =
(InitiatorAndResponderProtocol
(\initiatorCtx -> MuxPeerRaw (aPeerSharingClient version initiatorCtx))
(\responderCtx -> MuxPeerRaw (aPeerSharingServer version responderCtx)))
(MuxPeerRaw (\initiatorCtx -> aPeerSharingClient version initiatorCtx))
(MuxPeerRaw (\responderCtx -> aPeerSharingServer version responderCtx)))
})
version
ownPeerSharing
45 changes: 23 additions & 22 deletions ouroboros-network-framework/demo/connection-manager.hs
Expand Up @@ -55,7 +55,6 @@ import Network.TypedProtocol.ReqResp.Examples
import Network.TypedProtocol.ReqResp.Server
import Network.TypedProtocol.ReqResp.Type (ReqResp)

import Ouroboros.Network.Channel (fromChannel)
import Ouroboros.Network.ConnectionHandler
import Ouroboros.Network.ConnectionManager.Core
import Ouroboros.Network.ConnectionManager.Types
Expand Down Expand Up @@ -333,22 +332,26 @@ withBidirectionalConnectionManager snocket makeBearer socket
InitiatorResponderMode peerAddr ByteString m () ()
reqRespInitiatorAndResponder protocolNum requestsVar =
InitiatorAndResponderProtocol
(const $ mkMuxPeer
(("Initiator",protocolNum,) `contramap` debugTracer) -- TraceSendRecv
(codecReqResp @Int @Int)
(Effect $ do
reqs <-
atomically $ do
requests <- LazySTM.readTVar requestsVar
case requests of
(reqs : rest) -> do
LazySTM.writeTVar requestsVar rest $> reqs
[] -> pure []
pure $ reqRespClientPeer (reqRespClient reqs)))
(const $ mkMuxPeer
(("Responder",protocolNum,) `contramap` debugTracer) -- TraceSendRecv
(codecReqResp @Int @Int)
(Effect $ reqRespServerPeer <$> reqRespServerId))
(mkMuxPeer
(\_ctx -> ( ("Initiator",protocolNum,) `contramap` debugTracer -- TraceSendRecv
, codecReqResp @Int @Int
, Effect $ do
reqs <-
atomically $ do
requests <- LazySTM.readTVar requestsVar
case requests of
(reqs : rest) -> do
LazySTM.writeTVar requestsVar rest $> reqs
[] -> pure []
pure $ reqRespClientPeer (reqRespClient reqs)
)
))
(mkMuxPeer
(\_ctx -> ( ("Responder",protocolNum,) `contramap` debugTracer -- TraceSendRecv
, codecReqResp @Int @Int
, Effect $ reqRespServerPeer <$> reqRespServerId
)
))

reqRespServerId :: m (ReqRespServer Int Int m ())
reqRespServerId = pure go
Expand Down Expand Up @@ -419,11 +422,9 @@ runInitiatorProtocols
Mux.StartEagerly
(runMuxPeer
(case miniProtocolRun ptcl of
InitiatorProtocolOnly initiator -> initiator ctx
InitiatorAndResponderProtocol initiator _ -> initiator ctx)
. fromChannel)
where
ctx = getContext sing
InitiatorProtocolOnly initiator -> initiator
InitiatorAndResponderProtocol initiator _ -> initiator)
(getContext sing))


-- | Bidirectional send and receive.
Expand Down
78 changes: 43 additions & 35 deletions ouroboros-network-framework/demo/ping-pong.hs
Expand Up @@ -124,18 +124,20 @@ clientPingPong pipelined =
app = demoProtocol0 pingPongInitiator

pingPongInitiator | pipelined =
InitiatorProtocolOnly $ const $
mkMuxPeerPipelined
(contramap show stdoutTracer)
codecPingPong
(pingPongClientPeerPipelined (pingPongClientPipelinedMax 5))
InitiatorProtocolOnly $
mkMuxPeerPipelined $ \_ctx ->
(contramap show stdoutTracer
, codecPingPong
, pingPongClientPeerPipelined (pingPongClientPipelinedMax 5)
)

| otherwise =
InitiatorProtocolOnly $ const $
mkMuxPeer
(contramap show stdoutTracer)
codecPingPong
(pingPongClientPeer (pingPongClientCount 5))
InitiatorProtocolOnly $
mkMuxPeer $ \_ctx ->
( contramap show stdoutTracer
, codecPingPong
, pingPongClientPeer (pingPongClientCount 5)
)


pingPongClientCount :: Applicative m => Int -> PingPongClient m ()
Expand Down Expand Up @@ -169,11 +171,12 @@ serverPingPong =
app = demoProtocol0 pingPongResponder

pingPongResponder =
ResponderProtocolOnly $ \_ctx ->
mkMuxPeer
(contramap show stdoutTracer)
codecPingPong
(pingPongServerPeer pingPongServerStandard)
ResponderProtocolOnly $
mkMuxPeer $ \_ctx ->
( contramap show stdoutTracer
, codecPingPong
, pingPongServerPeer pingPongServerStandard
)

pingPongServerStandard
:: Applicative m
Expand Down Expand Up @@ -228,18 +231,20 @@ clientPingPong2 =
app = demoProtocol1 pingpong pingpong'

pingpong =
InitiatorProtocolOnly $ const $
mkMuxPeer
(contramap (show . (,) (1 :: Int)) stdoutTracer)
codecPingPong
(pingPongClientPeer (pingPongClientCount 5))
InitiatorProtocolOnly $
mkMuxPeer $ \_ctx ->
( contramap (show . (,) (1 :: Int)) stdoutTracer
, codecPingPong
, pingPongClientPeer (pingPongClientCount 5)
)

pingpong'=
InitiatorProtocolOnly $ const $
mkMuxPeer
(contramap (show . (,) (2 :: Int)) stdoutTracer)
codecPingPong
(pingPongClientPeer (pingPongClientCount 5))
InitiatorProtocolOnly $
mkMuxPeer $ \_ctx ->
( contramap (show . (,) (2 :: Int)) stdoutTracer
, codecPingPong
, pingPongClientPeer (pingPongClientCount 5)
)

pingPongClientPipelinedMax
:: forall m. Monad m
Expand Down Expand Up @@ -286,16 +291,19 @@ serverPingPong2 =
app = demoProtocol1 pingpong pingpong'

pingpong =
ResponderProtocolOnly $ \_ctx ->
mkMuxPeer
(contramap (show . (,) (1 :: Int)) stdoutTracer)
codecPingPong
(pingPongServerPeer pingPongServerStandard)
ResponderProtocolOnly $
mkMuxPeer $ \_ctx ->
( contramap (show . (,) (1 :: Int)) stdoutTracer
, codecPingPong
, pingPongServerPeer pingPongServerStandard
)

pingpong' =
ResponderProtocolOnly $ \_ctx ->
mkMuxPeer
(contramap (show . (,) (2 :: Int)) stdoutTracer)
codecPingPong
(pingPongServerPeer pingPongServerStandard)
ResponderProtocolOnly $
mkMuxPeer $ \_ctx ->
( contramap (show . (,) (2 :: Int)) stdoutTracer
, codecPingPong
, pingPongServerPeer pingPongServerStandard
)


Expand Up @@ -54,7 +54,6 @@ import Data.Void (Void)

import qualified Network.Mux as Mux

import Ouroboros.Network.Channel (fromChannel)
import Ouroboros.Network.ConnectionHandler
import Ouroboros.Network.ConnectionManager.Types hiding
(TrUnexpectedlyFalseAssertion)
Expand Down Expand Up @@ -494,15 +493,14 @@ runResponder mux
mux (miniProtocolNum miniProtocol)
Mux.ResponderDirectionOnly
startStrategy
-- TODO: eliminate 'fromChannel'
(runMuxPeer (responder responderContext) . fromChannel)
(runMuxPeer responder responderContext)

InitiatorAndResponderProtocol _ responder ->
Mux.runMiniProtocol
mux (miniProtocolNum miniProtocol)
Mux.ResponderDirection
startStrategy
(runMuxPeer (responder responderContext) . fromChannel)
(runMuxPeer responder responderContext)


--
Expand Down

0 comments on commit 2ba25a1

Please sign in to comment.