Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
A mux demo that uses ouroboros-network's keepalive protocol to mimic the ping command.
- Loading branch information
Showing
2 changed files
with
308 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,275 @@ | ||
{-# LANGUAGE BangPatterns #-} | ||
{-# LANGUAGE DataKinds #-} | ||
{-# LANGUAGE RankNTypes #-} | ||
|
||
module Main (main) where | ||
|
||
import qualified Codec.CBOR.Encoding as CBOR | ||
import qualified Codec.CBOR.Decoding as CBOR | ||
import qualified Codec.CBOR.Read as CBOR | ||
import qualified Codec.CBOR.Write as CBOR | ||
import Control.Exception | ||
import Control.Monad (replicateM, when, unless) | ||
import Control.Monad.Class.MonadAsync | ||
import Control.Monad.Class.MonadTime | ||
import Control.Monad.Class.MonadTimer hiding (timeout) | ||
import Control.Tracer (nullTracer) | ||
import Data.ByteString.Lazy (ByteString) | ||
import qualified Data.ByteString.Lazy as BL | ||
import Data.Maybe (fromMaybe) | ||
import Data.Word | ||
import Data.TDigest | ||
import Data.Text (unpack) | ||
import Network.Socket ( AddrInfo) | ||
import qualified Network.Socket as Socket | ||
import Text.Printf | ||
import System.Environment (getArgs) | ||
import System.Console.GetOpt | ||
import System.IO (hPutStr, stderr) | ||
|
||
import Network.Mux.Bearer.Socket | ||
import Network.Mux.Types | ||
import Network.Mux.Timeout | ||
|
||
mainnetMagic :: Word32 | ||
mainnetMagic = 764824073 | ||
|
||
handshakeNum :: MiniProtocolNum | ||
handshakeNum = MiniProtocolNum 0 | ||
|
||
keepaliveNum :: MiniProtocolNum | ||
keepaliveNum = MiniProtocolNum 8 | ||
|
||
data Flag = HostF String | PortF String | QuietF deriving Show | ||
|
||
options :: [OptDescr Flag] | ||
options = [ | ||
Option "h" ["h"] (ReqArg HostF "host") "hostname/ip, e.g relay.iohk.example", | ||
Option "p" ["port"] (ReqArg PortF "port") "portnumber, e.g 1234", | ||
Option "q" ["quiet"] (NoArg QuietF ) "quiet flag, csv only output." | ||
] | ||
|
||
getHost :: [Flag] -> String | ||
getHost [] = error "Specify host/ip with '-h <hostname>'" | ||
getHost (HostF h:_) = h | ||
getHost (_:xs) = getHost xs | ||
|
||
getPort :: [Flag] -> String | ||
getPort [] = error "Specify port with '-p <port number>'" | ||
getPort (PortF p:_) = p | ||
getPort (_:xs) = getPort xs | ||
|
||
getQuiet :: [Flag] -> Bool | ||
getQuiet [] = False | ||
getQuiet (QuietF:_) = True | ||
getQuiet (_:xs) = getQuiet xs | ||
main :: IO () | ||
main = do | ||
args <- getArgs | ||
let (actions, _, _ ) = getOpt RequireOrder options args | ||
host = getHost actions | ||
port = getPort actions | ||
quiet = getQuiet actions | ||
hints = Socket.defaultHints { Socket.addrSocketType = Socket.Stream } | ||
addresses <- Socket.getAddrInfo (Just hints) (Just host) (Just port) | ||
printf "%32s, %20s, %7s, %7s, %7s, %7s, %7s, %7s, %7s, %7s\n" "timestamp" "host" "cookie" "sample" "median" "p90" "mean" "min" "max" "std" | ||
caids <- mapM (async . pingClient quiet) addresses | ||
mapM_ wait caids | ||
|
||
data NodeToNodeVersion = NodeToNodeVersionV1 Word32 | ||
| NodeToNodeVersionV2 Word32 | ||
| NodeToNodeVersionV3 Word32 | ||
| NodeToNodeVersionV4 Word32 Bool | ||
deriving (Eq, Ord, Show) | ||
|
||
keepAliveReqEnc :: Word16 -> CBOR.Encoding | ||
keepAliveReqEnc cookie = | ||
CBOR.encodeWord 0 | ||
<> CBOR.encodeWord16 cookie | ||
|
||
keepAliveReq :: Word16 -> ByteString | ||
keepAliveReq = CBOR.toLazyByteString . keepAliveReqEnc | ||
|
||
|
||
|
||
handshakeReqEnc :: [NodeToNodeVersion] -> CBOR.Encoding | ||
handshakeReqEnc [] = error "null version list" | ||
handshakeReqEnc versions = | ||
CBOR.encodeListLen 2 | ||
<> CBOR.encodeWord 0 | ||
<> CBOR.encodeMapLen (fromIntegral $ length versions) | ||
<> mconcat [ encodeVersion v | ||
| v <- versions | ||
] | ||
where | ||
encodeVersion :: NodeToNodeVersion -> CBOR.Encoding | ||
encodeVersion (NodeToNodeVersionV1 magic) = | ||
CBOR.encodeWord 1 | ||
<> CBOR.encodeInt (fromIntegral magic) | ||
encodeVersion (NodeToNodeVersionV2 magic) = | ||
CBOR.encodeWord 2 | ||
<> CBOR.encodeInt (fromIntegral magic) | ||
encodeVersion (NodeToNodeVersionV3 magic) = | ||
CBOR.encodeWord 3 | ||
<> CBOR.encodeInt (fromIntegral magic) | ||
encodeVersion (NodeToNodeVersionV4 magic mode) = | ||
CBOR.encodeWord 4 | ||
<> CBOR.encodeListLen 2 | ||
<> CBOR.encodeInt (fromIntegral magic ) | ||
<> CBOR.encodeBool mode | ||
|
||
handshakeReq :: [NodeToNodeVersion] -> ByteString | ||
handshakeReq [] = BL.empty | ||
handshakeReq versions = CBOR.toLazyByteString $ handshakeReqEnc versions | ||
|
||
data HandshakeFailure = UnknownVersionInRsp Word | ||
| UnknownKey Word | ||
| UnknownTag Word | ||
| VersionMissmath [Word] | ||
| DecodeError Word String | ||
| Refused Word String | ||
deriving Show | ||
|
||
newtype KeepAliveFailure = KeepAliveFailureKey Word deriving Show | ||
|
||
keepAliveRspDec :: CBOR.Decoder s (Either KeepAliveFailure Word16) | ||
keepAliveRspDec = do | ||
key <- CBOR.decodeWord | ||
case key of | ||
1 -> Right <$> CBOR.decodeWord16 | ||
k -> return $ Left $ KeepAliveFailureKey k | ||
|
||
|
||
handshakeDec :: CBOR.Decoder s (Either HandshakeFailure NodeToNodeVersion) | ||
handshakeDec = do | ||
_ <- CBOR.decodeListLen | ||
key <- CBOR.decodeWord | ||
case key of | ||
1 -> do | ||
decodeVersion | ||
2 -> do | ||
_ <- CBOR.decodeListLen | ||
tag <- CBOR.decodeWord | ||
case tag of | ||
0 -> do -- VersionMismatch | ||
len <- CBOR.decodeListLen | ||
x <- replicateM len CBOR.decodeWord | ||
return $ Left $ VersionMissmath x | ||
1 -> do -- HandshakeDecodeError | ||
vn <- CBOR.decodeWord | ||
msg <- unpack <$> CBOR.decodeString | ||
return $ Left $ DecodeError vn msg | ||
2 -> do -- Refused | ||
vn <- CBOR.decodeWord | ||
msg <- unpack <$> CBOR.decodeString | ||
return $ Left $ Refused vn msg | ||
_ -> return $ Left $ UnknownTag tag | ||
|
||
k -> return $ Left $ UnknownKey k | ||
where | ||
decodeVersion = do | ||
version <- CBOR.decodeWord | ||
case version of | ||
1 -> Right . NodeToNodeVersionV1 <$> CBOR.decodeWord32 | ||
2 -> Right . NodeToNodeVersionV2 <$> CBOR.decodeWord32 | ||
3 -> Right . NodeToNodeVersionV3 <$> CBOR.decodeWord32 | ||
4 -> do | ||
magic <- CBOR.decodeWord32 | ||
Right . NodeToNodeVersionV4 magic <$> CBOR.decodeBool | ||
v -> return $ Left $ UnknownVersionInRsp v | ||
|
||
|
||
wrap :: MiniProtocolNum -> MiniProtocolDir -> BL.ByteString -> MuxSDU | ||
wrap ptclNum ptclDir blob = MuxSDU { | ||
msHeader = MuxSDUHeader { | ||
mhTimestamp = RemoteClockModel 0, | ||
mhNum = ptclNum, | ||
mhDir = ptclDir, | ||
mhLength = fromIntegral $ BL.length blob | ||
} | ||
, msBlob = blob | ||
} | ||
|
||
pingClient :: Bool -> AddrInfo -> IO () | ||
pingClient quiet peer = bracket | ||
(Socket.socket Socket.AF_INET Socket.Stream Socket.defaultProtocol) | ||
Socket.close | ||
(\sd -> withTimeoutSerial $ \timeoutfn -> do | ||
!t0_s <- getMonotonicTime | ||
Socket.connect sd (Socket.addrAddress peer) | ||
!t0_e <- getMonotonicTime | ||
(Just host, Just port) <- Socket.getNameInfo [Socket.NI_NUMERICHOST, Socket.NI_NUMERICSERV] | ||
True True (Socket.addrAddress peer) | ||
let peerStr = host ++ ":" ++ port | ||
unless quiet $ printf "%s TCP rtt: %.3f\n" peerStr $ toSample t0_e t0_s | ||
let timeout = 30 | ||
bearer = socketAsMuxBearer timeout nullTracer sd | ||
|
||
!t1_s <- write bearer timeoutfn $ wrap handshakeNum InitiatorDir | ||
(handshakeReq [ NodeToNodeVersionV1 mainnetMagic | ||
, NodeToNodeVersionV2 mainnetMagic | ||
, NodeToNodeVersionV3 mainnetMagic | ||
, NodeToNodeVersionV4 mainnetMagic False | ||
]) | ||
(msg, !t1_e) <- nextMsg bearer timeoutfn handshakeNum | ||
unless quiet $ printf "%s handshake rtt: %s\n" peerStr (show $ diffTime t1_e t1_s) | ||
case CBOR.deserialiseFromBytes handshakeDec msg of | ||
Left err -> do | ||
eprint $ printf "%s Decoding error %s\n" peerStr (show err) | ||
Right (_, Left err) -> do | ||
eprint $ printf "%s Protocol error %s\n" peerStr (show err) | ||
Right (_, Right version) -> do | ||
unless quiet $ printf "%s Negotiated version %s\n" peerStr (show version) | ||
case version of | ||
(NodeToNodeVersionV1 _) -> return () | ||
(NodeToNodeVersionV2 _) -> return () | ||
_ -> do | ||
keepAlive bearer timeoutfn peerStr (tdigest []) 0 | ||
|
||
) | ||
where | ||
toSample :: Time -> Time -> Double | ||
toSample t_e t_s = realToFrac $ diffTime t_e t_s | ||
|
||
quantile' :: Double -> TDigest 5 -> Double | ||
quantile' q td = fromMaybe 0 (quantile q td) | ||
|
||
mean' :: TDigest 5 -> Double | ||
mean' td = fromMaybe 0 (mean td) | ||
|
||
stddev' :: TDigest 5 -> Double | ||
stddev' td = fromMaybe 0 (stddev td) | ||
|
||
eprint :: String -> IO () | ||
eprint = hPutStr stderr | ||
|
||
nextMsg :: MuxBearer IO -> TimeoutFn IO -> MiniProtocolNum -> IO (BL.ByteString, Time) | ||
nextMsg bearer timeoutfn ptclNum = do | ||
(sdu, t_e) <- Network.Mux.Types.read bearer timeoutfn | ||
if mhNum (msHeader sdu) == ptclNum | ||
then return (msBlob sdu, t_e) | ||
else nextMsg bearer timeoutfn ptclNum | ||
|
||
|
||
keepAlive :: MuxBearer IO -> TimeoutFn IO -> String -> TDigest 5 -> Word16 -> IO () | ||
keepAlive bearer timeoutfn peerStr td cookie = do | ||
!t_s <- write bearer timeoutfn $ wrap keepaliveNum InitiatorDir (keepAliveReq cookie) | ||
(!msg, !t_e) <- nextMsg bearer timeoutfn keepaliveNum | ||
let rtt = toSample t_e t_s | ||
td' = insert rtt td | ||
case CBOR.deserialiseFromBytes keepAliveRspDec msg of | ||
Left err -> eprint $ printf "%s keepalive decoding error %s\n" peerStr (show err) | ||
Right (_, Left err) -> eprint $ printf "%s keepalive protocol error %s\n" peerStr (show err) | ||
Right (_, Right cookie') -> do | ||
when (cookie' /= cookie) $ eprint $ printf "%s cookie missmatch %d /= %d\n" | ||
peerStr cookie' cookie | ||
|
||
now <- getCurrentTime | ||
printf "%32s, %20s, %7d, %7.3f, %7.3f, %7.3f, %7.3f, %7.3f, %7.3f, %7.3f\n" | ||
(show now) peerStr cookie rtt (quantile' 0.5 td') (quantile' 0.9 td') (mean' td') | ||
(minimumValue td') (maximumValue td') (stddev' td') | ||
|
||
threadDelay 1 | ||
keepAlive bearer timeoutfn peerStr td' (cookie + 1) | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters