Skip to content

Commit

Permalink
Snocket.toBearer: pass sduTimout
Browse files Browse the repository at this point in the history
This allows us to use a bearer with 10s timeout on receiving or sending
a single SDU for handshake protocol, and 30s timeout for all other
mini-protocols.
  • Loading branch information
coot committed Jul 6, 2020
1 parent 1361bb9 commit 4fdf2e0
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 63 deletions.
Expand Up @@ -24,7 +24,6 @@ import qualified Data.ByteString.Lazy as BL
import qualified Codec.CBOR.Read as CBOR
import qualified Codec.CBOR.Term as CBOR

import Network.Mux.Timeout
import Network.Mux.Trace
import Network.Mux.Types
import Network.TypedProtocol.Codec
Expand All @@ -44,40 +43,30 @@ import Ouroboros.Network.Protocol.Handshake.Server
handshakeProtocolNum :: MiniProtocolNum
handshakeProtocolNum = MiniProtocolNum 0

-- | Timeout for the complete handshake exchange.
--
handshakeTimeout :: DiffTime
handshakeTimeout = 10 -- 10 seconds

-- | Wrapper around initiator and responder errors experienced by tryHandshake.
--
data HandshakeException a =
HandshakeProtocolLimit ProtocolLimitFailure
| HandshakeProtocolError a
| HandshakeTimeout


