Skip to content

Commit

Permalink
connection-manager: server
Browse files Browse the repository at this point in the history
On outound connection we need to be able to manage responders: not only
start them but also re-start them when they are finished.  The server
knows how to do it! This patch provides `ControlChannel`, which allows
to notifiy the server about a thread that it needs to monitor (start
& re-start responders when they finish).

This patch also extends the tests, which now runs multiple rounds of
`ReqResp` protocol, which forces the server to use the restarting logic.
  • Loading branch information
coot committed Sep 14, 2020
1 parent 33dc08d commit 26c33e9
Show file tree
Hide file tree
Showing 6 changed files with 518 additions and 36 deletions.
4 changes: 4 additions & 0 deletions ouroboros-network-framework/ouroboros-network-framework.cabal
Expand Up @@ -45,6 +45,8 @@ library
Ouroboros.Network.ConnectionManager.Core
Ouroboros.Network.ConnectionManager.ConnectionHandler
Ouroboros.Network.ConnectionManager.RethrowPolicy
Ouroboros.Network.ConnectionManager.Server
Ouroboros.Network.ConnectionManager.Server.ControlChannel
Ouroboros.Network.Server.ConnectionTable
Ouroboros.Network.Server.Socket
Ouroboros.Network.Server.RateLimiting
Expand Down Expand Up @@ -110,6 +112,7 @@ test-suite ouroboros-network-framework-tests
Network.TypedProtocol.ReqResp.Codec.CBOR
Test.Network.TypedProtocol.PingPong.Codec
Test.Network.TypedProtocol.ReqResp.Codec
Test.Ouroboros.Network.ConnectionManager.Server
Test.Ouroboros.Network.Driver
Test.Ouroboros.Network.Orphans
Test.Ouroboros.Network.Socket
Expand All @@ -132,6 +135,7 @@ test-suite ouroboros-network-framework-tests
, tasty
, tasty-quickcheck

, cardano-prelude
, contra-tracer

, io-sim
Expand Down
Expand Up @@ -31,15 +31,12 @@ import Control.Monad.Class.MonadTimer
import Control.Tracer (Tracer, contramap, traceWith)

import Data.ByteString.Lazy (ByteString)
import Data.Functor (void)
import Data.Foldable (traverse_)
import Data.Typeable (Typeable)

import Network.Mux hiding (miniProtocolNum)

import Ouroboros.Network.Mux
import Ouroboros.Network.Protocol.Handshake
import Ouroboros.Network.Channel (fromChannel)
import Ouroboros.Network.ConnectionId (ConnectionId (..))
import Ouroboros.Network.ConnectionManager.RethrowPolicy
import Ouroboros.Network.ConnectionManager.Types
Expand Down Expand Up @@ -166,9 +163,17 @@ makeConnectionHandler
-> MiniProtocolBundle muxMode
-> HandshakeArguments (ConnectionId peerAddr) versionNumber extra m
(OuroborosBundle muxMode peerAddr ByteString m a b)
-> (MuxPromise muxMode peerAddr versionNumber ByteString m a b -> m ())
-- ^ This method allows to pass control over responders to the server (for
-- outbound connections), see
-- 'Ouroboros.Network.ConnectionManager.Server.ControlChannel.newOutboundConnection'.
-> (ThreadId m, RethrowPolicy)
-> MuxConnectionHandler muxMode peerAddr versionNumber ByteString m a b
makeConnectionHandler muxTracer singMuxMode miniProtocolBundle handshakeArguments (mainThreadId, rethrowPolicy)=
makeConnectionHandler muxTracer singMuxMode
miniProtocolBundle
handshakeArguments
announceOutboundConnection
(mainThreadId, rethrowPolicy) =
ConnectionHandler $
case singMuxMode of
SInitiatorMode -> WithInitiatorMode outboundConnectionHandler
Expand Down Expand Up @@ -213,27 +218,19 @@ makeConnectionHandler muxTracer singMuxMode miniProtocolBundle handshakeArgument
(readTVar <$> scheduleStopVarBundle)
app
!mux <- newMux miniProtocolBundle
atomically $ writeTVar muxPromiseVar
(Promised
(MuxRunning connectionId
mux
muxApp
scheduleStopVarBundle))
let muxPromise =
MuxRunning
connectionId mux
muxApp scheduleStopVarBundle
atomically $ writeTVar muxPromiseVar (Promised muxPromise)

