Skip to content

Commit

Permalink
ouroboros-network-framework: name some tvars & threads
Browse files Browse the repository at this point in the history
In particular:
* threads run by `runConnectedPeers`
* simple driver TVar

Most part of this patch is updating dependencies.
  • Loading branch information
coot committed Jun 24, 2022
1 parent a18fa24 commit ebac0db
Show file tree
Hide file tree
Showing 20 changed files with 116 additions and 85 deletions.
6 changes: 4 additions & 2 deletions ouroboros-network-framework/src/Ouroboros/Network/Channel.hs
Expand Up @@ -208,7 +208,7 @@ createConnectedBufferedChannelsUnbounded = do
--
-- This is primarily useful for testing protocols.
--
createConnectedBufferedChannels :: forall m a. MonadSTM m
createConnectedBufferedChannels :: forall m a. MonadLabelledSTM m
=> Natural -> m (Channel m a, Channel m a)
createConnectedBufferedChannels sz = do
(chan1, chan2) <- atomically $ createConnectedBufferedChannelsSTM sz
Expand All @@ -224,13 +224,15 @@ createConnectedBufferedChannels sz = do
-- | As 'createConnectedBufferedChannels', but in 'STM'.
--
-- TODO: it should return a pair of `Channel m a`.
createConnectedBufferedChannelsSTM :: MonadSTM m
createConnectedBufferedChannelsSTM :: MonadLabelledSTM m
=> Natural -> STM m (Channel (STM m) a, Channel (STM m) a)
createConnectedBufferedChannelsSTM sz = do
-- Create two TBQueues to act as the channel buffers (one for each
-- direction) and use them to make both ends of a bidirectional channel
bufferA <- newTBQueue sz
labelTBQueue bufferA "chann-a"
bufferB <- newTBQueue sz
labelTBQueue bufferB "chann-b"

