Skip to content

Commit

Permalink
ouroboros-network-framework: refactored MuxPeer
Browse files Browse the repository at this point in the history
Removed `MuxPeer` and `MuxPeerPipelined` constructors.
  • Loading branch information
coot committed Mar 17, 2023
1 parent 46f85c3 commit 80daed5
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 85 deletions.
3 changes: 3 additions & 0 deletions ouroboros-network-framework/CHANGELOG.md
Expand Up @@ -43,6 +43,9 @@
- `Ouroboros.Network.Socket.withServerNode`
- inbound governor API

* `MuxPeer` and `MuxPeerPipelined` constructors of `MuxPeer` type were removed.
Use `mkMuxPeer` and `mkMuxPeerPipelined` instead.

### Non-breaking changes

## 0.1.0.0 -- YYYY-mm-dd
Expand Down
4 changes: 2 additions & 2 deletions ouroboros-network-framework/demo/connection-manager.hs
Expand Up @@ -333,7 +333,7 @@ withBidirectionalConnectionManager snocket makeBearer socket
InitiatorResponderMode peerAddr ByteString m () ()
reqRespInitiatorAndResponder protocolNum requestsVar =
InitiatorAndResponderProtocol
(const $ MuxPeer
(const $ mkMuxPeer
(("Initiator",protocolNum,) `contramap` debugTracer) -- TraceSendRecv
(codecReqResp @Int @Int)
(Effect $ do
Expand All @@ -345,7 +345,7 @@ withBidirectionalConnectionManager snocket makeBearer socket
LazySTM.writeTVar requestsVar rest $> reqs
[] -> pure []
pure $ reqRespClientPeer (reqRespClient reqs)))
(const $ MuxPeer
(const $ mkMuxPeer
(("Responder",protocolNum,) `contramap` debugTracer) -- TraceSendRecv
(codecReqResp @Int @Int)
(Effect $ reqRespServerPeer <$> reqRespServerId))
Expand Down
14 changes: 7 additions & 7 deletions ouroboros-network-framework/demo/ping-pong.hs
Expand Up @@ -125,14 +125,14 @@ clientPingPong pipelined =

pingPongInitiator | pipelined =
InitiatorProtocolOnly $ const $
MuxPeerPipelined
mkMuxPeerPipelined
(contramap show stdoutTracer)
codecPingPong
(pingPongClientPeerPipelined (pingPongClientPipelinedMax 5))

| otherwise =
InitiatorProtocolOnly $ const $
MuxPeer
mkMuxPeer
(contramap show stdoutTracer)
codecPingPong
(pingPongClientPeer (pingPongClientCount 5))
Expand Down Expand Up @@ -170,7 +170,7 @@ serverPingPong =

pingPongResponder =
ResponderProtocolOnly $ \_ctx ->
MuxPeer
mkMuxPeer
(contramap show stdoutTracer)
codecPingPong
(pingPongServerPeer pingPongServerStandard)
Expand Down Expand Up @@ -229,14 +229,14 @@ clientPingPong2 =

pingpong =
InitiatorProtocolOnly $ const $
MuxPeer
mkMuxPeer
(contramap (show . (,) (1 :: Int)) stdoutTracer)
codecPingPong
(pingPongClientPeer (pingPongClientCount 5))

pingpong'=
InitiatorProtocolOnly $ const $
MuxPeer
mkMuxPeer
(contramap (show . (,) (2 :: Int)) stdoutTracer)
codecPingPong
(pingPongClientPeer (pingPongClientCount 5))
Expand Down Expand Up @@ -287,14 +287,14 @@ serverPingPong2 =

pingpong =
ResponderProtocolOnly $ \_ctx ->
MuxPeer
mkMuxPeer
(contramap (show . (,) (1 :: Int)) stdoutTracer)
codecPingPong
(pingPongServerPeer pingPongServerStandard)

pingpong' =
ResponderProtocolOnly $ \_ctx ->
MuxPeer
mkMuxPeer
(contramap (show . (,) (2 :: Int)) stdoutTracer)
codecPingPong
(pingPongServerPeer pingPongServerStandard)
Expand Down
90 changes: 40 additions & 50 deletions ouroboros-network-framework/src/Ouroboros/Network/Mux.hs
Expand Up @@ -7,6 +7,7 @@
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE QuantifiedConstraints #-}
{-# LANGUAGE ScopedTypeVariables #-}
Expand Down Expand Up @@ -34,7 +35,8 @@ module Ouroboros.Network.Mux
, RunMiniProtocolWithExpandedCtx
, RunMiniProtocolWithMinimalCtx
, MuxPeer (..)
, runMuxPeer
, mkMuxPeer
, mkMuxPeerPipelined
, toApplication
, mkMiniProtocolBundle
-- * Non-P2P API
Expand Down Expand Up @@ -276,35 +278,42 @@ type RunMiniProtocolWithMinimalCtx mode peerAddr bytes m a b =
(ResponderContext peerAddr)
bytes m a b

-- TODO: do we need this type? Don't we only use 'MuxPeerRaw'.
--
data MuxPeer bytes m a where
MuxPeer :: forall (pr :: PeerRole) ps (st :: ps) failure bytes m a.
( Show failure
, forall (st' :: ps). Show (ClientHasAgency st')
, forall (st' :: ps). Show (ServerHasAgency st')
, ShowProxy ps
)
=> Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> Peer ps pr st m a
-> MuxPeer bytes m a

MuxPeerPipelined
:: forall (pr :: PeerRole) ps (st :: ps) failure bytes m a.
( Show failure
, forall (st' :: ps). Show (ClientHasAgency st')
, forall (st' :: ps). Show (ServerHasAgency st')
, ShowProxy ps
)
=> Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> PeerPipelined ps pr st m a
-> MuxPeer bytes m a

MuxPeerRaw
:: (Channel m bytes -> m (a, Maybe bytes))
-> MuxPeer bytes m a

newtype MuxPeer bytes m a =
MuxPeerRaw { runMuxPeer :: Channel m bytes -> m (a, Maybe bytes) }


mkMuxPeer
:: forall (pr :: PeerRole) ps (st :: ps) failure bytes m a.
( MonadThrow m
, Show failure
, forall (st' :: ps). Show (ClientHasAgency st')
, forall (st' :: ps). Show (ServerHasAgency st')
, ShowProxy ps
)
=> Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> Peer ps pr st m a
-> MuxPeer bytes m a
mkMuxPeer tracer codec peer =
MuxPeerRaw $ \channel -> runPeer tracer codec channel peer


mkMuxPeerPipelined
:: forall (pr :: PeerRole) ps (st :: ps) failure bytes m a.
( MonadAsync m
, MonadThrow m
, Show failure
, forall (st' :: ps). Show (ClientHasAgency st')
, forall (st' :: ps). Show (ServerHasAgency st')
, ShowProxy ps
)
=> Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> PeerPipelined ps pr st m a
-> MuxPeer bytes m a
mkMuxPeerPipelined tracer codec peer =
MuxPeerRaw $ \channel -> runPipelinedPeer tracer codec channel peer


-- | Like 'MuxApplication' but using a 'MuxPeer' rather than a raw
Expand Down Expand Up @@ -355,8 +364,7 @@ contramapInitiatorCtx f (OuroborosApplication ptcls) = OuroborosApplication
--
-- Note that callbacks will always receive `IsNotBigLedgerPeer`.
toApplication :: forall mode initiatorCtx responderCtx m a b.
(MonadCatch m, MonadAsync m)
=> initiatorCtx
initiatorCtx
-> responderCtx
-> OuroborosApplication mode initiatorCtx responderCtx LBS.ByteString m a b
-> Mux.Compat.MuxApplication mode m a b
Expand Down Expand Up @@ -407,21 +415,3 @@ mkMiniProtocolBundle = MiniProtocolBundle . foldMap fn
InitiatorAndResponderProtocol{} -> [ Mux.InitiatorDirection
, Mux.ResponderDirection ]
]

-- | Run a @'MuxPeer'@ using either @'runPeer'@ or @'runPipelinedPeer'@.
--
runMuxPeer
:: ( MonadCatch m
, MonadAsync m
)
=> MuxPeer bytes m a
-> Channel m bytes
-> m (a, Maybe bytes)
runMuxPeer (MuxPeer tracer codec peer) channel =
runPeer tracer codec channel peer

runMuxPeer (MuxPeerPipelined tracer codec peer) channel =
runPipelinedPeer tracer codec channel peer

runMuxPeer (MuxPeerRaw action) channel =
action channel
8 changes: 4 additions & 4 deletions ouroboros-network/demo/chain-sync.hs
Expand Up @@ -177,7 +177,7 @@ clientChainSync sockPaths = withIOManager $ \iocp ->

chainSync =
InitiatorProtocolOnly $ const $
MuxPeer
mkMuxPeer
(contramap show stdoutTracer)
codecChainSync
(ChainSync.chainSyncClientPeer chainSyncClient)
Expand Down Expand Up @@ -214,7 +214,7 @@ serverChainSync sockAddr = withIOManager $ \iocp -> do

chainSync =
ResponderProtocolOnly $ \_ctx ->
MuxPeer
mkMuxPeer
(contramap show stdoutTracer)
codecChainSync
(ChainSync.chainSyncServerPeer (chainSyncServer prng))
Expand Down Expand Up @@ -460,7 +460,7 @@ serverBlockFetch sockAddr = withIOManager $ \iocp -> do
ResponderMode LocalAddress LBS.ByteString IO Void ()
chainSync =
ResponderProtocolOnly $ \_ctx ->
MuxPeer
mkMuxPeer
(contramap show stdoutTracer)
codecChainSync
(ChainSync.chainSyncServerPeer (chainSyncServer prng))
Expand All @@ -469,7 +469,7 @@ serverBlockFetch sockAddr = withIOManager $ \iocp -> do
ResponderMode LocalAddress LBS.ByteString IO Void ()
blockFetch =
ResponderProtocolOnly $ \_ctx ->
MuxPeer
mkMuxPeer
(contramap show stdoutTracer)
codecBlockFetch
(BlockFetch.blockFetchServerPeer (blockFetchServer prng))
Expand Down
4 changes: 2 additions & 2 deletions ouroboros-network/test/Test/Mux.hs
Expand Up @@ -113,7 +113,7 @@ demo chain0 updates delay = do

chainSyncInitator =
InitiatorProtocolOnly $ const $
MuxPeer
mkMuxPeer
nullTracer
(ChainSync.codecChainSync
encode decode
Expand All @@ -131,7 +131,7 @@ demo chain0 updates delay = do

chainSyncResponder =
ResponderProtocolOnly $ \_ctx ->
MuxPeer
mkMuxPeer
nullTracer
(ChainSync.codecChainSync
encode decode
Expand Down
24 changes: 12 additions & 12 deletions ouroboros-network/test/Test/Pipe.hs
Expand Up @@ -165,13 +165,13 @@ demo chain0 updates = do

chainSyncInitator =
InitiatorProtocolOnly $ const $
MuxPeer nullTracer
(ChainSync.codecChainSync encode decode
encode decode
(encodeTip encode) (decodeTip decode))
(ChainSync.chainSyncClientPeer
(ChainSync.chainSyncClientExample consumerVar
(consumerClient done target consumerVar)))
mkMuxPeer nullTracer
(ChainSync.codecChainSync encode decode
encode decode
(encodeTip encode) (decodeTip decode))
(ChainSync.chainSyncClientPeer
(ChainSync.chainSyncClientExample consumerVar
(consumerClient done target consumerVar)))

server :: ChainSyncServer block (Point block) (Tip block) IO ()
server = ChainSync.chainSyncServerExample () producerVar
Expand All @@ -182,11 +182,11 @@ demo chain0 updates = do

chainSyncResponder =
ResponderProtocolOnly $ \_ctx ->
MuxPeer nullTracer
(ChainSync.codecChainSync encode decode
encode decode
(encodeTip encode) (decodeTip decode))
(ChainSync.chainSyncServerPeer server)
mkMuxPeer nullTracer
(ChainSync.codecChainSync encode decode
encode decode
(encodeTip encode) (decodeTip decode))
(ChainSync.chainSyncServerPeer server)

clientBearer <- Mx.getBearer Mx.makePipeChannelBearer (-1) activeTracer chan1
serverBearer <- Mx.getBearer Mx.makePipeChannelBearer (-1) activeTracer chan2
Expand Down
16 changes: 8 additions & 8 deletions ouroboros-network/test/Test/Socket.hs
Expand Up @@ -129,11 +129,11 @@ demo chain0 updates = withIOManager $ \iocp -> do

chainSyncInitator =
InitiatorProtocolOnly $ const $
MuxPeer nullTracer
codecChainSync
(ChainSync.chainSyncClientPeer
(ChainSync.chainSyncClientExample consumerVar
(consumerClient done target consumerVar)))
mkMuxPeer nullTracer
codecChainSync
(ChainSync.chainSyncClientPeer
(ChainSync.chainSyncClientExample consumerVar
(consumerClient done target consumerVar)))

server :: ChainSync.ChainSyncServer block (Point block) (Tip block) IO ()
server = ChainSync.chainSyncServerExample () producerVar
Expand All @@ -145,9 +145,9 @@ demo chain0 updates = withIOManager $ \iocp -> do

chainSyncResponder =
ResponderProtocolOnly $ \_ctx ->
MuxPeer nullTracer
codecChainSync
(ChainSync.chainSyncServerPeer server)
mkMuxPeer nullTracer
codecChainSync
(ChainSync.chainSyncServerPeer server)

codecChainSync = ChainSync.codecChainSync encode decode
encode decode
Expand Down

0 comments on commit 80daed5

Please sign in to comment.