Skip to content

Commit

Permalink
connection-handler & server: use the introduced api
Browse files Browse the repository at this point in the history
When we start a connection, the connection manager will create three
`RunOrStop` mutable variables for three kinds of applications: hot, warm
and established, which will be exposed using `MuxPromise`.  `MuxPromise`
also exposes `Bundle` of `MuxApplications`.  These both changes will
allow the peer-2-peer governor to start / stop appropriate protocols
when changing peer state between worm / hot and cold.
  • Loading branch information
coot committed Aug 3, 2020
1 parent d32ab38 commit 10070e2
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 37 deletions.
Expand Up @@ -83,35 +83,35 @@ sduHandshakeTimeout = 10
-- 'HandshakeException'
-- * 'MuxPromiseError' - the multiplexer thrown 'MuxError'.
--
data MuxPromise muxMode verionNumber bytes m where
data MuxPromise muxMode verionNumber bytes m a b where
MuxRunning
:: forall muxMode versionNumber bytes m a b.
!(Mux muxMode m)
-> ![MiniProtocol muxMode bytes m a b]
-> !(StrictTVar m RunOrStop)
-> MuxPromise muxMode versionNumber bytes m
-> !(MuxBundle muxMode bytes m a b)
-> !(Bundle (StrictTVar m RunOrStop))
-> MuxPromise muxMode versionNumber bytes m a b

MuxStopped
:: MuxPromise muxMode versionNumber bytes m
:: MuxPromise muxMode versionNumber bytes m a b

MuxPromiseHandshakeClientError
:: HasInitiator muxMode ~ True
=> !(HandshakeException (HandshakeClientProtocolError versionNumber))
-> MuxPromise muxMode versionNumber bytes m
-> MuxPromise muxMode versionNumber bytes m a b

MuxPromiseHandshakeServerError
:: HasResponder muxMode ~ True
=> !(HandshakeException (RefuseReason versionNumber))
-> MuxPromise muxMode versionNumber bytes m
-> MuxPromise muxMode versionNumber bytes m a b

MuxPromiseError
:: !SomeException
-> MuxPromise muxMode versionNumber bytes m
-> MuxPromise muxMode versionNumber bytes m a b


-- | A predicate which returns 'True' if connection handler thread has stopped running.
--
isConnectionHandlerRunning :: MuxPromise muxMode verionNumber bytes m -> Bool
isConnectionHandlerRunning :: MuxPromise muxMode verionNumber bytes m a b -> Bool
isConnectionHandlerRunning muxPromise =
case muxPromise of
MuxRunning{} -> True
Expand All @@ -123,17 +123,17 @@ isConnectionHandlerRunning muxPromise =

-- | Type of 'ConnectionHandler' implemented in this module.
--
type MuxConnectionHandler muxMode peerAddr versionNumber bytes m =
type MuxConnectionHandler muxMode peerAddr versionNumber bytes m a b =
ConnectionHandler muxMode
(ConnectionTrace versionNumber)
peerAddr
(MuxPromise muxMode versionNumber bytes m)
(MuxPromise muxMode versionNumber bytes m a b)
m

-- | Type alias for 'ConnectionManager' using 'MuxPromise'.
--
type MuxConnectionManager muxMode socket peerAddr versionNumber bytes m =
ConnectionManager muxMode socket peerAddr (MuxPromise muxMode versionNumber bytes m) m
type MuxConnectionManager muxMode socket peerAddr versionNumber bytes m a b =
ConnectionManager muxMode socket peerAddr (MuxPromise muxMode versionNumber bytes m a b) m

