Skip to content
Permalink
Browse files

Merge #1016

1016: Added handshake tracers r=coot a=coot

* in 'withServerNode' and friends
* in 'RunNetworkArgs' inside ouroboros-consensus

Co-authored-by: Marcin Szamotulski <profunctor@pm.me>
  • Loading branch information...
iohk-bors and coot committed Sep 11, 2019
2 parents 7ebe200 + cf1ab60 commit f110dd802655c5c4a8100e546d8a02691e24d2db
@@ -35,6 +35,8 @@ import Data.ByteString.Lazy (ByteString)
import Data.Proxy (Proxy (..))
import Data.Time.Clock (secondsToDiffTime)
import Network.Socket as Socket
import qualified Codec.CBOR.Term as CBOR
import qualified Codec.CBOR.Read as CBOR

import Control.Monad.Class.MonadAsync

@@ -239,6 +241,15 @@ data RunNetworkArgs peer blk = RunNetworkArgs
{ rnaIpSubscriptionTracer :: Tracer IO (WithIPList (SubscriptionTrace Socket.SockAddr))
-- ^ IP subscription tracer
, rnaDnsSubscriptionTracer :: Tracer IO (WithDomainName (SubscriptionTrace Socket.SockAddr))
, rnaHandshakeTracer :: Tracer IO (TraceSendRecv
(Handshake NodeToNodeVersion CBOR.Term)
peer
(DecoderFailureOrTooMuchInput CBOR.DeserialiseFailure))
-- ^ Handshake protocol tracer
, rnaHandshakeLocalTracer :: Tracer IO (TraceSendRecv
(Handshake NodeToClientVersion CBOR.Term)
peer
(DecoderFailureOrTooMuchInput CBOR.DeserialiseFailure))
-- ^ DNS subscription tracer
, rnaDnsResolverTracer :: Tracer IO (WithDomainName DnsTrace)
-- ^ DNS resolver tracer
@@ -297,6 +308,7 @@ initNetwork registry nodeArgs kernel RunNetworkArgs{..} = do
runLocalServer = do
(connTable :: ConnectionTable IO Socket.SockAddr) <- newConnectionTable
NodeToClient.withServer_V1
rnaHandshakeLocalTracer
connTable
rnaMyLocalAddr
rnaMkPeer
@@ -307,6 +319,7 @@ initNetwork registry nodeArgs kernel RunNetworkArgs{..} = do
runPeerServer :: ConnectionTable IO Socket.SockAddr -> IO ()
runPeerServer connTable =
NodeToNode.withServer_V1
rnaHandshakeTracer
connTable
rnaMyAddr
rnaMkPeer
@@ -317,7 +330,7 @@ initNetwork registry nodeArgs kernel RunNetworkArgs{..} = do
runIpSubscriptionWorker :: ConnectionTable IO Socket.SockAddr -> IO ()
runIpSubscriptionWorker connTable = ipSubscriptionWorker_V1
rnaIpSubscriptionTracer
nullTracer -- TODO add hanshake protocol tracer to 'ProtocolTracers'
rnaHandshakeTracer
rnaMkPeer
connTable
-- the comments in dnsSbuscriptionWorker call apply
@@ -329,15 +342,15 @@ initNetwork registry nodeArgs kernel RunNetworkArgs{..} = do
, ispValency = length rnaIpProducers
}
nodeToNodeVersionData
(initiatorNetworkApplication networkApps)
(initiatorNetworkApplication networkApps)

runDnsSubscriptionWorker :: ConnectionTable IO Socket.SockAddr
-> DnsSubscriptionTarget
-> IO ()
runDnsSubscriptionWorker connTable dnsProducer = dnsSubscriptionWorker_V1
rnaDnsSubscriptionTracer
rnaDnsResolverTracer
nullTracer -- TODO add hanshake protocol tracer to 'ProtocolTracers'
rnaHandshakeTracer
rnaMkPeer
connTable
-- IPv4 address
@@ -182,6 +182,7 @@ serverPingPong :: IO ()
serverPingPong = do
tbl <- newConnectionTable
withServerNode
nullTracer
tbl
defaultLocalSocketAddrInfo
(\(DictVersion codec)-> encodeTerm codec)
@@ -281,6 +282,7 @@ serverPingPong2 :: IO ()
serverPingPong2 = do
tbl <- newConnectionTable
withServerNode
nullTracer
tbl
defaultLocalSocketAddrInfo
(\(DictVersion codec)-> encodeTerm codec)
@@ -354,6 +356,7 @@ serverChainSync :: FilePath -> IO ()
serverChainSync sockAddr = do
tbl <- newConnectionTable
withServerNode
nullTracer
tbl
(mkLocalSocketAddrInfo sockAddr)
(\(DictVersion codec)-> encodeTerm codec)
@@ -551,6 +554,7 @@ serverBlockFetch :: FilePath -> IO ()
serverBlockFetch sockAddr = do
tbl <- newConnectionTable
withServerNode
nullTracer
tbl
(mkLocalSocketAddrInfo sockAddr)
(\(DictVersion codec)-> encodeTerm codec)
@@ -23,6 +23,9 @@ module Ouroboros.Network.NodeToClient (
-- * Re-exported clients
, chainSyncClientNull
, localTxSubmissionClientNull
, TraceSendRecv (..)
, DecoderFailureOrTooMuchInput
, Handshake
) where