-- | Try to complete either initiator or responder side of the Handshake protocol
-- within `handshakeTimeout` seconds.
--
tryHandshake :: ( MonadAsync m
, MonadFork m
, MonadMonotonicTime m
, MonadTimer m
tryHandshake :: forall m a r.
( MonadAsync m
, MonadMask m
, MonadThrow (STM m)
)
=> m (Either a r)
-> m (Either (HandshakeException a) r)
tryHandshake doHandshake = do
mapp <- withTimeoutSerial $ \timeoutFn -> timeoutFn handshakeTimeout $ try doHandshake
mapp <- try doHandshake
case mapp of
Nothing -> return $ Left HandshakeTimeout
Just (Left (err :: ProtocolLimitFailure)) ->
return $ Left $ HandshakeProtocolLimit err
Just (Right (Left err)) ->
return $ Left $ HandshakeProtocolError err
Just (Right (Right r)) -> return $ Right r
Left err ->
return $ Left $ HandshakeProtocolLimit err
Right (Left err) ->
return $ Left $ HandshakeProtocolError err
Right (Right r) -> return $ Right r


--
Expand Down
Expand Up @@ -80,9 +80,11 @@ byteLimitsHandshake = ProtocolSizeLimits stateToLimit (fromIntegral . BL.length)
stateToLimit (ClientAgency TokPropose) = maxTransmissionUnit
stateToLimit (ServerAgency TokConfirm) = maxTransmissionUnit

-- Time limits
-- We don't use a per-state timeout, instead there is an overall timeout
-- for the complete handshake protocol exchange.
-- | Time limits.
--
-- We use a bearer whcih has `10s` timout on sending or receiving a single
-- `MuxSDU`. Handshake messages must fit into a single `MuxSDU`, thus we don't
-- set another timeout here.
timeLimitsHandshake :: forall vNumber. ProtocolTimeLimits (Handshake vNumber CBOR.Term)
timeLimitsHandshake = ProtocolTimeLimits stateToLimit
where
Expand Down
27 changes: 4 additions & 23 deletions ouroboros-network-framework/src/Ouroboros/Network/Snocket.hs
Expand Up @@ -183,7 +183,7 @@ data Snocket m fd addr = Snocket {

, close :: fd -> m ()

, toBearer :: Tracer m MuxTrace -> fd -> (MuxBearer m)
, toBearer :: DiffTime -> Tracer m MuxTrace -> fd -> MuxBearer m
}


Expand Down Expand Up @@ -247,28 +247,9 @@ socketSnocket ioManager = Snocket {
, listen = \s -> Socket.listen s 8
, accept = berkeleyAccept ioManager
, close = Socket.close
, toBearer = Mx.socketAsMuxBearer sduTimeout
, toBearer = Mx.socketAsMuxBearer
}
where

-- We place an upper limit of 30s on the time we wait on receiving an SDU.
-- There is no upper bound on the time we wait when waiting for a new SDU.
-- This makes it possible for miniprotocols to use timeouts that are larger
-- than 30s or wait forever.
--
-- 30s for receiving an SDU corresponds to a minimum speed limit of 17kbps.
--
-- ( 8 -- mux header length
-- + 0xffff -- maximum SDU payload
-- )
-- * 8
-- = 524_344 -- maximum bits in an SDU
--
-- 524_344 / 30 / 1024 = 17kbps
--
sduTimeout :: DiffTime
sduTimeout = 30

openSocket :: AddressFamily SockAddr -> IO Socket
openSocket (SocketFamily family_) = do
sd <- Socket.socket family_ Socket.Stream Socket.defaultProtocol
Expand Down Expand Up @@ -353,7 +334,7 @@ namedPipeSnocket ioManager path = Snocket {

, close = Win32.closeHandle

, toBearer = namedPipeAsBearer
, toBearer = \_sduTimeout -> namedPipeAsBearer
}
where
localAddress :: LocalAddress
Expand Down Expand Up @@ -406,7 +387,7 @@ localSnocket ioManager _ = Snocket {
, open = openSocket
, openToConnect = \_addr -> openSocket LocalFamily
, close = Socket.close
, toBearer = Mx.socketAsMuxBearer (-1) -- Negative values means no timeout.
, toBearer = Mx.socketAsMuxBearer
}
where
toLocalAddress :: SockAddr -> LocalAddress
Expand Down
52 changes: 34 additions & 18 deletions ouroboros-network-framework/src/Ouroboros/Network/Socket.hs
Expand Up @@ -85,11 +85,9 @@ import Control.Tracer

import qualified Network.Mux.Compat as Mx
import Network.Mux.DeltaQ.TraceTransformer
import Network.Mux.Types (MuxBearer)

import Ouroboros.Network.ConnectionId
import Ouroboros.Network.Codec hiding (encode, decode)
import Ouroboros.Network.Driver.Limits
import Ouroboros.Network.Driver (TraceSendRecv)
import Ouroboros.Network.Mux
import Ouroboros.Network.ErrorPolicy
Expand Down Expand Up @@ -143,6 +141,29 @@ sockAddrFamily (Socket.SockAddrInet _ _ ) = Socket.AF_INET
sockAddrFamily (Socket.SockAddrInet6 _ _ _ _) = Socket.AF_INET6
sockAddrFamily (Socket.SockAddrUnix _ ) = Socket.AF_UNIX

-- | We place an upper limit of `30s` on the time we wait on receiving an SDU.
-- There is no upper bound on the time we wait when waiting for a new SDU.
-- This makes it possible for miniprotocols to use timeouts that are larger
-- than 30s or wait forever. `30s` for receiving an SDU corresponds to
-- a minimum speed limit of 17kbps.
--
-- ( 8 -- mux header length
-- + 0xffff -- maximum SDU payload
-- )
-- * 8
-- = 524_344 -- maximum bits in an SDU
--
-- 524_344 / 30 / 1024 = 17kbps
--
sduTimeout :: DiffTime
sduTimeout = 30

-- | For handshake, we put a limit of `10s` for sending or receiving a single
-- `MuxSDU`.
--
sduHandshakeTimeout :: DiffTime
sduHandshakeTimeout = 10


-- |
-- Connect to a remote node. It is using bracket to enclose the underlying
Expand Down Expand Up @@ -209,12 +230,11 @@ connectToNode'
connectToNode' sn handshakeCodec versionDataCodec NetworkConnectTracers {nctMuxTracer, nctHandshakeTracer } versions sd = do
connectionId <- ConnectionId <$> Snocket.getLocalAddr sn sd <*> Snocket.getRemoteAddr sn sd
muxTracer <- initDeltaQTracer' $ Mx.WithMuxBearer connectionId `contramap` nctMuxTracer
let bearer = Snocket.toBearer sn muxTracer sd
ts_start <- getMonotonicTime

app_e <-
runHandshakeClient
bearer
(Snocket.toBearer sn sduHandshakeTimeout muxTracer sd)
connectionId
-- TODO: push 'HandshakeArguments' up the call stack.
HandshakeArguments {
Expand All @@ -225,11 +245,6 @@ connectToNode' sn handshakeCodec versionDataCodec NetworkConnectTracers {nctMuxT
}
ts_end <- getMonotonicTime
case app_e of
Left HandshakeTimeout -> do
traceWith muxTracer $ Mx.MuxTraceHandshakeClientError ExceededTimeLimit
(diffTime ts_end ts_start)
throwIO ExceededTimeLimit

Left (HandshakeProtocolLimit err) -> do
traceWith muxTracer $ Mx.MuxTraceHandshakeClientError err (diffTime ts_end ts_start)
throwIO err
Expand All @@ -240,7 +255,10 @@ connectToNode' sn handshakeCodec versionDataCodec NetworkConnectTracers {nctMuxT

Right app -> do
traceWith muxTracer $ Mx.MuxTraceHandshakeClientEnd (diffTime ts_end ts_start)
Mx.muxStart muxTracer (toApplication connectionId (neverStop (Proxy :: Proxy IO)) app) bearer
Mx.muxStart
muxTracer
(toApplication connectionId (neverStop (Proxy :: Proxy IO)) app)
(Snocket.toBearer sn sduTimeout muxTracer sd)


-- Wraps a Socket inside a Snocket and calls connectToNode'
Expand Down Expand Up @@ -328,14 +346,12 @@ beginConnection sn muxTracer handshakeTracer handshakeCodec versionDataCodec acc
case accept of
AcceptConnection st' connectionId versions -> pure $ Server.Accept st' $ \sd -> do
muxTracer' <- initDeltaQTracer' $ Mx.WithMuxBearer connectionId `contramap` muxTracer
let bearer :: MuxBearer IO
bearer = Snocket.toBearer sn muxTracer' sd

traceWith muxTracer' $ Mx.MuxTraceHandshakeStart

app_e <-
runHandshakeServer
bearer
(Snocket.toBearer sn sduHandshakeTimeout muxTracer' sd)
connectionId
acceptVersion
HandshakeArguments {
Expand All @@ -346,10 +362,6 @@ beginConnection sn muxTracer handshakeTracer handshakeCodec versionDataCodec acc
}

case app_e of
Left HandshakeTimeout -> do
traceWith muxTracer' $ Mx.MuxTraceHandshakeServerError ExceededTimeLimit
throwIO ExceededTimeLimit

Left (HandshakeProtocolLimit err) -> do
traceWith muxTracer' $ Mx.MuxTraceHandshakeServerError err
throwIO err
Expand All @@ -360,7 +372,11 @@ beginConnection sn muxTracer handshakeTracer handshakeCodec versionDataCodec acc

Right (SomeResponderApplication app) -> do
traceWith muxTracer' $ Mx.MuxTraceHandshakeServerEnd
Mx.muxStart muxTracer' (toApplication connectionId (neverStop (Proxy :: Proxy IO)) app) bearer
Mx.muxStart
muxTracer'
(toApplication connectionId (neverStop (Proxy :: Proxy IO)) app)
(Snocket.toBearer sn sduTimeout muxTracer' sd)

RejectConnection st' _peerid -> pure $ Server.Reject st'

mkListeningSocket
Expand Down

0 comments on commit 4fdf2e0

Please sign in to comment.