Skip to content

Commit

Permalink
connection-manager: server
Browse files Browse the repository at this point in the history
On outound connection we need to be able to manage responders: not only
start them but also re-start them when they are finished.  The server
knows how to do it! This patch provides `ControlChannel`, which allows
to notifiy the server about a thread that it needs to monitor (start
& re-start responders when they finish).

This patch also extends the tests, which now runs multiple rounds of
`ReqResp` protocol, which forces the server to use the restarting logic.
  • Loading branch information
coot committed Aug 3, 2020
1 parent 3f76735 commit 92e249d
Show file tree
Hide file tree
Showing 5 changed files with 515 additions and 215 deletions.
Expand Up @@ -45,6 +45,7 @@ library
Ouroboros.Network.ConnectionManager.Core
Ouroboros.Network.ConnectionManager.ConnectionHandler
Ouroboros.Network.ConnectionManager.Server
Ouroboros.Network.ConnectionManager.Server.ControlChannel
Ouroboros.Network.Server.ConnectionTable
Ouroboros.Network.Server.Socket
Ouroboros.Network.Server.RateLimiting
Expand Down
Expand Up @@ -30,15 +30,12 @@ import Control.Monad.Class.MonadTimer
import Control.Tracer (Tracer, contramap, traceWith)

import Data.ByteString.Lazy (ByteString)
import Data.Functor (void)
import Data.Foldable (traverse_)
import Data.Typeable (Typeable)

import Network.Mux hiding (miniProtocolNum)

import Ouroboros.Network.Mux
import Ouroboros.Network.Protocol.Handshake
import Ouroboros.Network.Channel (fromChannel)
import Ouroboros.Network.ConnectionId (ConnectionId)
import Ouroboros.Network.ConnectionManager.Types

Expand Down Expand Up @@ -163,8 +160,15 @@ makeConnectionHandler
-> MiniProtocolBundle muxMode
-> HandshakeArguments (ConnectionId peerAddr) versionNumber extra m
(OuroborosBundle muxMode peerAddr ByteString m a b)
-> (MuxPromise muxMode peerAddr versionNumber ByteString m a b -> m ())
-- ^ This method allows to pass control over responders to the server (for
-- outbound connections), see
-- 'Ouroboros.Network.ConnectionManager.Server.ControlChannel.newOutboundConnection'.
-> MuxConnectionHandler muxMode peerAddr versionNumber ByteString m a b
makeConnectionHandler muxTracer singMuxMode miniProtocolBundle handshakeArguments =
makeConnectionHandler muxTracer singMuxMode
miniProtocolBundle
handshakeArguments
announceOutboundConnection =
ConnectionHandler $
case singMuxMode of
SInitiatorMode -> WithInitiatorMode outboundConnectionHandler
Expand Down Expand Up @@ -201,27 +205,19 @@ makeConnectionHandler muxTracer singMuxMode miniProtocolBundle handshakeArgument
(readTVar <$> scheduleStopVarBundle)
app
!mux <- newMux miniProtocolBundle
atomically $ writeTVar muxPromiseVar
(Promised
(MuxRunning connectionId
mux
muxApp
scheduleStopVarBundle))
let muxPromise =
MuxRunning
connectionId mux
muxApp scheduleStopVarBundle
atomically $ writeTVar muxPromiseVar (Promised muxPromise)

-- For outbound connections we need to on demand start receivers.
-- This is, in a sense, a no man land: the server will not act, as
-- it's only reacting to inbound connections, and it also does not
-- belong to initiator (peer-2-peer governor).
case (singMuxMode, muxApp) of
(SInitiatorResponderMode,
Bundle (WithHot hotPtcls)
(WithWarm warmPtcls)
(WithEstablished establishedPtcls)) -> do
-- TODO: #2221 restart responders
traverse_ (runResponder mux) hotPtcls
traverse_ (runResponder mux) warmPtcls
traverse_ (runResponder mux) establishedPtcls

case singMuxMode of
SInitiatorResponderMode ->
announceOutboundConnection muxPromise
_ -> pure ()

runMux (WithMuxBearer connectionId `contramap` muxTracer)
Expand Down Expand Up @@ -296,22 +292,6 @@ makeConnectionHandler muxTracer singMuxMode miniProtocolBundle handshakeArgument
throwM e


runResponder :: Mux InitiatorResponderMode m
-> MiniProtocol InitiatorResponderMode ByteString m a b -> m ()
runResponder mux MiniProtocol {
miniProtocolNum,
miniProtocolRun
} =
case miniProtocolRun of
InitiatorAndResponderProtocol _ responder ->
void $
runMiniProtocol
mux miniProtocolNum
ResponderDirection
StartOnDemand
(runMuxPeer responder . fromChannel)


--
-- Tracing
--
Expand Down

0 comments on commit 92e249d

Please sign in to comment.