Skip to content

Commit

Permalink
Rename RunOrStop as ControlMessage
Browse files Browse the repository at this point in the history
Also provide Quiesce contstructor, only usefule for warm protocols.
  • Loading branch information
coot committed Aug 3, 2020
1 parent 10070e2 commit d04048d
Show file tree
Hide file tree
Showing 12 changed files with 77 additions and 75 deletions.
6 changes: 3 additions & 3 deletions cardano-client/src/Cardano/Client/Subscription.hs
Expand Up @@ -15,7 +15,7 @@ module Cardano.Client.Subscription (
, MuxTrace
, RunMiniProtocol (..)
, WithMuxBearer
, RunOrStop (..)
, ControlMessage (..)
, cChainSyncCodec
, cStateQueryCodec
, cTxSubmissionCodec
Expand All @@ -33,7 +33,7 @@ import Network.Mux.Trace (MuxTrace, WithMuxBearer)
import Ouroboros.Network.Magic (NetworkMagic)
import Ouroboros.Network.Mux (MuxMode (..), MuxPeer (..),
OuroborosApplication, RunMiniProtocol (..),
RunOrStop (..))
ControlMessage (..))
import Ouroboros.Network.NodeToClient (ClientSubscriptionParams (..),
ConnectionId, LocalAddress,
NetworkClientSubcriptionTracers,
Expand Down Expand Up @@ -84,7 +84,7 @@ versionedProtocols ::
-> ( NodeToClientVersion
-> ClientCodecs blk m
-> ConnectionId LocalAddress
-> STM m RunOrStop
-> STM m ControlMessage
-> NodeToClientProtocols appType bytes m a b)
-- ^ callback which receives codecs, connection id and STM action which
-- can be checked if the networking runtime system requests the protocols
Expand Down
4 changes: 2 additions & 2 deletions ouroboros-network-framework/demo/ping-pong.hs
Expand Up @@ -94,7 +94,7 @@ maximumMiniProtocolLimits =
demoProtocol0 :: RunMiniProtocol appType bytes m a b
-> OuroborosApplication appType addr bytes m a b
demoProtocol0 pingPong =
OuroborosApplication $ \_connectionId _shouldStopSTM -> [
OuroborosApplication $ \_connectionId _controlMessageSTM -> [
MiniProtocol {
miniProtocolNum = MiniProtocolNum 2,
miniProtocolLimits = maximumMiniProtocolLimits,
Expand Down Expand Up @@ -184,7 +184,7 @@ demoProtocol1 :: RunMiniProtocol appType bytes m a b
-> RunMiniProtocol appType bytes m a b
-> OuroborosApplication appType addr bytes m a b
demoProtocol1 pingPong pingPong' =
OuroborosApplication $ \_connectionId _shouldStopSTM -> [
OuroborosApplication $ \_connectionId _controlMessageSTM -> [
MiniProtocol {
miniProtocolNum = MiniProtocolNum 2,
miniProtocolLimits = maximumMiniProtocolLimits,
Expand Down
Expand Up @@ -88,7 +88,7 @@ data MuxPromise muxMode verionNumber bytes m a b where
:: forall muxMode versionNumber bytes m a b.
!(Mux muxMode m)
-> !(MuxBundle muxMode bytes m a b)
-> !(Bundle (StrictTVar m RunOrStop))
-> !(Bundle (StrictTVar m ControlMessage))
-> MuxPromise muxMode versionNumber bytes m a b

MuxStopped
Expand Down Expand Up @@ -190,9 +190,9 @@ makeConnectionHandler muxTracer singMuxMode miniProtocolBundle handshakeArgument
traceWith tracer ConnectionTraceHandshakeSuccess
!scheduleStopVarBundle
<- (\a b c -> Bundle (WithHot a) (WithWarm b) (WithEstablished c))
<$> newTVarM Run
<*> newTVarM Run
<*> newTVarM Run
<$> newTVarM Continue
<*> newTVarM Continue
<*> newTVarM Continue
let muxApp
= mkMuxApplicationBundle
connectionId
Expand Down Expand Up @@ -246,9 +246,9 @@ makeConnectionHandler muxTracer singMuxMode miniProtocolBundle handshakeArgument
traceWith tracer ConnectionTraceHandshakeSuccess
!scheduleStopVarBundle
<- (\a b c -> Bundle (WithHot a) (WithWarm b) (WithEstablished c))
<$> newTVarM Run
<*> newTVarM Run
<*> newTVarM Run
<$> newTVarM Continue
<*> newTVarM Continue
<*> newTVarM Continue
let muxApp
= mkMuxApplicationBundle
connectionId
Expand Down
77 changes: 39 additions & 38 deletions ouroboros-network-framework/src/Ouroboros/Network/Mux.hs
Expand Up @@ -31,10 +31,9 @@ module Ouroboros.Network.Mux
, runMuxPeer
, toApplication
, mkMuxApplicationBundle
, ouroborosProtocols
, RunOrStop (..)
, ScheduledStop
, neverStop
, ControlMessage (..)
, ControlMessageSTM
, continueForever

-- * Re-exports
-- | from "Network.Mux"
Expand Down Expand Up @@ -69,25 +68,40 @@ import Ouroboros.Network.Driver
import Ouroboros.Network.Util.ShowProxy (ShowProxy)


data RunOrStop = Run | Stop
-- | Control signal sent to a mini-protocol. expected to exit, on 'Continue' it
-- should continue its operation
--
data ControlMessage =
-- | Continue operation.
Continue

-- | Hold on, e.g. do not sent messages until resumed. This is not used for
-- any hot protocol.
--
| Quiesce

-- | The client is expected to terminate as soon as possible.
--
| Terminate
deriving (Eq, Show)

-- | 'ScheduleStop' should depend on `muxMode` (we only need to shedule stop
-- for intiator side). This is not done only because this would break tests,
-- bue once the old api is removed it should be possible.
type ScheduledStop m = STM m RunOrStop
-- | 'ControlMessageSTM' should depend on `muxMode` (we only need to shedule
-- stop for intiator side). This is not done only because this would break
-- tests, but once the old api is removed it should be possible.
--
type ControlMessageSTM m = STM m ControlMessage

neverStop :: Applicative (STM m)
continueForever :: Applicative (STM m)
=> proxy m
-> ScheduledStop m
neverStop _ = pure Run
-> ControlMessageSTM m
continueForever _ = pure Continue

-- | Like 'MuxApplication' but using a 'MuxPeer' rather than a raw
-- @Channel -> m a@ action.
--
newtype OuroborosApplication (mode :: MuxMode) addr bytes m a b =
OuroborosApplication
(ConnectionId addr -> STM m RunOrStop -> [MiniProtocol mode bytes m a b])
(ConnectionId addr -> ControlMessageSTM m -> [MiniProtocol mode bytes m a b])


-- | There are three kinds of applications: warm, hot and established (ones
Expand Down Expand Up @@ -191,7 +205,7 @@ instance Applicative Bundle where

type MuxProtocolBundle (mode :: MuxMode) addr bytes m a b
= ConnectionId addr
-> STM m RunOrStop
-> ControlMessageSTM m
-> [MiniProtocol mode bytes m a b]

type OuroborosBundle (mode :: MuxMode) addr bytes m a b =
Expand Down Expand Up @@ -257,46 +271,46 @@ data MuxPeer bytes m a where

toApplication :: (MonadCatch m, MonadAsync m)
=> ConnectionId addr
-> ScheduledStop m
-> ControlMessageSTM m
-> OuroborosApplication mode addr LBS.ByteString m a b
-> Mux.MuxApplication mode m a b
toApplication connectionId scheduleStop (OuroborosApplication ptcls) =
toApplication connectionId controlMessageSTM (OuroborosApplication ptcls) =
Mux.MuxApplication
[ Mux.MuxMiniProtocol {
Mux.miniProtocolNum = miniProtocolNum ptcl,
Mux.miniProtocolLimits = miniProtocolLimits ptcl,
Mux.miniProtocolRun = toMuxRunMiniProtocol (miniProtocolRun ptcl)
}
| ptcl <- ptcls connectionId scheduleStop ]
| ptcl <- ptcls connectionId controlMessageSTM ]


mkMuxApplicationBundle
:: forall mode addr bytes m a b.
ConnectionId addr
-> Bundle (ScheduledStop m)
-> Bundle (ControlMessageSTM m)
-> OuroborosBundle mode addr bytes m a b
-> MuxBundle mode bytes m a b
mkMuxApplicationBundle connectionId
(Bundle
hotScheduleStop
warmScheduleStop
establishedScheduleStop)
hotControlMessageSTM
warmControlMessageSTM
establishedControlMessageSTM)
(Bundle
hotApp
warmApp
establishedApp) =
Bundle {
withHot =
mkApplication hotScheduleStop hotApp,
mkApplication hotControlMessageSTM hotApp,

withWarm =
mkApplication warmScheduleStop warmApp,
mkApplication warmControlMessageSTM warmApp,

withEstablished =
mkApplication establishedScheduleStop establishedApp
mkApplication establishedControlMessageSTM establishedApp
}
where
mkApplication :: WithProtocolTemperature pt (ScheduledStop m)
mkApplication :: WithProtocolTemperature pt (ControlMessageSTM m)
-> WithProtocolTemperature pt (MuxProtocolBundle mode addr bytes m a b)
-> WithProtocolTemperature pt [MiniProtocol mode bytes m a b]
mkApplication (WithHot scheduleStop) (WithHot app) =
Expand All @@ -321,19 +335,6 @@ toMuxRunMiniProtocol (InitiatorAndResponderProtocol i r) =
Mux.InitiatorAndResponderProtocol (runMuxPeer i . fromChannel)
(runMuxPeer r . fromChannel)

ouroborosProtocols :: ConnectionId addr
-> ScheduledStop m
-> OuroborosApplication mode addr bytes m a b
-> [MiniProtocol mode bytes m a b]
ouroborosProtocols connectionId scheduleStop (OuroborosApplication ptcls) =
[ MiniProtocol {
miniProtocolNum = miniProtocolNum ptcl,
miniProtocolLimits = miniProtocolLimits ptcl,
miniProtocolRun = miniProtocolRun ptcl
}
| ptcl <- ptcls connectionId scheduleStop ]


-- |
-- Run a @'MuxPeer'@ using either @'runPeer'@ or @'runPipelinedPeer'@.
--
Expand Down
4 changes: 2 additions & 2 deletions ouroboros-network-framework/src/Ouroboros/Network/Socket.hs
Expand Up @@ -257,7 +257,7 @@ connectToNode' sn handshakeCodec versionDataCodec NetworkConnectTracers {nctMuxT
traceWith muxTracer $ Mx.MuxTraceHandshakeClientEnd (diffTime ts_end ts_start)
Mx.muxStart
muxTracer
(toApplication connectionId (neverStop (Proxy :: Proxy IO)) app)
(toApplication connectionId (continueForever (Proxy :: Proxy IO)) app)
(Snocket.toBearer sn sduTimeout muxTracer sd)


Expand Down Expand Up @@ -374,7 +374,7 @@ beginConnection sn muxTracer handshakeTracer handshakeCodec versionDataCodec acc
traceWith muxTracer' $ Mx.MuxTraceHandshakeServerEnd
Mx.muxStart
muxTracer'
(toApplication connectionId (neverStop (Proxy :: Proxy IO)) app)
(toApplication connectionId (continueForever (Proxy :: Proxy IO)) app)
(Snocket.toBearer sn sduTimeout muxTracer' sd)

RejectConnection st' _peerid -> pure $ Server.Reject st'
Expand Down
Expand Up @@ -118,7 +118,7 @@ defaultMiniProtocolLimit = 3000000
testProtocols2 :: RunMiniProtocol appType bytes m a b
-> OuroborosApplication appType addr bytes m a b
testProtocols2 reqResp =
OuroborosApplication $ \_connectionId _shouldStopSTM -> [
OuroborosApplication $ \_connectionId _controlMessageSTM -> [
MiniProtocol {
miniProtocolNum = MiniProtocolNum 4,
miniProtocolLimits = MiniProtocolLimits {
Expand Down Expand Up @@ -344,7 +344,7 @@ prop_socket_recv_error f rerr =
localAddress = Socket.addrAddress muxAddress,
remoteAddress
}
Mx.muxStart nullTracer (toApplication connectionId (neverStop (Proxy :: Proxy IO)) app) bearer
Mx.muxStart nullTracer (toApplication connectionId (continueForever (Proxy :: Proxy IO)) app) bearer
)
$ \muxAsync -> do

Expand Down
Expand Up @@ -72,7 +72,7 @@ defaultMiniProtocolLimit = 3000000
testProtocols1 :: RunMiniProtocol appType bytes m a b
-> OuroborosApplication appType addr bytes m a b
testProtocols1 chainSync =
OuroborosApplication $ \_connectionId _shouldStopSTM -> [
OuroborosApplication $ \_connectionId _controlMessageSTM -> [
MiniProtocol {
miniProtocolNum = MiniProtocolNum 2,
miniProtocolLimits = MiniProtocolLimits {
Expand All @@ -88,7 +88,7 @@ testProtocols1 chainSync =
testProtocols2 :: RunMiniProtocol appType bytes m a b
-> OuroborosApplication appType addr bytes m a b
testProtocols2 reqResp =
OuroborosApplication $ \_connectionId _shouldStopSTM -> [
OuroborosApplication $ \_connectionId _controlMessageSTM -> [
MiniProtocol {
miniProtocolNum = MiniProtocolNum 4,
miniProtocolLimits = MiniProtocolLimits {
Expand Down
17 changes: 9 additions & 8 deletions ouroboros-network/src/Ouroboros/Network/KeepAlive.hs
Expand Up @@ -14,7 +14,7 @@ import Control.Monad.Class.MonadSTM
import Control.Monad.Class.MonadTime (DiffTime)
import Control.Monad.Class.MonadTimer

import Ouroboros.Network.Mux (RunOrStop (..), ScheduledStop)
import Ouroboros.Network.Mux (ControlMessage (..), ControlMessageSTM)
import Ouroboros.Network.Protocol.KeepAlive.Client
import Ouroboros.Network.Protocol.KeepAlive.Server

Expand All @@ -27,27 +27,28 @@ keepAliveClient
( MonadSTM m
, MonadTimer m
)
=> ScheduledStop m
=> ControlMessageSTM m
-> KeepAliveInterval
-> KeepAliveClient m ()
keepAliveClient shouldStopSTM KeepAliveInterval { keepAliveInterval } =
keepAliveClient controlMessageSTM KeepAliveInterval { keepAliveInterval } =
SendMsgKeepAlive go
where
decisionSTM :: TVar m Bool
-> STM m RunOrStop
-> STM m ControlMessage
decisionSTM delayVar =
do
readTVar delayVar >>= fmap (const Run) . check
readTVar delayVar >>= fmap (const Continue) . check
`orElse`
shouldStopSTM
controlMessageSTM

go :: m (KeepAliveClient m ())
go = do
delayVar <- registerDelay keepAliveInterval
decision <- atomically (decisionSTM delayVar)
case decision of
Run -> pure (SendMsgKeepAlive go)
Stop -> pure (SendMsgDone (pure ()))
Terminate -> pure (SendMsgDone (pure ()))
-- on 'Continue' and 'Quiesce', keep going.
_ -> pure (SendMsgKeepAlive go)


keepAliveServer
Expand Down
8 changes: 4 additions & 4 deletions ouroboros-network/src/Ouroboros/Network/NodeToClient.hs
Expand Up @@ -161,12 +161,12 @@ data NodeToClientProtocols appType bytes m a b = NodeToClientProtocols {
-- wireshark plugins.
--
nodeToClientProtocols
:: (ConnectionId addr -> STM m RunOrStop -> NodeToClientProtocols appType bytes m a b)
:: (ConnectionId addr -> STM m ControlMessage -> NodeToClientProtocols appType bytes m a b)
-> NodeToClientVersion
-> OuroborosApplication appType addr bytes m a b
nodeToClientProtocols protocols version =
OuroborosApplication $ \connectionId shouldStopSTM ->
case protocols connectionId shouldStopSTM of
OuroborosApplication $ \connectionId controlMessageSTM ->
case protocols connectionId controlMessageSTM of
NodeToClientProtocols {
localChainSyncProtocol,
localTxSubmissionProtocol,
Expand Down Expand Up @@ -215,7 +215,7 @@ nodeToClientHandshakeCodec = codecHandshake nodeToClientVersionCodec
versionedNodeToClientProtocols
:: NodeToClientVersion
-> NodeToClientVersionData
-> (ConnectionId LocalAddress -> STM m RunOrStop -> NodeToClientProtocols appType bytes m a b)
-> (ConnectionId LocalAddress -> STM m ControlMessage -> NodeToClientProtocols appType bytes m a b)
-> Versions NodeToClientVersion
DictVersion
(OuroborosApplication appType LocalAddress bytes m a b)
Expand Down
6 changes: 3 additions & 3 deletions ouroboros-network/src/Ouroboros/Network/NodeToNode.hs
Expand Up @@ -220,16 +220,16 @@ defaultMiniProtocolParameters = MiniProtocolParameters {
--
nodeToNodeProtocols
:: MiniProtocolParameters
-> (ConnectionId addr -> STM m RunOrStop -> NodeToNodeProtocols appType bytes m a b)
-> (ConnectionId addr -> STM m ControlMessage -> NodeToNodeProtocols appType bytes m a b)
-> OuroborosApplication appType addr bytes m a b
nodeToNodeProtocols MiniProtocolParameters {
chainSyncPipeliningHighMark,
blockFetchPipeliningMax,
txSubmissionMaxUnacked
}
protocols =
OuroborosApplication $ \connectionId shouldStopSTM ->
case protocols connectionId shouldStopSTM of
OuroborosApplication $ \connectionId controlMessageSTM ->
case protocols connectionId controlMessageSTM of
NodeToNodeProtocols {
chainSyncProtocol,
blockFetchProtocol,
Expand Down
4 changes: 2 additions & 2 deletions ouroboros-network/test/Test/Mux.hs
Expand Up @@ -151,15 +151,15 @@ demo chain0 updates delay = do
activeTracer
(Mx.toApplication
(ConnectionId "client" "server")
(neverStop (Proxy :: Proxy m))
(continueForever (Proxy :: Proxy m))
consumerApp)
clientBearer
serverAsync <- async $
Mx.muxStart
activeTracer
(Mx.toApplication
(ConnectionId "server" "client")
(neverStop (Proxy :: Proxy m))
(continueForever (Proxy :: Proxy m))
producerApp)
serverBearer

Expand Down

0 comments on commit d04048d

Please sign in to comment.