-- | To be used as `makeConnectionHandler` field of 'ConnectionManagerArguments'.
--
Expand All @@ -160,8 +160,8 @@ makeConnectionHandler
-- evidence that we can use mux with it.
-> MiniProtocolBundle muxMode
-> HandshakeArguments (ConnectionId peerAddr) versionNumber extra m
(OuroborosApplication muxMode peerAddr ByteString m a b)
-> MuxConnectionHandler muxMode peerAddr versionNumber ByteString m
(OuroborosBundle muxMode peerAddr ByteString m a b)
-> MuxConnectionHandler muxMode peerAddr versionNumber ByteString m a b
makeConnectionHandler muxTracer singMuxMode miniProtocolBundle handshakeArguments =
ConnectionHandler $
case singMuxMode of
Expand All @@ -174,7 +174,7 @@ makeConnectionHandler muxTracer singMuxMode miniProtocolBundle handshakeArgument
:: HasInitiator muxMode ~ True
=> ConnectionHandlerFn (ConnectionTrace versionNumber)
peerAddr
(MuxPromise muxMode versionNumber ByteString m)
(MuxPromise muxMode versionNumber ByteString m a b)
m
outboundConnectionHandler muxPromiseVar tracer connectionId muxBearer =
exceptionHandling muxPromiseVar tracer $ do
Expand All @@ -188,23 +188,36 @@ makeConnectionHandler muxTracer singMuxMode miniProtocolBundle handshakeArgument
traceWith tracer (ConnectionTraceHandshakeClientError err)
Right app -> do
traceWith tracer ConnectionTraceHandshakeSuccess
scheduleStopVar <- newTVarM Run
let !ptcls = ouroborosProtocols connectionId (readTVar scheduleStopVar) app
!scheduleStopVarBundle
<- (\a b c -> Bundle (WithHot a) (WithWarm b) (WithEstablished c))
<$> newTVarM Run
<*> newTVarM Run
<*> newTVarM Run
let muxApp
= mkMuxApplicationBundle
connectionId
(readTVar <$> scheduleStopVarBundle)
app
!mux <- newMux miniProtocolBundle
atomically $ writeTVar muxPromiseVar
(Promised
(MuxRunning mux
ptcls
scheduleStopVar))
muxApp
scheduleStopVarBundle))

-- For outbound connections we need to on demand start receivers.
-- This is, in a sense, a no man land: the server will not act, as
-- it's only reacting to inbound connections, and it also does not
-- belong to initiator (peer-2-peer governor).
case singMuxMode of
SInitiatorResponderMode -> do
case (singMuxMode, muxApp) of
(SInitiatorResponderMode,
Bundle (WithHot hotPtcls)
(WithWarm warmPtcls)
(WithEstablished establishedPtcls)) -> do
-- TODO: #2221 restart responders
traverse_ (runResponder mux) ptcls
traverse_ (runResponder mux) hotPtcls
traverse_ (runResponder mux) warmPtcls
traverse_ (runResponder mux) establishedPtcls

_ -> pure ()

Expand All @@ -216,7 +229,7 @@ makeConnectionHandler muxTracer singMuxMode miniProtocolBundle handshakeArgument
:: HasResponder muxMode ~ True
=> ConnectionHandlerFn (ConnectionTrace versionNumber)
peerAddr
(MuxPromise muxMode versionNumber ByteString m)
(MuxPromise muxMode versionNumber ByteString m a b)
m
inboundConnectionHandler muxPromiseVar tracer connectionId muxBearer =
exceptionHandling muxPromiseVar tracer $ do
Expand All @@ -231,10 +244,22 @@ makeConnectionHandler muxTracer singMuxMode miniProtocolBundle handshakeArgument
traceWith tracer (ConnectionTraceHandshakeServerError err)
Right app -> do
traceWith tracer ConnectionTraceHandshakeSuccess
scheduleStopVar <- newTVarM Run
let !ptcls = ouroborosProtocols connectionId (readTVar scheduleStopVar) app
!scheduleStopVarBundle
<- (\a b c -> Bundle (WithHot a) (WithWarm b) (WithEstablished c))
<$> newTVarM Run
<*> newTVarM Run
<*> newTVarM Run
let muxApp
= mkMuxApplicationBundle
connectionId
(readTVar <$> scheduleStopVarBundle)
app
!mux <- newMux miniProtocolBundle
atomically $ writeTVar muxPromiseVar (Promised (MuxRunning mux ptcls scheduleStopVar))
atomically $ writeTVar muxPromiseVar
(Promised
(MuxRunning mux
muxApp
scheduleStopVarBundle))
runMux (WithMuxBearer connectionId `contramap` muxTracer)
mux (muxBearer sduTimeout)

