Skip to content

Commit

Permalink
cardano-ping as a demo
Browse files Browse the repository at this point in the history
A mux demo that uses ouroboros-network's keepalive protocol
to mimic the ping command.
  • Loading branch information
karknu committed Oct 23, 2020
1 parent 95c9cd2 commit 27b06f3
Show file tree
Hide file tree
Showing 2 changed files with 375 additions and 0 deletions.
341 changes: 341 additions & 0 deletions network-mux/demo/cardano-ping.hs
@@ -0,0 +1,341 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE RankNTypes #-}

module Main (main) where

import qualified Codec.CBOR.Encoding as CBOR
import qualified Codec.CBOR.Decoding as CBOR
import qualified Codec.CBOR.Read as CBOR
import qualified Codec.CBOR.Write as CBOR
import Control.Exception
import Control.Monad (replicateM, when, unless)
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadTime
import Control.Monad.Class.MonadTimer hiding (timeout)
import Control.Tracer (nullTracer)
import Data.Aeson
import Data.ByteString.Lazy (ByteString)
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Lazy.Char8 as C8 (putStrLn)
import Data.Maybe (fromMaybe)
import Data.Word
import Data.TDigest
import Data.Text (unpack)
import Network.Socket ( AddrInfo)
import qualified Network.Socket as Socket
import Text.Printf
import System.Environment (getArgs)
import System.Console.GetOpt
import System.IO (hFlush, hPutStr, stderr, stdout)

import Network.Mux.Bearer.Socket
import Network.Mux.Types
import Network.Mux.Timeout

mainnetMagic :: Word32
mainnetMagic = 764824073

handshakeNum :: MiniProtocolNum
handshakeNum = MiniProtocolNum 0

keepaliveNum :: MiniProtocolNum
keepaliveNum = MiniProtocolNum 8

data Flag = HostF String | PortF String | MagicF String | QuietF | JsonF deriving Show

options :: [OptDescr Flag]
options = [
Option "h" ["h"] (ReqArg HostF "host") "hostname/ip, e.g relay.iohk.example",
Option "m" ["magic"] (ReqArg MagicF "magic") ("magic, defaults to " ++ show mainnetMagic),
Option "j" ["json"] (NoArg JsonF ) "json output flag",
Option "p" ["port"] (ReqArg PortF "port") "portnumber, e.g 1234",
Option "q" ["quiet"] (NoArg QuietF ) "quiet flag, csv/json only output."
]

getHost :: [Flag] -> String
getHost [] = error "Specify host/ip with '-h <hostname>'"
getHost (HostF h:_) = h
getHost (_:xs) = getHost xs

getPort :: [Flag] -> String
getPort [] = error "Specify port with '-p <port number>'"
getPort (PortF p:_) = p
getPort (_:xs) = getPort xs

getQuiet :: [Flag] -> Bool
getQuiet [] = False
getQuiet (QuietF:_) = True
getQuiet (_:xs) = getQuiet xs

getMagic :: [Flag] -> Word32
getMagic [] = mainnetMagic
getMagic (MagicF m:_) = Prelude.read m
getMagic (_:xs) = getMagic xs

getJson :: [Flag] -> Bool
getJson [] = False
getJson (JsonF:_) = True
getJson (_:xs) = getJson xs

main :: IO ()
main = do
args <- getArgs
let (actions, _, _ ) = getOpt RequireOrder options args
host = getHost actions
port = getPort actions
quiet = getQuiet actions
magic = getMagic actions
jsonF = getJson actions
hints = Socket.defaultHints { Socket.addrSocketType = Socket.Stream }
addresses <- Socket.getAddrInfo (Just hints) (Just host) (Just port)

unless jsonF $
putStrLn " timestamp, host, cookie, sample, median, p90, mean, min, max, std"

caids <- mapM (async . pingClient quiet magic jsonF) addresses
mapM_ wait caids

data NodeToNodeVersion = NodeToNodeVersionV1 Word32
| NodeToNodeVersionV2 Word32
| NodeToNodeVersionV3 Word32
| NodeToNodeVersionV4 Word32 Bool
deriving (Eq, Ord, Show)

keepAliveReqEnc :: Word16 -> CBOR.Encoding
keepAliveReqEnc cookie =
CBOR.encodeWord 0
<> CBOR.encodeWord16 cookie

keepAliveReq :: Word16 -> ByteString
keepAliveReq = CBOR.toLazyByteString . keepAliveReqEnc


