Skip to content

Commit

Permalink
connection-manager: ConnectionHandler
Browse files Browse the repository at this point in the history
ConnectionHandler needs `MiniProtocolBundle` before the connection
manager starts.  This type only contains static information about
mini-protocols, but this forces on us to use two connection managers:
one for node-to-node connections and another one for node-to-client
connections, but this is ok as these resources are indepenedent.
  • Loading branch information
coot committed Aug 3, 2020
1 parent fb6413f commit 53eae8b
Show file tree
Hide file tree
Showing 6 changed files with 330 additions and 5 deletions.
Expand Up @@ -43,6 +43,7 @@ library
Ouroboros.Network.ConnectionId
Ouroboros.Network.ConnectionManager.Types
Ouroboros.Network.ConnectionManager.Core
Ouroboros.Network.ConnectionManager.ConnectionHandler
Ouroboros.Network.Server.ConnectionTable
Ouroboros.Network.Server.Socket
Ouroboros.Network.Server.RateLimiting
Expand Down
@@ -0,0 +1,305 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

-- | Implementation of 'ConnectionHandler'
--
module Ouroboros.Network.ConnectionManager.ConnectionHandler
( MuxPromise (..)
, MuxConnectionHandler
, makeConnectionHandler
, MuxConnectionManager
-- * tracing
, ConnectionTrace (..)
) where

import Control.Monad (when)
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadSTM.Strict
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime
import Control.Monad.Class.MonadTimer
import Control.Tracer (Tracer, contramap, traceWith)

import Data.ByteString.Lazy (ByteString)
import Data.Functor (void)
import Data.Foldable (traverse_)
import Data.Typeable (Typeable)

import Network.Mux hiding (miniProtocolNum)

import Ouroboros.Network.Mux
import Ouroboros.Network.Protocol.Handshake
import Ouroboros.Network.Channel (fromChannel)
import Ouroboros.Network.ConnectionId (ConnectionId)
import Ouroboros.Network.ConnectionManager.Types

-- | We place an upper limit of `30s` on the time we wait on receiving an SDU.
-- There is no upper bound on the time we wait when waiting for a new SDU.
-- This makes it possible for miniprotocols to use timeouts that are larger
-- than 30s or wait forever. `30s` for receiving an SDU corresponds to
-- a minimum speed limit of 17kbps.
--
-- ( 8 -- mux header length
-- + 0xffff -- maximum SDU payload
-- )
-- * 8
-- = 524_344 -- maximum bits in an SDU
--
-- 524_344 / 30 / 1024 = 17kbps
--
sduTimeout :: DiffTime
sduTimeout = 30


-- | For handshake, we put a limit of `10s` for sending or receiving a single
-- `MuxSDU`.
--
sduHandshakeTimeout :: DiffTime
sduHandshakeTimeout = 10


-- | States of the connection handler thread.
--
-- * 'MuxRunning' - sucessful Handshake, mux started
-- * 'MuxStopped' - either mux was gracefully stopped (using 'Mux' or by
-- 'killThread'; the latter is done by
-- 'Ouoroboros.Network.ConnectinoManager.withConnectionManager')
-- * 'MuxPromiseHandshakeClientError'
-- - the connection handler thread was running client side
-- of the handshake negotiation, which failed with
-- 'HandshakeException'
-- * 'MuxPromiseHandshakeServerError'
-- - the conneciton hndler thread was running server side
-- of the handshake protocol, which faile with
-- 'HandshakeException'
-- * 'MuxPromiseError' - the multiplexer thrown 'MuxError'.
--
data MuxPromise muxMode verionNumber bytes m where
MuxRunning
:: forall muxMode versionNumber bytes m a b.
!(Mux muxMode m)
-> ![MiniProtocol muxMode bytes m a b]
-> !(StrictTVar m RunOrStop)
-> MuxPromise muxMode versionNumber bytes m