-- For outbound connections we need to on demand start receivers.
-- This is, in a sense, a no man land: the server will not act, as
-- it's only reacting to inbound connections, and it also does not
-- belong to initiator (peer-2-peer governor).
case (singMuxMode, muxApp) of
(SInitiatorResponderMode,
Bundle (WithHot hotPtcls)
(WithWarm warmPtcls)
(WithEstablished establishedPtcls)) -> do
-- TODO: #2221 restart responders
traverse_ (runResponder mux) hotPtcls
traverse_ (runResponder mux) warmPtcls
traverse_ (runResponder mux) establishedPtcls

case singMuxMode of
SInitiatorResponderMode ->
announceOutboundConnection muxPromise
_ -> pure ()

runMux (WithMuxBearer connectionId `contramap` muxTracer)
Expand Down Expand Up @@ -320,22 +317,6 @@ makeConnectionHandler muxTracer singMuxMode miniProtocolBundle handshakeArgument
-- know.


runResponder :: Mux InitiatorResponderMode m
-> MiniProtocol InitiatorResponderMode ByteString m a b -> m ()
runResponder mux MiniProtocol {
miniProtocolNum,
miniProtocolRun
} =
case miniProtocolRun of
InitiatorAndResponderProtocol _ responder ->
void $
runMiniProtocol
mux miniProtocolNum
ResponderDirection
StartOnDemand
(runMuxPeer responder . fromChannel)


--
-- Tracing
--
Expand Down
@@ -0,0 +1,228 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}

-- | Server implementation based on 'ConnectionManager'
--
-- TODO: in the futures this should be moved to `Ouroboros.Network.Server`, but
-- to avoid confusion it will be kept here for now.
--
module Ouroboros.Network.ConnectionManager.Server
( ServerArguments (..)
, run
-- * Trace
, ServerTrace (..)
, AcceptConnectionsPolicyTrace (..)
) where

import Control.Exception (SomeException)
import Control.Monad (forever)
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadSTM.Strict
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime
import Control.Monad.Class.MonadTimer
import Control.Tracer (Tracer, contramap, traceWith)

import Data.ByteString.Lazy (ByteString)
import Data.Foldable (traverse_)
import Data.Void (Void)
import Data.List.NonEmpty (NonEmpty)
import qualified Data.List.NonEmpty as NonEmpty

import qualified Network.Mux as Mux

import Ouroboros.Network.ConnectionId
import Ouroboros.Network.ConnectionManager.Types
import Ouroboros.Network.ConnectionManager.ConnectionHandler
import Ouroboros.Network.ConnectionManager.Server.ControlChannel (ServerControlChannel)
import qualified Ouroboros.Network.ConnectionManager.Server.ControlChannel as Server
import Ouroboros.Network.Mux
import Ouroboros.Network.Channel (fromChannel)
import Ouroboros.Network.Server.RateLimiting
import Ouroboros.Network.Snocket


data ServerArguments (muxMode :: MuxMode) socket peerAddr versionNumber bytes m a b = ServerArguments {
serverSockets :: NonEmpty socket,
serverSnocket :: Snocket m socket peerAddr,
serverTracer :: Tracer m (ServerTrace peerAddr),
serverConnectionLimits :: AcceptedConnectionsLimit,
serverConnectionManager :: MuxConnectionManager muxMode socket peerAddr
versionNumber bytes m a b,

-- | Server control var is passed as an argument; this allows to use
-- the server to run and manage responders which needs to be started on
-- inbound connections.
--
serverControlChannel :: ServerControlChannel m muxMode peerAddr
versionNumber a b
}


run :: forall muxMode socket peerAddr versionNumber m a b.
( MonadAsync m
, MonadCatch m
, MonadDelay m
, MonadTime m
, Mux.HasResponder muxMode ~ True
)
=> ServerArguments muxMode socket peerAddr versionNumber ByteString m a b
-> m Void
run ServerArguments {
serverSockets,
serverSnocket,
serverTracer,
serverConnectionLimits,
serverConnectionManager,
serverControlChannel
} = do
let sockets = NonEmpty.toList serverSockets
localAddresses <- traverse (getLocalAddr serverSnocket) sockets
traceWith serverTracer (ServerStarted localAddresses)
-- concurrently run accept loops and the monitoring thread
runConcurrently
$ foldr1 (<>)
$ Concurrently monitoringThread
: (Concurrently . acceptLoop . accept serverSnocket) `map` sockets
`finally`
traceWith serverTracer ServerStopped
where
monitoringThread :: m Void
monitoringThread = forever $ do
controlMessage <- Server.readControlMessage serverControlChannel
case controlMessage of
Server.NewInboundConnection
(MuxRunning connectionId
mux
(Bundle
(WithHot hotPtls)
(WithWarm warmPtls)
(WithEstablished establishedPtls))
_) -> do
traceWith serverTracer
(ServerStartRespondersOnInboundConncetion
(remoteAddress connectionId))
traverse_
(\miniProtocol -> do
!controlMessageSTM
<- runResponder mux connectionId Mux.StartEagerly miniProtocol
Server.writeControlMessage serverControlChannel controlMessageSTM)
(establishedPtls ++ warmPtls ++ hotPtls)

