Skip to content
Permalink
Browse files

Merge #725

725: Mux - documentation & refactoring r=coot a=coot

This PR contains small changes to mux:

* hides creation of a `Wanton` inside a `muxChannel` (that's the only code change!)
* expands on isometric flow control in mux
* add `Network.Mux.Codec` module with `encodeMuxSDU and `decodeMuxSDU`

Co-authored-by: Marcin Szamotulski <profunctor@pm.me>
  • Loading branch information...
iohk-bors and coot committed Jul 10, 2019
2 parents f3f5f49 + afff08d commit 47927538a80c64106db255a2df5865311627afe4
@@ -38,6 +38,7 @@ library
hs-source-dirs: src
exposed-modules: Network.Mux
Network.Mux.Channel
Network.Mux.Codec
Network.Mux.Egress
Network.Mux.Ingress
Network.Mux.Interface
@@ -54,6 +55,7 @@ test-suite test-network-mux
main-is: Main.hs
other-modules: Network.Mux
Network.Mux.Channel
Network.Mux.Codec
Network.Mux.Egress
Network.Mux.Ingress
Network.Mux.Interface
@@ -6,19 +6,17 @@
{-# LANGUAGE TypeFamilies #-}

module Network.Mux (
MiniProtocolLimits (..)
muxStart
, muxBearerSetState
, MuxSDU (..)
, MiniProtocolLimits (..)
, ProtocolEnum (..)
, MiniProtocolId (..)
, MiniProtocolMode (..)
, MuxBearerState (..)
, MuxError (..)
, MuxErrorType (..)
, MuxSDU (..)
, RemoteClockModel (..)
, encodeMuxSDU
, decodeMuxSDUHeader
, muxBearerSetState
, muxStart
) where

import Control.Monad
@@ -40,13 +38,51 @@ import Network.Mux.Types

-- | muxStart starts a mux bearer for the specified protocols corresponding to
-- one of the provided Versions.
--
-- __Isometric flow control: analysis of head-of-line blocking of the ingress side of the multiplexer__
--
-- For each mini-protocol (enumeratated by @ptcl@), mux will create two
-- channels. One for initiator and one for the responder. Each channel will use
-- a single 'Wanton'. When it is filled, it is put in a common queue
-- 'tsrQueue'. This means that the queue is bound by @2 * |ptcl|@. Every side
-- of a mini-protocol is served by a single 'Wanton': when an applicaiton sends
-- data, the channel will try to put it into the 'Wanton' (which might block).
-- 'Wanton's are taken from the 'tsrQueue' queue by one of mux threads. This
-- elimnates head of line blocking: each mini-protocol thread can block on
-- puting more bytes into its 'Wanton', but it cannot block the other
-- mini-protocols or the thread that is reading the 'tsrQueue' queue. This is
-- ensured since the 'muxChannel' will put only a non-empty 'Wanton' to the
-- 'tsrQueue' queue, and on such wantons the queue is never blocked. This means
-- that the only way the queue can block is when its empty, which means that
-- none of the mini-protocols wanted to send. The egress part will read
-- a 'Wanton', take a fixed amount of bytes encode them in as an 'MuxSDU'; if
-- there are leftovers it will put them back in the 'Wanton' and place it at the
-- end of the queue (reading and writting to it will happen in a single STM
-- transaction which assures that the order of requests from a mini-protocol is
-- preserved.
--
-- Properties:
--
-- * at any given time the 'tsrQueue' contains at most one
-- 'TranslocationServiceRequest' from a given mini-protocol of the given
-- 'MiniProtocolMode', thus the queue contains at most @2 * |ptcl|@
-- translocation requests.
-- * at any given time each @TranslocationServiceRequest@ contains a non-empty
-- 'Wanton'
--
-- TODO: replace MonadSay with iohk-monitoring-framework.
muxStart :: forall m appType ptcl a b.
( MonadAsync m, MonadSay m, MonadSTM m, MonadThrow m, MonadThrow (STM m)
, MonadMask m , Ord ptcl, Enum ptcl, Bounded ptcl, Show ptcl, MiniProtocolLimits ptcl)
=> MuxApplication appType ptcl m BL.ByteString a b
-> MuxBearer ptcl m
-> m ()
--
muxStart :: forall m appType ptcl a b. ( MonadAsync m , MonadSay m , MonadSTM
m , MonadThrow m , MonadThrow (STM m) , MonadMask m
, Ord ptcl
, Enum ptcl
, Bounded ptcl
, Show ptcl
, MiniProtocolLimits ptcl
)
=> MuxApplication appType ptcl m BL.ByteString a b
-> MuxBearer ptcl m
-> m ()
muxStart app bearer = do
tbl <- setupTbl
tq <- atomically $ newTBQueue 100
@@ -86,20 +122,15 @@ muxStart app bearer = do
-> m [m ()]
mpsJob cnt pmss mpdId = do

w_i <- atomically newEmptyTMVar
w_r <- atomically newEmptyTMVar

let initiatorChannel :: Channel m BL.ByteString
initiatorChannel = muxChannel pmss
(AppProtocolId mpdId)
ModeInitiator
w_i cnt
initiatorChannel <- muxChannel pmss
(AppProtocolId mpdId)
ModeInitiator
cnt

responderChannel :: Channel m BL.ByteString
responderChannel = muxChannel pmss
(AppProtocolId mpdId)
ModeResponder
w_r cnt
responderChannel <- muxChannel pmss
(AppProtocolId mpdId)
ModeResponder
cnt

return $ case app of
MuxInitiatorApplication initiator -> [ initiator mpdId initiatorChannel >> mpsJobExit cnt ]
@@ -132,19 +163,34 @@ muxControl pmss md = do
retry
throwM $ MuxError MuxControlProtocolError "MuxControl message on mature MuxBearer" callStack

-- | muxChannel creates a duplex channel for a specific 'MiniProtocolId' and 'MiniProtocolMode'.
muxChannel :: (MonadSTM m, MonadSay m, MonadThrow m, Ord ptcl, Enum ptcl, Show ptcl
, MiniProtocolLimits ptcl , HasCallStack) =>
PerMuxSharedState ptcl m ->
MiniProtocolId ptcl ->
MiniProtocolMode ->
TMVar m BL.ByteString ->
TVar m Int ->
Channel m BL.ByteString
muxChannel pmss mid md w cnt =
Channel {send, recv}
-- | muxChannel creates a duplex channel for a specific 'MiniProtocolId' and
-- 'MiniProtocolMode'.
--
muxChannel
:: forall m ptcl.
( MonadSTM m
, MonadSay m
, MonadThrow m
, Ord ptcl
, Enum ptcl
, Show ptcl
, MiniProtocolLimits ptcl
, HasCallStack
)
=> PerMuxSharedState ptcl m
-> MiniProtocolId ptcl
-> MiniProtocolMode
-> TVar m Int
-> m (Channel m BL.ByteString)
muxChannel pmss mid md cnt = do
w <- newEmptyTMVarM
return $ Channel { send = send (Wanton w)
, recv}
where
send encoding = do
send :: Wanton m
-> BL.ByteString
-> m ()
send want@(Wanton w) encoding = do
-- We send CBOR encoded messages by encoding them into by ByteString
-- forwarding them to the 'mux' thread, see 'Desired servicing semantics'.
-- This check is dependant on the good will of the sender and a receiver can't
@@ -157,7 +203,9 @@ muxChannel pmss mid md w cnt =
callStack
atomically $ modifyTVar' cnt (+ 1)
atomically $ putTMVar w encoding
atomically $ writeTBQueue (tsrQueue pmss) (TLSRDemand mid md (Wanton w))
atomically $ writeTBQueue (tsrQueue pmss) (TLSRDemand mid md want)

recv :: m (Maybe BL.ByteString)
recv = do
-- We receive CBOR encoded messages as ByteStrings (possibly partial) from the
-- matching ingress queueu. This is the same queue the 'demux' thread writes to.
@@ -20,6 +20,7 @@ import System.IO (Handle, hClose, hFlush)
import qualified Network.Mux as Mx
import Network.Mux.Types (MuxBearer)
import qualified Network.Mux.Types as Mx
import qualified Network.Mux.Codec as Mx
import qualified Network.Mux.Interface as Mx
import qualified Network.Mux.Time as Mx

@@ -48,7 +49,7 @@ pipeAsMuxBearer pcRead pcWrite = do
=> IO (Mx.MuxSDU ptcl, Time IO)
readPipe = do
hbuf <- recvLen' pcRead 8 []
case Mx.decodeMuxSDUHeader hbuf of
case Mx.decodeMuxSDU hbuf of
Left e -> throwM e
Right header -> do
--say $ printf "decoded mux header, goint to read %d bytes" (Mx.msLength header)
@@ -20,6 +20,7 @@ import qualified Network.Mux as Mx
import qualified Network.Mux.Interface as Mx
import Network.Mux.Types (MuxBearer)
import qualified Network.Mux.Types as Mx
import qualified Network.Mux.Codec as Mx
import Network.Mux.Time as Mx


@@ -49,7 +50,7 @@ queuesAsMuxBearer writeQueue readQueue sduSize traceQueue = do
readMux = do
buf <- atomically $ readTBQueue readQueue
let (hbuf, payload) = BL.splitAt 8 buf
case Mx.decodeMuxSDUHeader hbuf of
case Mx.decodeMuxSDU hbuf of
Left e -> throwM e
Right header -> do
ts <- getMonotonicTime
@@ -27,6 +27,7 @@ import qualified Network.Socket.ByteString.Lazy as Socket (recv, sendAll)
import qualified Network.Mux as Mx
import Network.Mux.Types (MuxBearer)
import qualified Network.Mux.Types as Mx
import qualified Network.Mux.Codec as Mx
import qualified Network.Mux.Time as Mx

hexDump :: BL.ByteString -> String -> IO ()
@@ -57,7 +58,7 @@ socketAsMuxBearer sd = do
hbuf <- recvLen' True 8 []
--say "read"
--hexDump hbuf ""
case Mx.decodeMuxSDUHeader hbuf of
case Mx.decodeMuxSDU hbuf of
Left e -> throwM e
Right header -> do
-- say $ printf "decoded mux header, goint to read %d bytes" (Mx.msLength header)
@@ -0,0 +1,76 @@
module Network.Mux.Codec where

import qualified Data.Binary.Get as Bin
import qualified Data.Binary.Put as Bin
import Data.Bits
import qualified Data.ByteString.Lazy as BL
import Data.Word
import GHC.Stack

import Network.Mux.Types


-- | Encode a 'MuxSDU' as a 'ByteString'.
--
-- > Binary format used by 'encodeMuxSDU' and 'decodeMuxSDUHeader'
-- > 0 1 2 3
-- > 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
-- > +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-- > | transmission time |
-- > +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-- > |M| conversation id | length |
-- > +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
--
-- All fields are in big endian byteorder.
--
encodeMuxSDU :: ProtocolEnum ptcl => MuxSDU ptcl -> BL.ByteString
encodeMuxSDU sdu =
let hdr = Bin.runPut enc in
BL.append hdr $ msBlob sdu
where
enc = do
Bin.putWord32be $ unRemoteClockModel $ msTimestamp sdu
putId (msId sdu) (putMode $ msMode sdu)
Bin.putWord16be $ fromIntegral $ BL.length $ msBlob sdu

putId ptcl mode = Bin.putWord16be $ fromProtocolEnum ptcl .|. mode

putMode :: MiniProtocolMode -> Word16
putMode ModeInitiator = 0
putMode ModeResponder = 0x8000


-- | Decode a 'MuSDU' header. A left inverse of 'encodeMuxSDU'.
--
decodeMuxSDU :: (HasCallStack , ProtocolEnum ptcl)
=> BL.ByteString -> Either MuxError (MuxSDU ptcl)
decodeMuxSDU buf =
case Bin.runGetOrFail dec buf of
Left (_, _, e) -> Left $ MuxError MuxDecodeError e callStack
Right (_, _, ph) ->
let mode = getMode $ mshIdAndMode ph
mid_m = getId $ mshIdAndMode ph .&. 0x7fff in
case mid_m of
Left e -> Left $ MuxError MuxUnknownMiniProtocol ("id = " ++ show e) callStack
Right mid -> Right $ MuxSDU {
msTimestamp = mshTimestamp ph
, msId = mid
, msMode = mode
, msLength = mshLength ph
, msBlob = BL.empty
}
where
dec = do
ts <- Bin.getWord32be
mid <- Bin.getWord16be
len <- Bin.getWord16be
return $ MuxSDUHeader (RemoteClockModel ts) mid len

getMode mid =
if mid .&. 0x8000 == 0 then ModeInitiator
else ModeResponder

getId :: ProtocolEnum ptcl => Word16 -> Either Word16 (MiniProtocolId ptcl)
getId n | Just ptcl <- toProtocolEnum n
= Right ptcl
getId a = Left a
@@ -3,49 +3,18 @@
{-# LANGUAGE TypeFamilies #-}

module Network.Mux.Egress (
encodeMuxSDU
, mux
mux
-- $egress
-- $servicingsSemantics
) where

import Control.Monad
import qualified Data.Binary.Put as Bin
import Data.Bits
import qualified Data.ByteString.Lazy as BL
import Data.Word

import Control.Monad.Class.MonadSTM

import Network.Mux.Types

-- | Encode a 'MuxSDU' as a 'ByteString'.
--
-- > Binary format used by 'encodeMuxSDU' and 'decodeMuxSDUHeader'
-- > 0 1 2 3
-- > 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
-- > +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-- > | transmission time |
-- > +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-- > |M| conversation id | length |
-- > +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
--
-- All fields are in big endian byteorder.
encodeMuxSDU :: ProtocolEnum ptcl => MuxSDU ptcl -> BL.ByteString
encodeMuxSDU sdu =
let hdr = Bin.runPut enc in
BL.append hdr $ msBlob sdu
where
enc = do
Bin.putWord32be $ unRemoteClockModel $ msTimestamp sdu
putId (msId sdu) (putMode $ msMode sdu)
Bin.putWord16be $ fromIntegral $ BL.length $ msBlob sdu

putId ptcl mode = Bin.putWord16be $ fromProtocolEnum ptcl .|. mode

putMode :: MiniProtocolMode -> Word16
putMode ModeInitiator = 0
putMode ModeResponder = 0x8000
import Network.Mux.Codec

-- $servicingsSemantics
-- = Desired Servicing Semantics

0 comments on commit 4792753

Please sign in to comment.
You can’t perform that action at this time.