MuxStopped
:: MuxPromise muxMode versionNumber bytes m

MuxPromiseHandshakeClientError
:: HasInitiator muxMode ~ True
=> !(HandshakeException (HandshakeClientProtocolError versionNumber))
-> MuxPromise muxMode versionNumber bytes m

MuxPromiseHandshakeServerError
:: HasResponder muxMode ~ True
=> !(HandshakeException (RefuseReason versionNumber))
-> MuxPromise muxMode versionNumber bytes m

MuxPromiseError
:: !SomeException
-> MuxPromise muxMode versionNumber bytes m


-- | A predicate which returns 'True' if connection handler thread has stopped running.
--
isConnectionHandlerRunning :: MuxPromise muxMode verionNumber bytes m -> Bool
isConnectionHandlerRunning muxPromise =
case muxPromise of
MuxRunning{} -> True
MuxPromiseHandshakeClientError{} -> False
MuxPromiseHandshakeServerError{} -> False
MuxPromiseError{} -> False
MuxStopped -> False


-- | Type of 'ConnectionHandler' implemented in this module.
--
type MuxConnectionHandler muxMode peerAddr versionNumber bytes m =
ConnectionHandler muxMode
(ConnectionTrace versionNumber)
peerAddr
(MuxPromise muxMode versionNumber bytes m)
m

-- | Type alias for 'ConnectionManager' using 'MuxPromise'.
--
type MuxConnectionManager muxMode socket peerAddr versionNumber bytes m =
ConnectionManager muxMode socket peerAddr (MuxPromise muxMode versionNumber bytes m) m

-- | To be used as `makeConnectionHandler` field of 'ConnectionManagerArguments'.
--
-- Note: We need to pass `MiniProtocolBundle` what forces us to have two
-- different `ConnectionManager`s: one for `node-to-client` and another for
-- `node-to-node` connections. But this is ok, as these resources are
-- independent.
--
makeConnectionHandler
:: forall peerAddr muxMode versionNumber extra m a b.
( MonadAsync m
, MonadCatch m
, MonadFork m
, MonadThrow (STM m)
, MonadTime m
, MonadTimer m
, MonadMask m
, Ord versionNumber
, Typeable versionNumber
)
=> Tracer m (WithMuxBearer (ConnectionId peerAddr) MuxTrace)
-> SingInitiatorResponderMode muxMode
-- ^ describe whether this is outbound or inbound connection, and bring
-- evidence that we can use mux with it.
-> MiniProtocolBundle muxMode
-> HandshakeArguments (ConnectionId peerAddr) versionNumber extra m
(OuroborosApplication muxMode peerAddr ByteString m a b)
-> MuxConnectionHandler muxMode peerAddr versionNumber ByteString m
makeConnectionHandler muxTracer singMuxMode miniProtocolBundle handshakeArguments =
ConnectionHandler $
case singMuxMode of
SInitiatorMode -> WithInitiatorMode outboundConnectionHandler
SResponderMode -> WithResponderMode inboundConnectionHandler
SInitiatorResponderMode -> WithInitiatorResponderMode outboundConnectionHandler
inboundConnectionHandler
where
outboundConnectionHandler
:: HasInitiator muxMode ~ True
=> ConnectionHandlerFn (ConnectionTrace versionNumber)
peerAddr
(MuxPromise muxMode versionNumber ByteString m)
m
outboundConnectionHandler muxPromiseVar tracer connectionId muxBearer =
exceptionHandling muxPromiseVar tracer $ do
traceWith tracer ConnectionStart
hsResult <- runHandshakeClient (muxBearer sduHandshakeTimeout)
connectionId
handshakeArguments
case hsResult of
Left !err -> do
atomically $ writeTVar muxPromiseVar (Promised (MuxPromiseHandshakeClientError err))
traceWith tracer (ConnectionTraceHandshakeClientError err)
Right app -> do
traceWith tracer ConnectionTraceHandshakeSuccess
scheduleStopVar <- newTVarM Run
let !ptcls = ouroborosProtocols connectionId (readTVar scheduleStopVar) app
!mux <- newMux miniProtocolBundle
atomically $ writeTVar muxPromiseVar
(Promised
(MuxRunning mux
ptcls
scheduleStopVar))