Expand All @@ -244,7 +269,7 @@ makeConnectionHandler muxTracer singMuxMode miniProtocolBundle handshakeArgument
exceptionHandling :: forall x.
StrictTVar m
(Promise
(MuxPromise muxMode versionNumber ByteString m))
(MuxPromise muxMode versionNumber ByteString m a b))
-> Tracer m (ConnectionTrace versionNumber)
-> m x -> m x
exceptionHandling muxPromiseVar tracer io =
Expand Down
Expand Up @@ -48,23 +48,23 @@ import Ouroboros.Network.Server.RateLimiting
import Ouroboros.Network.Snocket


data ServerArguments (muxMode :: MuxMode) socket peerAddr versionNumber versionDict bytes m = ServerArguments {
data ServerArguments (muxMode :: MuxMode) socket peerAddr versionNumber versionDict bytes m a b = ServerArguments {
serverSocket :: socket,
serverSnocket :: Snocket m socket peerAddr,
serverTracer :: Tracer m (ServerTrace peerAddr),
serverConnectionLimits :: AcceptedConnectionsLimit,
serverConnectionManager :: MuxConnectionManager muxMode socket peerAddr
versionNumber bytes m
versionNumber bytes m a b
}

run :: forall muxMode socket peerAddr versionNumber versionDict m.
run :: forall muxMode socket peerAddr versionNumber versionDict m a b.
( MonadAsync m
, MonadCatch m
, MonadDelay m
, MonadTime m
, Mux.HasResponder muxMode ~ True
)
=> ServerArguments muxMode socket peerAddr versionNumber versionDict ByteString m
=> ServerArguments muxMode socket peerAddr versionNumber versionDict ByteString m a b
-> m Void
run ServerArguments {
serverSocket,
Expand All @@ -87,12 +87,13 @@ run ServerArguments {
-- the 'a' and the list of all other unresolved transations (otherwise we
-- would leaked memory). It is implemented in terms of 'Alternative' for
-- testing purposes.
peekSTM :: StrictSeq (STM m a) -> STM m (a, StrictSeq (STM m a))
peekSTM :: forall x. StrictSeq (STM m x) -> STM m (x, StrictSeq (STM m x))
peekSTM = peekAlt


monitoring :: TVar m
(StrictSeq
(STM m (MuxPromise muxMode verionNumber ByteString m)))
(STM m (MuxPromise muxMode verionNumber ByteString m a b)))
-> m Void
monitoring muxVars = do
muxPromise <- atomically $ do
Expand All @@ -101,8 +102,15 @@ run ServerArguments {
writeTVar muxVars muxs'
pure muxPromise
case muxPromise of
MuxRunning mux ptcls _scheduleStopVar ->
traverse_ (runResponder mux) ptcls
MuxRunning mux
(Bundle
(WithHot hotPtls)
(WithWarm warmPtls)
(WithEstablished establishedPtls))
_ -> do
traverse_ (runResponder mux) hotPtls
traverse_ (runResponder mux) warmPtls
traverse_ (runResponder mux) establishedPtls
_ -> pure ()
monitoring muxVars

Expand Down Expand Up @@ -133,7 +141,7 @@ run ServerArguments {
acceptLoop :: TVar m
(StrictSeq
(STM m
(MuxPromise muxMode versionNumber ByteString m)))
(MuxPromise muxMode versionNumber ByteString m a b)))
-> Accept m SomeException peerAddr socket
-> m Void
acceptLoop muxVars acceptOne = do
Expand Down

0 comments on commit 10070e2

Please sign in to comment.