import Control.Concurrent.Async (Async)
@@ -48,7 +51,7 @@ import Ouroboros.Network.Protocol.Handshake.Type
import Ouroboros.Network.Protocol.Handshake.Version
import Ouroboros.Network.Socket
import Network.TypedProtocol.Driver.ByteLimit (DecoderFailureOrTooMuchInput)
import Network.TypedProtocol.Driver (TraceSendRecv)
import Network.TypedProtocol.Driver (TraceSendRecv (..))
import Control.Tracer (Tracer)


@@ -170,7 +173,8 @@ connectTo_V1 tracer peeridFn versionData application =
--
withServer
:: HasResponder appType ~ True
=> ConnectionTable IO Socket.SockAddr
=> Tracer IO (TraceSendRecv (Handshake NodeToClientVersion CBOR.Term) peerid (DecoderFailureOrTooMuchInput DeserialiseFailure))
-> ConnectionTable IO Socket.SockAddr
-> Socket.AddrInfo
-> (Socket.SockAddr -> Socket.SockAddr -> peerid)
-- ^ create peerid from local address and remote address
@@ -179,8 +183,9 @@ withServer
(OuroborosApplication appType peerid NodeToClientProtocols IO BL.ByteString a b)
-> (Async () -> IO t)
-> IO t
withServer tbl addr peeridFn acceptVersion versions k =
withServer tracer tbl addr peeridFn acceptVersion versions k =
withServerNode
tracer
tbl
addr
(\(DictVersion codec) -> encodeTerm codec)
@@ -195,7 +200,8 @@ withServer tbl addr peeridFn acceptVersion versions k =
--
withServer_V1
:: (HasResponder appType ~ True)
=> ConnectionTable IO Socket.SockAddr
=> Tracer IO (TraceSendRecv (Handshake NodeToClientVersion CBOR.Term) peerid (DecoderFailureOrTooMuchInput DeserialiseFailure))
-> ConnectionTable IO Socket.SockAddr
-> Socket.AddrInfo
-> (Socket.SockAddr -> Socket.SockAddr -> peerid)
-- ^ create peerid from local address and remote address
@@ -208,9 +214,9 @@ withServer_V1
-- 'OuroborosInitiatorAndResponderApplication'.
-> (Async () -> IO t)
-> IO t
withServer_V1 tbl addr peeridFn versionData application =
withServer_V1 tracer tbl addr peeridFn versionData application =
withServer
tbl addr peeridFn
tracer tbl addr peeridFn
(\(DictVersion _) -> acceptEq)
(simpleSingletonVersions
NodeToClientV_1
@@ -38,6 +38,9 @@ module Ouroboros.Network.NodeToNode (
-- * Re-exports
, ConnectionTable
, newConnectionTable
, TraceSendRecv (..)
, DecoderFailureOrTooMuchInput
, Handshake
) where

import Control.Concurrent.Async (Async)
@@ -78,7 +81,7 @@ import Ouroboros.Network.Subscription.Dns ( DnsSubscriptionTarget (..)
, WithDomainName (..)
)
import Network.TypedProtocol.Driver.ByteLimit (DecoderFailureOrTooMuchInput)
import Network.TypedProtocol.Driver (TraceSendRecv)
import Network.TypedProtocol.Driver (TraceSendRecv (..))
import Control.Tracer (Tracer)


@@ -183,9 +186,9 @@ connectTo_V1
-> Maybe Socket.AddrInfo
-> Socket.AddrInfo
-> IO ()
connectTo_V1 tracer peeridFn versionData application localAddr remoteAddr =
connectTo_V1 handshakeTracer peeridFn versionData application localAddr remoteAddr =
connectTo
tracer peeridFn
handshakeTracer peeridFn
(simpleSingletonVersions
NodeToNodeV_1
versionData
@@ -199,16 +202,18 @@ connectTo_V1 tracer peeridFn versionData application localAddr remoteAddr =
--
withServer
:: HasResponder appType ~ True
=> ConnectionTable IO Socket.SockAddr
=> Tracer IO (TraceSendRecv (Handshake NodeToNodeVersion CBOR.Term) peerid (DecoderFailureOrTooMuchInput DeserialiseFailure))
-> ConnectionTable IO Socket.SockAddr
-> Socket.AddrInfo
-> (Socket.SockAddr -> Socket.SockAddr -> peerid)
-- ^ create peerid from local address and remote address
-> (forall vData. DictVersion vData -> vData -> vData -> Accept)
-> Versions NodeToNodeVersion DictVersion (OuroborosApplication appType peerid NodeToNodeProtocols IO BL.ByteString a b)
-> (Async () -> IO t)
-> IO t
withServer tbl addr peeridFn acceptVersion versions k =
withServer handshakeTracer tbl addr peeridFn acceptVersion versions k =
withServerNode
handshakeTracer
tbl
addr
(\(DictVersion codec) -> encodeTerm codec)
@@ -223,17 +228,18 @@ withServer tbl addr peeridFn acceptVersion versions k =
--
withServer_V1
:: HasResponder appType ~ True
=> ConnectionTable IO Socket.SockAddr
=> Tracer IO (TraceSendRecv (Handshake NodeToNodeVersion CBOR.Term) peerid (DecoderFailureOrTooMuchInput DeserialiseFailure))
-> ConnectionTable IO Socket.SockAddr
-> Socket.AddrInfo
-> (Socket.SockAddr -> Socket.SockAddr -> peerid)
-- ^ create peerid from local address and remote address
-> NodeToNodeVersionData
-> (OuroborosApplication appType peerid NodeToNodeProtocols IO BL.ByteString x y)
-> (Async () -> IO t)
-> IO t
withServer_V1 tbl addr peeridFn versionData application k =
withServer_V1 handshakeTracer tbl addr peeridFn versionData application k =
withServer
tbl addr peeridFn
handshakeTracer tbl addr peeridFn
(\(DictVersion _) -> acceptEq)
(simpleSingletonVersions
NodeToNodeV_1
@@ -51,7 +51,7 @@ import Data.Int

import qualified Network.Socket as Socket hiding (recv)

import Control.Tracer (nullTracer, Tracer)
import Control.Tracer (Tracer)

import Network.TypedProtocol.Driver.ByteLimit
import Network.TypedProtocol.Driver (TraceSendRecv)
@@ -113,7 +113,7 @@ connectToNode
-> Socket.AddrInfo
-- ^ remote address
-> IO ()
connectToNode encodeData decodeData tracer peeridFn versions localAddr remoteAddr =
connectToNode encodeData decodeData handshakeTracer peeridFn versions localAddr remoteAddr =
bracket
(Socket.socket (Socket.addrFamily remoteAddr) Socket.Stream Socket.defaultProtocol)
Socket.close
@@ -128,7 +128,7 @@ connectToNode encodeData decodeData tracer peeridFn versions localAddr remoteAdd
Just addr -> Socket.bind sd (Socket.addrAddress addr)
Nothing -> return ()
Socket.connect sd (Socket.addrAddress remoteAddr)
connectToNode' encodeData decodeData tracer peeridFn versions sd
connectToNode' encodeData decodeData handshakeTracer peeridFn versions sd
)

-- |
@@ -162,14 +162,14 @@ connectToNode'
-- ^ application to run over the connection
-> Socket.Socket
-> IO ()
connectToNode' encodeData decodeData tracer peeridFn versions sd = do
connectToNode' encodeData decodeData handshakeTracer peeridFn versions sd = do
peerid <- peeridFn <$> Socket.getSocketName sd <*> Socket.getPeerName sd
bearer <- Mx.socketAsMuxBearer sd
Mx.muxBearerSetState bearer Mx.Connected
mapp <- runPeerWithByteLimit
maxTransmissionUnit
BL.length
tracer
handshakeTracer
codecHandshake
peerid
(Mx.muxBearerAsControlChannel bearer Mx.ModeInitiator)
@@ -226,13 +226,14 @@ beginConnection
, Typeable vNumber
, Show vNumber
)
=> (forall vData. extra vData -> vData -> CBOR.Term)
=> Tracer IO (TraceSendRecv (Handshake vNumber CBOR.Term) peerid (DecoderFailureOrTooMuchInput DeserialiseFailure))
-> (forall vData. extra vData -> vData -> CBOR.Term)
-> (forall vData. extra vData -> CBOR.Term -> Either Text vData)
-> (forall vData. extra vData -> vData -> vData -> Accept)
-> (addr -> st -> STM.STM (AcceptConnection st vNumber extra peerid ptcl IO BL.ByteString))
-- ^ either accept or reject a connection.
-> Server.BeginConnection addr Socket.Socket st ()
beginConnection encodeData decodeData acceptVersion fn addr st = do
beginConnection handshakeTracer encodeData decodeData acceptVersion fn addr st = do
accept <- fn addr st
case accept of
AcceptConnection st' peerid versions -> pure $ Server.Accept st' $ \sd -> do
@@ -241,7 +242,7 @@ beginConnection encodeData decodeData acceptVersion fn addr st = do
mapp <- runPeerWithByteLimit
maxTransmissionUnit
BL.length
nullTracer
handshakeTracer
codecHandshake
peerid
(Mx.muxBearerAsControlChannel bearer Mx.ModeResponder)
@@ -312,7 +313,8 @@ runNetworkNode'
, Typeable vNumber
, Show vNumber
)
=> ConnectionTable IO Socket.SockAddr
=> Tracer IO (TraceSendRecv (Handshake vNumber CBOR.Term) peerid (DecoderFailureOrTooMuchInput DeserialiseFailure))
-> ConnectionTable IO Socket.SockAddr
-> Socket.Socket
-> (forall vData. extra vData -> vData -> CBOR.Term)
-> (forall vData. extra vData -> CBOR.Term -> Either Text vData)
@@ -324,8 +326,8 @@ runNetworkNode'
-> Server.Main st t
-> st
-> IO t
runNetworkNode' tbl sd encodeData decodeData acceptVersion acceptException acceptConn complete
main st = Server.run (fromSocket tbl sd) acceptException (beginConnection encodeData decodeData
runNetworkNode' handshakeTracer tbl sd encodeData decodeData acceptVersion acceptException acceptConn complete
main st = Server.run (fromSocket tbl sd) acceptException (beginConnection handshakeTracer encodeData decodeData
acceptVersion acceptConn) complete main st


@@ -360,7 +362,8 @@ withServerNode
, Typeable vNumber
, Show vNumber
)
=> ConnectionTable IO Socket.SockAddr
=> Tracer IO (TraceSendRecv (Handshake vNumber CBOR.Term) peerid (DecoderFailureOrTooMuchInput DeserialiseFailure))
-> ConnectionTable IO Socket.SockAddr
-> Socket.AddrInfo
-> (forall vData. extra vData -> vData -> CBOR.Term)
-> (forall vData. extra vData -> CBOR.Term -> Either Text vData)
@@ -375,11 +378,12 @@ withServerNode
-- Note: the server thread will terminate when the callback returns or
-- throws an exception.
-> IO t
withServerNode tbl addr encodeData decodeData peeridFn acceptVersion versions k =
withServerNode handshakeTracer tbl addr encodeData decodeData peeridFn acceptVersion versions k =
bracket (mkListeningSocket (Socket.addrFamily addr) (Just $ Socket.addrAddress addr)) Socket.close $ \sd -> do
addr' <- Socket.getSocketName sd
withAsync
(runNetworkNode'
handshakeTracer
tbl
sd
encodeData
@@ -233,6 +233,7 @@ prop_socket_send_recv initiatorAddr responderAddr f xs = do

res <-
withServerNode
nullTracer
tbl
responderAddr
(\(DictVersion codec) -> encodeTerm codec)
@@ -395,6 +396,7 @@ demo chain0 updates = do
encode decode

withServerNode
nullTracer
tbl
producerAddress
(\(DictVersion codec)-> encodeTerm codec)
@@ -544,6 +544,7 @@ prop_send_recv f xs first = ioProperty $ do

withDummyServer faultyAddress $
withServerNode
nullTracer
tbl
responderAddr
(\(DictVersion codec) -> encodeTerm codec)
@@ -666,6 +667,7 @@ prop_send_recv_init_and_rsp f xs = ioProperty $ do
)

startPassiveServer tbl responderAddr localAddrVar rrcfg = withServerNode
nullTracer
tbl
responderAddr
(\(DictVersion codec) -> encodeTerm codec)
@@ -681,6 +683,7 @@ prop_send_recv_init_and_rsp f xs = ioProperty $ do
return r

startActiveServer tbl responderAddr localAddrVar remoteAddrVar rrcfg = withServerNode
nullTracer
tbl
responderAddr
(\(DictVersion codec) -> encodeTerm codec)
@@ -779,6 +782,7 @@ _demo = ioProperty $ do

spawnServer tbl addr delay =
void $ async $ withServerNode
nullTracer
tbl
addr
(\(DictVersion codec) -> encodeTerm codec)

0 comments on commit f110dd8

Please sign in to comment.
You can’t perform that action at this time.