handshakeReqEnc :: [NodeToNodeVersion] -> CBOR.Encoding
handshakeReqEnc [] = error "null version list"
handshakeReqEnc versions =
CBOR.encodeListLen 2
<> CBOR.encodeWord 0
<> CBOR.encodeMapLen (fromIntegral $ length versions)
<> mconcat [ encodeVersion v
| v <- versions
]
where
encodeVersion :: NodeToNodeVersion -> CBOR.Encoding
encodeVersion (NodeToNodeVersionV1 magic) =
CBOR.encodeWord 1
<> CBOR.encodeInt (fromIntegral magic)
encodeVersion (NodeToNodeVersionV2 magic) =
CBOR.encodeWord 2
<> CBOR.encodeInt (fromIntegral magic)
encodeVersion (NodeToNodeVersionV3 magic) =
CBOR.encodeWord 3
<> CBOR.encodeInt (fromIntegral magic)
encodeVersion (NodeToNodeVersionV4 magic mode) =
CBOR.encodeWord 4
<> CBOR.encodeListLen 2
<> CBOR.encodeInt (fromIntegral magic )
<> CBOR.encodeBool mode

handshakeReq :: [NodeToNodeVersion] -> ByteString
handshakeReq [] = BL.empty
handshakeReq versions = CBOR.toLazyByteString $ handshakeReqEnc versions

data HandshakeFailure = UnknownVersionInRsp Word
| UnknownKey Word
| UnknownTag Word
| VersionMissmath [Word]
| DecodeError Word String
| Refused Word String
deriving Show

newtype KeepAliveFailure = KeepAliveFailureKey Word deriving Show

keepAliveRspDec :: CBOR.Decoder s (Either KeepAliveFailure Word16)
keepAliveRspDec = do
key <- CBOR.decodeWord
case key of
1 -> Right <$> CBOR.decodeWord16
k -> return $ Left $ KeepAliveFailureKey k

handshakeDec :: CBOR.Decoder s (Either HandshakeFailure NodeToNodeVersion)
handshakeDec = do
_ <- CBOR.decodeListLen
key <- CBOR.decodeWord
case key of
1 -> do
decodeVersion
2 -> do
_ <- CBOR.decodeListLen
tag <- CBOR.decodeWord
case tag of
0 -> do -- VersionMismatch
len <- CBOR.decodeListLen
x <- replicateM len CBOR.decodeWord
return $ Left $ VersionMissmath x
1 -> do -- HandshakeDecodeError
vn <- CBOR.decodeWord
msg <- unpack <$> CBOR.decodeString
return $ Left $ DecodeError vn msg
2 -> do -- Refused
vn <- CBOR.decodeWord
msg <- unpack <$> CBOR.decodeString
return $ Left $ Refused vn msg
_ -> return $ Left $ UnknownTag tag

k -> return $ Left $ UnknownKey k
where
decodeVersion = do
version <- CBOR.decodeWord
case version of
1 -> Right . NodeToNodeVersionV1 <$> CBOR.decodeWord32
2 -> Right . NodeToNodeVersionV2 <$> CBOR.decodeWord32
3 -> Right . NodeToNodeVersionV3 <$> CBOR.decodeWord32
4 -> do
magic <- CBOR.decodeWord32
Right . NodeToNodeVersionV4 magic <$> CBOR.decodeBool
v -> return $ Left $ UnknownVersionInRsp v


wrap :: MiniProtocolNum -> MiniProtocolDir -> BL.ByteString -> MuxSDU
wrap ptclNum ptclDir blob = MuxSDU {
msHeader = MuxSDUHeader {
mhTimestamp = RemoteClockModel 0,
mhNum = ptclNum,
mhDir = ptclDir,
mhLength = fromIntegral $ BL.length blob
}
, msBlob = blob
}


data StatPoint = StatPoint {
spTimestamp :: !UTCTime
, spHost :: !String
, spCookie :: !Word16
, spSample :: !Double
, spMedian :: !Double
, spP90 :: !Double
, spMean :: !Double
, spMin :: !Double
, spMax :: !Double
, spStd :: !Double
}

instance Show StatPoint where
show StatPoint {..} =
printf "%36s, %20s, %7d, %7.3f, %7.3f, %7.3f, %7.3f, %7.3f, %7.3f, %7.3f"
(show spTimestamp) spHost spCookie spSample spMedian spP90 spMean spMin spMax spStd

instance ToJSON StatPoint where
toJSON StatPoint {..} =
object [ "timestamp" .= spTimestamp
, "host" .= spHost
, "cookie" .= spCookie
, "sample" .= spSample
, "median" .= spMedian
, "p90" .= spP90
, "mean" .= spMean
, "min" .= spMin
, "max" .= spMax
]

toStatPoint :: UTCTime -> String -> Word16 -> Double -> TDigest 5 -> StatPoint
toStatPoint ts host cookie sample td =
StatPoint {
spTimestamp = ts
, spHost = host
, spCookie = cookie
, spSample = sample
, spMedian = quantile' 0.5
, spP90 = quantile' 0.9
, spMean = mean'
, spMin = minimumValue td
, spMax = maximumValue td
, spStd = stddev'
}
where
quantile' :: Double -> Double
quantile' q = fromMaybe 0 (quantile q td)