-- For outbound connections we need to on demand start receivers.
-- This is, in a sense, a no man land: the server will not act, as
-- it's only reacting to inbound connections, and it also does not
-- belong to initiator (peer-2-peer governor).
case singMuxMode of
SInitiatorResponderMode -> do
-- TODO: #2221 restart responders
traverse_ (runResponder mux) ptcls

_ -> pure ()

runMux (WithMuxBearer connectionId `contramap` muxTracer)
mux (muxBearer sduTimeout)


inboundConnectionHandler
:: HasResponder muxMode ~ True
=> ConnectionHandlerFn (ConnectionTrace versionNumber)
peerAddr
(MuxPromise muxMode versionNumber ByteString m)
m
inboundConnectionHandler muxPromiseVar tracer connectionId muxBearer =
exceptionHandling muxPromiseVar tracer $ do
traceWith tracer ConnectionStart
hsResult <- runHandshakeServer (muxBearer sduHandshakeTimeout)
connectionId
(\_ _ _ -> Accept) -- we accept all connections
handshakeArguments
case hsResult of
Left !err -> do
atomically $ writeTVar muxPromiseVar (Promised (MuxPromiseHandshakeServerError err))
traceWith tracer (ConnectionTraceHandshakeServerError err)
Right app -> do
traceWith tracer ConnectionTraceHandshakeSuccess
scheduleStopVar <- newTVarM Run
let !ptcls = ouroborosProtocols connectionId (readTVar scheduleStopVar) app
!mux <- newMux miniProtocolBundle
atomically $ writeTVar muxPromiseVar (Promised (MuxRunning mux ptcls scheduleStopVar))
runMux (WithMuxBearer connectionId `contramap` muxTracer)
mux (muxBearer sduTimeout)

-- minimal error handling, just to make adequate changes to
-- `muxPromiseVar`; Classification of errors is done by
-- 'withConnectionManager' when the connection handler thread is started..
exceptionHandling :: forall x.
StrictTVar m
(Promise
(MuxPromise muxMode versionNumber ByteString m))
-> Tracer m (ConnectionTrace versionNumber)
-> m x -> m x
exceptionHandling muxPromiseVar tracer io =
io
-- the default for normal exit and unhandled error is to write
-- `MusStopped`, but we don't want to override handshake errors.
`finally` do
atomically $ do
st <- readTVar muxPromiseVar
when (case st of
Promised muxPromise -> isConnectionHandlerRunning muxPromise
Empty -> True)
$ writeTVar muxPromiseVar (Promised MuxStopped)
traceWith tracer ConnectionStopped

-- if 'MuxError' was thrown by the conneciton handler, let the other side
-- know.
`catch` \(e :: SomeException) -> do
atomically (writeTVar muxPromiseVar (Promised (MuxPromiseError e)))
throwM e


runResponder :: Mux InitiatorResponderMode m
-> MiniProtocol InitiatorResponderMode ByteString m a b -> m ()
runResponder mux MiniProtocol {
miniProtocolNum,
miniProtocolRun
} =
case miniProtocolRun of
InitiatorAndResponderProtocol _ responder ->
void $
runMiniProtocol
mux miniProtocolNum
ResponderDirection
StartOnDemand
(runMuxPeer responder . fromChannel)


--
-- Tracing
--