Server.NewOutboundConnection
(MuxRunning connectionId
mux
(Bundle
(WithHot hotPtls)
(WithWarm warmPtls)
(WithEstablished establishedPtls))
_) -> do
traceWith serverTracer
(ServerStartRespondersOnOutboundConnection
(remoteAddress connectionId))
traverse_
(\miniProtocol -> do
!controlMessageSTM
<- runResponder mux connectionId Mux.StartOnDemand miniProtocol
Server.writeControlMessage serverControlChannel controlMessageSTM)
(establishedPtls ++ warmPtls ++ hotPtls)

-- There was a problem establishing mux (e.g. a Handshake error or
-- an IO exception during Handshake negotation). We don't need to act
-- on it, the connection manager will close the connection.
Server.NewInboundConnection _ ->
pure ()
Server.NewOutboundConnection _ ->
pure ()

Server.MiniProtocolException {} ->
-- error handling is done in 'ConnectionHandler'
pure ()

Server.MiniProtocolCompleted mux connectionId miniProtocol@MiniProtocol { miniProtocolNum } -> do
traceWith
serverTracer
(ServerMiniProtocolRestarted (remoteAddress connectionId) miniProtocolNum)
controlMessageSTM <- runResponder mux connectionId Mux.StartOnDemand miniProtocol
Server.writeControlMessage serverControlChannel controlMessageSTM


runResponder :: Mux.Mux muxMode m
-> ConnectionId peerAddr
-> Mux.StartOnDemandOrEagerly
-> MiniProtocol muxMode ByteString m a b
-> m (STM m (Server.ControlMessage muxMode peerAddr versionNumber m a b))
runResponder mux connectionId
startOnDemandOrEagerly
miniProtocol@MiniProtocol {
miniProtocolNum,
miniProtocolRun
} =
(fmap . fmap)
(\(x :: Either SomeException b) ->
case x of
Left err -> Server.MiniProtocolException mux connectionId miniProtocolNum err
Right _ -> Server.MiniProtocolCompleted mux connectionId miniProtocol)
$ case miniProtocolRun of
ResponderProtocolOnly responder ->
Mux.runMiniProtocol
mux miniProtocolNum
Mux.ResponderDirectionOnly
startOnDemandOrEagerly
-- TODO: eliminate 'fromChannel'
(runMuxPeer responder . fromChannel)
InitiatorAndResponderProtocol _ responder ->
Mux.runMiniProtocol
mux miniProtocolNum
Mux.ResponderDirection
startOnDemandOrEagerly
(runMuxPeer responder . fromChannel)


acceptLoop :: Accept m SomeException peerAddr socket
-> m Void
acceptLoop acceptOne = do
runConnectionRateLimits
(ServerAcceptPolicyTrace `contramap` serverTracer)
(numberOfConnections serverConnectionManager)
serverConnectionLimits
result <- runAccept acceptOne
case result of
(AcceptException err, acceptNext) -> do
traceWith serverTracer (ServerAcceptError err)
acceptLoop acceptNext
(Accepted socket peerAddr, acceptNext) -> do
traceWith serverTracer (ServerAcceptConnection peerAddr)
!muxPromise <-
includeInboundConnection
serverConnectionManager
socket peerAddr
Server.writeControlMessage
serverControlChannel
(Server.NewInboundConnection <$> muxPromise)
acceptLoop acceptNext

--
-- Trace
--

data ServerTrace peerAddr
= ServerAcceptConnection !peerAddr
| ServerStartRespondersOnInboundConncetion !peerAddr
| ServerStartRespondersOnOutboundConnection !peerAddr
| ServerAcceptError !SomeException
| ServerAcceptPolicyTrace !AcceptConnectionsPolicyTrace
| ServerStarted ![peerAddr]
| ServerStopped
| ServerFatalError !peerAddr !MiniProtocolNum !SomeException
| ServerPeerError !peerAddr !MiniProtocolNum !SomeException
| ServerError !peerAddr !MiniProtocolNum !SomeException
| ServerMiniProtocolRestarted !peerAddr !MiniProtocolNum
deriving Show

0 comments on commit 26c33e9

Please sign in to comment.