mean' :: Double
mean' = fromMaybe 0 (mean td)

stddev' :: Double
stddev' = fromMaybe 0 (stddev td)


pingClient :: Bool -> Word32 -> Bool -> AddrInfo -> IO ()
pingClient quiet magic jsonF peer = bracket
(Socket.socket (Socket.addrFamily peer) Socket.Stream Socket.defaultProtocol)
Socket.close
(\sd -> withTimeoutSerial $ \timeoutfn -> do
!t0_s <- getMonotonicTime
Socket.connect sd (Socket.addrAddress peer)
!t0_e <- getMonotonicTime
(Just host, Just port) <- Socket.getNameInfo [Socket.NI_NUMERICHOST, Socket.NI_NUMERICSERV]
True True (Socket.addrAddress peer)
let peerStr = host ++ ":" ++ port
unless quiet $ printf "%s TCP rtt: %.3f\n" peerStr $ toSample t0_e t0_s
let timeout = 30
bearer = socketAsMuxBearer timeout nullTracer sd

!t1_s <- write bearer timeoutfn $ wrap handshakeNum InitiatorDir
(handshakeReq [ NodeToNodeVersionV1 magic
, NodeToNodeVersionV2 magic
, NodeToNodeVersionV3 magic
, NodeToNodeVersionV4 magic False
])
(msg, !t1_e) <- nextMsg bearer timeoutfn handshakeNum
unless quiet $ printf "%s handshake rtt: %s\n" peerStr (show $ diffTime t1_e t1_s)
case CBOR.deserialiseFromBytes handshakeDec msg of
Left err -> do
eprint $ printf "%s Decoding error %s\n" peerStr (show err)
Right (_, Left err) -> do
eprint $ printf "%s Protocol error %s\n" peerStr (show err)
Right (_, Right version) -> do
unless quiet $ printf "%s Negotiated version %s\n" peerStr (show version)
case version of
(NodeToNodeVersionV1 _) -> return ()
(NodeToNodeVersionV2 _) -> return ()
_ -> do
keepAlive bearer timeoutfn peerStr (tdigest []) 0

)
where
toSample :: Time -> Time -> Double
toSample t_e t_s = realToFrac $ diffTime t_e t_s

eprint :: String -> IO ()
eprint = hPutStr stderr

nextMsg :: MuxBearer IO -> TimeoutFn IO -> MiniProtocolNum -> IO (BL.ByteString, Time)
nextMsg bearer timeoutfn ptclNum = do
(sdu, t_e) <- Network.Mux.Types.read bearer timeoutfn
if mhNum (msHeader sdu) == ptclNum
then return (msBlob sdu, t_e)
else nextMsg bearer timeoutfn ptclNum

keepAlive :: MuxBearer IO -> TimeoutFn IO -> String -> TDigest 5 -> Word16 -> IO ()
keepAlive bearer timeoutfn peerStr td cookie = do
!t_s <- write bearer timeoutfn $ wrap keepaliveNum InitiatorDir (keepAliveReq cookie)
(!msg, !t_e) <- nextMsg bearer timeoutfn keepaliveNum
let rtt = toSample t_e t_s
td' = insert rtt td
case CBOR.deserialiseFromBytes keepAliveRspDec msg of
Left err -> eprint $ printf "%s keepalive decoding error %s\n" peerStr (show err)
Right (_, Left err) -> eprint $ printf "%s keepalive protocol error %s\n" peerStr (show err)
Right (_, Right cookie') -> do
when (cookie' /= cookie) $ eprint $ printf "%s cookie missmatch %d /= %d\n"
peerStr cookie' cookie

now <- getCurrentTime
let point = toStatPoint now peerStr cookie rtt td'
if jsonF
then C8.putStrLn (encode point)
else print point
hFlush stdout
threadDelay 1
keepAlive bearer timeoutfn peerStr td' (cookie + 1)
34 changes: 34 additions & 0 deletions network-mux/network-mux.cabal
Expand Up @@ -161,3 +161,37 @@ executable mux-demo
-Wpartial-fields
-Widentities
-Wredundant-constraints

executable cardano-ping
hs-source-dirs: demo, test
main-is: cardano-ping.hs
other-modules: Test.Mux.ReqResp
build-depends: base,
aeson,
directory,
network-mux,
io-sim-classes,
contra-tracer,
stm,

binary,
bytestring,
cborg,
network,
serialise,
tdigest,
text
if os(windows)
buildable: False
else
buildable: True
default-language: Haskell2010
ghc-options: -threaded
-Wall
-fno-ignore-asserts
-Wcompat
-Wincomplete-uni-patterns
-Wincomplete-record-updates
-Wpartial-fields
-Widentities
-Wredundant-constraints

0 comments on commit 27b06f3

Please sign in to comment.