-- | 'ConnectionTrace' is embedded into 'ConnectionManagerTrace' with
-- 'Ouroboros.Network.ConnectionMamanger.Types.ConnectionTrace' constructor.
--
-- TODO: when 'Handshake' will get it's own tracer, independent of 'Mux', it
-- should be embedded into 'ConnectionTrace'.
--
data ConnectionTrace versionNumber =
ConnectionStart
| ConnectionTraceHandshakeSuccess
| ConnectionTraceHandshakeClientError
!(HandshakeException (HandshakeClientProtocolError versionNumber))
| ConnectionTraceHandshakeServerError
!(HandshakeException (RefuseReason versionNumber))
| ConnectionStopped
deriving Show
Expand Up @@ -196,9 +196,12 @@ withConnectionManager ConnectionManagerArguments {
muxPromise
(ConnectionTrace connectionId `contramap` tracer)
connectionId
(toBearer connectionSnocket
(WithMuxBearer connectionId `contramap` muxTracer)
socket))
(\bearerTimeout ->
toBearer
connectionSnocket
bearerTimeout
(WithMuxBearer connectionId `contramap` muxTracer)
socket))
`finally` cleanup
)
(\e -> case rethrowPolicy peerAddr e of
Expand Down
Expand Up @@ -40,6 +40,7 @@ module Ouroboros.Network.ConnectionManager.Types
import Control.Exception ( Exception
, SomeException )
import Control.Monad.Class.MonadSTM.Strict
import Control.Monad.Class.MonadTime (DiffTime)
import Control.Tracer (Tracer)
import Data.Typeable (Typeable)

Expand Down Expand Up @@ -119,7 +120,7 @@ type ConnectionHandlerFn handlerTrace peerAddr muxPromise m
= StrictTVar m (Promise muxPromise)
-> Tracer m handlerTrace
-> ConnectionId peerAddr
-> MuxBearer m
-> (DiffTime -> MuxBearer m)
-> m ()


Expand Down
16 changes: 15 additions & 1 deletion ouroboros-network-framework/src/Ouroboros/Network/Mux.hs
Expand Up @@ -14,7 +14,9 @@ module Ouroboros.Network.Mux
, MiniProtocolLimits (..)
, RunMiniProtocol (..)
, MuxPeer (..)
, runMuxPeer
, toApplication
, ouroborosProtocols
, RunOrStop (..)
, ScheduledStop
, neverStop
Expand Down Expand Up @@ -140,7 +142,6 @@ toApplication connectionId scheduleStop (OuroborosApplication ptcls) =
Mux.miniProtocolRun = toMuxRunMiniProtocol (miniProtocolRun ptcl)
}
| ptcl <- ptcls connectionId scheduleStop ]
where

toMuxRunMiniProtocol :: forall mode m a b.
(MonadCatch m, MonadAsync m)
Expand All @@ -154,6 +155,19 @@ toMuxRunMiniProtocol (InitiatorAndResponderProtocol i r) =
Mux.InitiatorAndResponderProtocol (runMuxPeer i . fromChannel)
(runMuxPeer r . fromChannel)

ouroborosProtocols :: ConnectionId addr
-> ScheduledStop m
-> OuroborosApplication mode addr bytes m a b
-> [MiniProtocol mode bytes m a b]
ouroborosProtocols connectionId scheduleStop (OuroborosApplication ptcls) =
[ MiniProtocol {
miniProtocolNum = miniProtocolNum ptcl,
miniProtocolLimits = miniProtocolLimits ptcl,
miniProtocolRun = miniProtocolRun ptcl
}
| ptcl <- ptcls connectionId scheduleStop ]


-- |
-- Run a @'MuxPeer'@ using either @'runPeer'@ or @'runPipelinedPeer'@.
--
Expand Down
Expand Up @@ -52,6 +52,7 @@ handshakeProtocolNum = MiniProtocolNum 0
data HandshakeException a =
HandshakeProtocolLimit ProtocolLimitFailure
| HandshakeProtocolError a
deriving Show


-- | Try to complete either initiator or responder side of the Handshake protocol
Expand Down

0 comments on commit 53eae8b

Please sign in to comment.