return (queuesAsChannel bufferB bufferA,
queuesAsChannel bufferA bufferB)
Expand Down
21 changes: 12 additions & 9 deletions ouroboros-network-framework/src/Ouroboros/Network/Driver/Limits.hs
Expand Up @@ -521,12 +521,13 @@ runPeerWithLimits tracer codec slimits tlimits channel peer =
-- for example 'createConnectedChannels'.
--
runConnectedPeersWithLimits :: forall ps pr pl pl' st failure bytes m a b.
( MonadAsync m
, MonadFork m
, MonadMask m
( MonadAsync m
, MonadLabelledSTM m
, MonadFork m
, MonadMask m
, MonadMonotonicTime m
, MonadTimer m
, MonadThrow (STM m)
, MonadTimer m
, MonadThrow (STM m)
, Exception failure
, ShowProxy ps
, forall (st' :: ps) sing. sing ~ Sing st' => Show sing
Expand All @@ -542,11 +543,13 @@ runConnectedPeersWithLimits :: forall ps pr pl pl' st failure bytes m a b.
runConnectedPeersWithLimits createChannels tracer codec slimits tlimits client server =
createChannels >>= \(clientChannel, serverChannel) ->

(fst <$> runPeerWithLimits
tracerClient codec slimits tlimits
clientChannel client)
(do labelThisThread "client"
fst <$> runPeerWithLimits
tracerClient codec slimits tlimits
clientChannel client)
`concurrently`
(fst <$> runPeer tracerServer codec serverChannel server)
(do labelThisThread "server"
fst <$> runPeer tracerServer codec serverChannel server)
where
tracerClient = contramap ((,) Client) tracer
tracerServer = contramap ((,) Server) tracer
37 changes: 23 additions & 14 deletions ouroboros-network-framework/src/Ouroboros/Network/Driver/Simple.hs
Expand Up @@ -119,9 +119,10 @@ instance Exception DecoderFailure where


driverSimple :: forall ps (pr :: PeerRole) failure bytes m.
( MonadAsync m
, MonadMask m
, MonadThrow (STM m)
( MonadAsync m
, MonadLabelledSTM m
, MonadMask m
, MonadThrow (STM m)
, Exception failure
)
=> Tracer m (TraceSendRecv ps)
Expand All @@ -132,6 +133,7 @@ driverSimple :: forall ps (pr :: PeerRole) failure bytes m.
)
driverSimple tracer Codec{encode, decode} channel@Channel{send} = do
v <- newTVarIO Nothing
labelTVarIO v "driver-var"
return
( Driver { sendMessage
, recvMessage
Expand Down Expand Up @@ -253,9 +255,10 @@ driverSimple tracer Codec{encode, decode} channel@Channel{send} = do
--
runPeer
:: forall ps (st :: ps) pr pl failure bytes m a .
( MonadAsync m
, MonadMask m
, MonadThrow (STM m)
( MonadAsync m
, MonadLabelledSTM m
, MonadMask m
, MonadThrow (STM m)
, Exception failure
)
=> Tracer m (TraceSendRecv ps)
Expand Down Expand Up @@ -337,9 +340,10 @@ data Role = Client | Server
-- for example 'createConnectedChannels'.
--
runConnectedPeers :: forall ps pr pl pl' st failure bytes m a b.
( MonadAsync m
, MonadMask m
, MonadThrow (STM m)
( MonadAsync m
, MonadLabelledSTM m
, MonadMask m
, MonadThrow (STM m)
, Exception failure
)
=> m (Channel m bytes, Channel m bytes)
Expand All @@ -351,9 +355,13 @@ runConnectedPeers :: forall ps pr pl pl' st failure bytes m a b.
runConnectedPeers createChannels tracer codec client server =
createChannels >>= \(clientChannel, serverChannel) ->

(fst <$> runPeer tracerClient codec clientChannel client)
(do labelThisThread "client"
fst <$> runPeer tracerClient codec clientChannel client
)
`concurrently`
(fst <$> runPeer tracerServer codec serverChannel server)
(do labelThisThread "server"
fst <$> runPeer tracerServer codec serverChannel server
)
where
tracerClient = contramap ((,) Client) tracer
tracerServer = contramap ((,) Server) tracer
Expand All @@ -363,9 +371,10 @@ runConnectedPeers createChannels tracer codec client server =
-- 'Handshake' protocol which knows how to decode different versions.
--
runConnectedPeersAsymmetric
:: ( MonadAsync m
, MonadMask m
, MonadThrow (STM m)
:: ( MonadAsync m
, MonadLabelledSTM m
, MonadMask m
, MonadThrow (STM m)
, Exception failure
)
=> m (Channel m bytes, Channel m bytes)
Expand Down
Expand Up @@ -85,14 +85,15 @@ import Ouroboros.Network.Server.RateLimiting
-- other is useful for running a server for the /Node-To-Client protocol/.
--
inboundGovernor :: forall (muxMode :: MuxMode) socket peerAddr versionNumber m a b.
( MonadAsync m
, MonadCatch m
, MonadEvaluate m
, MonadThrow m
, MonadThrow (STM m)
, MonadTime m
, MonadTimer m
, MonadMask m
( MonadAsync m
, MonadCatch m
, MonadEvaluate m
, MonadLabelledSTM m
, MonadThrow m
, MonadThrow (STM m)
, MonadTime m
, MonadTimer m
, MonadMask m
, Ord peerAddr
, HasResponder muxMode ~ True
)
Expand Down Expand Up @@ -467,10 +468,11 @@ inboundGovernor trTracer tracer serverControlChannel inboundIdleTimeout
--
runResponder :: forall (mode :: MuxMode) m a b.
( HasResponder mode ~ True
, MonadAsync m
, MonadCatch m
, MonadMask m
, MonadThrow (STM m)
, MonadAsync m
, MonadLabelledSTM m
, MonadCatch m
, MonadMask m
, MonadThrow (STM m)
)
=> Mux.Mux mode m
-> MiniProtocol mode ByteString m a b
Expand Down
21 changes: 12 additions & 9 deletions ouroboros-network-framework/src/Ouroboros/Network/Mux.hs
Expand Up @@ -314,9 +314,10 @@ data MuxPeer bytes m a where
:: (Channel m bytes -> m (a, Maybe bytes))
-> MuxPeer bytes m a

toApplication :: ( MonadAsync m
, MonadMask m
, MonadThrow (STM m)
toApplication :: ( MonadAsync m
, MonadLabelledSTM m
, MonadMask m
, MonadThrow (STM m)
)
=> ConnectionId addr
-> ControlMessageSTM m
Expand Down Expand Up @@ -373,9 +374,10 @@ mkMiniProtocolBundle = MiniProtocolBundle . foldMap fn
]

toMuxRunMiniProtocol :: forall mode m a b.
( MonadAsync m
, MonadMask m
, MonadThrow (STM m)
( MonadAsync m
, MonadLabelledSTM m
, MonadMask m
, MonadThrow (STM m)
)
=> RunMiniProtocol mode LBS.ByteString m a b
-> Mux.Compat.RunMiniProtocol mode m a b
Expand All @@ -391,9 +393,10 @@ toMuxRunMiniProtocol (InitiatorAndResponderProtocol i r) =
-- Run a @'MuxPeer'@ using either @'runPeer'@ or @'runPipelinedPeer'@.
--
runMuxPeer
:: ( MonadAsync m
, MonadMask m
, MonadThrow (STM m)
:: ( MonadAsync m
, MonadLabelledSTM m
, MonadMask m
, MonadThrow (STM m)
)
=> MuxPeer bytes m a
-> Channel m bytes
Expand Down
16 changes: 10 additions & 6 deletions ouroboros-network-framework/test/Test/Ouroboros/Network/Driver.hs
Expand Up @@ -268,6 +268,7 @@ prop_channel_reqresp_IO (ReqRespPayloadWithLimit limit payload) =

prop_channel_ping_pong
:: ( MonadAsync m
, MonadLabelledSTM m
, MonadMask m
, MonadTest m
, MonadThrow (STM m)
Expand All @@ -282,7 +283,7 @@ prop_channel_ping_pong a b n tr = do
exploreRaces
(_, r) <- runConnectedPeers (bimap (delayChannel a)
(delayChannel b)
<$> createConnectedBufferedChannelsUnbounded)
<$> createConnectedChannels)
tr
codecPingPong client server
return (r == n)
Expand Down Expand Up @@ -326,11 +327,12 @@ prop_channel_ping_pong_IO (NonNegative a) (NonNegative b) (NonNegative n) =


prop_channel_ping_pong_stm
:: ( MonadAsync m
, MonadMask m
, MonadTest m
, MonadThrow (STM m)
, MonadTimer m
:: ( MonadAsync m
, MonadLabelledSTM m
, MonadMask m
, MonadTest m
, MonadThrow (STM m)
, MonadTimer m
)
=> DiffTime
-> DiffTime
Expand Down Expand Up @@ -394,6 +396,7 @@ prop_channel_ping_pong_stm_IO (NonNegative a) (NonNegative b)
prop_channel_ping_pong_with_limits
:: ( MonadAsync m
, MonadFork m
, MonadLabelledSTM m
, MonadMask m
, MonadMonotonicTime m
, MonadTest m
Expand Down Expand Up @@ -426,6 +429,7 @@ prop_channel_ping_pong_with_limits a b n tr slimits tlimits = do
prop_channel_ping_pong_with_limits_stm
:: ( MonadAsync m
, MonadFork m
, MonadLabelledSTM m
, MonadMask m
, MonadMonotonicTime m
, MonadTest m
Expand Down
Expand Up @@ -669,13 +669,14 @@ reqRespTimeLimits = ProtocolTimeLimits { timeLimitForState }
--
runInitiatorProtocols
:: forall muxMode m a b.
( MonadAsync m
, MonadCatch m
, MonadMask m
, MonadSTM m
, MonadThrow (STM m)
( MonadAsync m
, MonadCatch m
, MonadLabelledSTM m
, MonadMask m
, MonadSTM m
, MonadThrow (STM m)
, HasInitiator muxMode ~ True
, MonadSay m
, MonadSay m
)
=> SingMuxMode muxMode
-> Mux.Mux muxMode m
Expand Down
Expand Up @@ -17,7 +17,7 @@ import Data.Singletons

import Control.Monad.Class.MonadAsync (MonadAsync)
import Control.Monad.Class.MonadST (MonadST)
import Control.Monad.Class.MonadSTM (MonadSTM, STM)
import Control.Monad.Class.MonadSTM (MonadSTM, MonadLabelledSTM, STM)
import Control.Monad.Class.MonadThrow (MonadCatch, MonadMask, MonadThrow)
import Control.Monad.IOSim (runSimOrThrow)
import Control.Tracer (nullTracer)
Expand Down Expand Up @@ -243,8 +243,8 @@ prop_connect_pipelined5 (TestChainAndPoints chain points)

-- | Run a simple block-fetch client and server using connected channels.
--
prop_channel :: ( MonadAsync m, MonadCatch m, MonadMask m, MonadST m
, MonadThrow m, MonadThrow (STM m) )
prop_channel :: ( MonadAsync m, MonadCatch m, MonadLabelledSTM m, MonadMask m
, MonadST m, MonadThrow m, MonadThrow (STM m) )
=> m (Channel m ByteString, Channel m ByteString)
-> Chain Block -> [Point Block] -> m Property
prop_channel createChannels chain points = do
Expand Down
Expand Up @@ -512,7 +512,7 @@ chainSyncDemo
( MonadAsync m
, MonadMask m
, MonadST m
, MonadSTM m
, MonadLabelledSTM m
, MonadFork m
, MonadThrow m
, MonadThrow (STM m)
Expand Down Expand Up @@ -577,7 +577,7 @@ chainSyncDemoPipelined
:: forall m.
( MonadMask m
, MonadST m
, MonadSTM m
, MonadLabelledSTM m
, MonadFork m
, MonadAsync m
, MonadThrow m
Expand Down
Expand Up @@ -442,6 +442,7 @@ prop_connect (ArbitraryVersions clientVersions serverVersions) =
--
prop_channel :: ( MonadAsync m
, MonadCatch m
, MonadLabelledSTM m
, MonadMask m
, MonadST m
, MonadThrow m
Expand Down Expand Up @@ -517,6 +518,7 @@ prop_pipe_IO (ArbitraryVersions clientVersions serverVersions) =
prop_channel_asymmetric
:: ( MonadAsync m
, MonadCatch m
, MonadLabelledSTM m
, MonadMask m
, MonadST m
, MonadThrow m
Expand Down Expand Up @@ -842,6 +844,7 @@ prop_acceptOrRefuse_symmetric_NodeToClient (ArbitraryNodeToClientVersions a)
prop_channel_simultaneous_open
:: ( MonadAsync m
, MonadCatch m
, MonadLabelledSTM m
, MonadMask m
, MonadST m
, MonadThrow m
Expand Down
Expand Up @@ -98,7 +98,7 @@ prop_connect f (NonNegative n) =
--

prop_channel :: ( MonadST m
, MonadSTM m
, MonadLabelledSTM m
, MonadAsync m
, MonadCatch m
, MonadMask m
Expand Down
Expand Up @@ -191,7 +191,7 @@ prop_connect input =
prop_channel :: ( MonadAsync m
, MonadCatch m
, MonadLabelledSTM m
, MonadST m
, MonadST m
, MonadMask m
, MonadThrow m
, MonadThrow (STM m)
Expand Down
Expand Up @@ -15,7 +15,7 @@ import Data.ByteString.Lazy (ByteString)

import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadST
import Control.Monad.Class.MonadSTM (STM)
import Control.Monad.Class.MonadSTM (MonadLabelledSTM, STM)
import Control.Monad.Class.MonadThrow
import Control.Monad.IOSim
import qualified Control.Monad.ST as ST
Expand Down Expand Up @@ -147,8 +147,8 @@ prop_connect (slot, txs) =

-- | Run a local tx-monitor client and server using connected channels.
--
prop_channel :: ( MonadAsync m, MonadCatch m, MonadMask m, MonadST m
, MonadThrow m, MonadThrow (STM m) )
prop_channel :: ( MonadAsync m, MonadCatch m, MonadLabelledSTM m, MonadMask m
, MonadST m, MonadThrow m, MonadThrow (STM m) )
=> m (Channel m ByteString, Channel m ByteString)
-> (SlotNo, [Tx])
-> m Bool
Expand Down

0 comments on commit ebac0db

Please sign in to comment.