Skip to content

Commit

Permalink
CAD-2487 Traces for load discrimination metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
newhoggy committed Jan 26, 2021
1 parent 4f2e9ca commit d1cbe85
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 22 deletions.
Expand Up @@ -2,21 +2,21 @@
{-# LANGUAGE EmptyDataDeriving #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}

module Ouroboros.Consensus.MiniProtocol.BlockFetch.Server
( blockFetchServer
-- * Trace events
, TraceBlockFetchServerEvent
, TraceBlockFetchServerEvent(..)
-- * Exceptions
, BlockFetchServerException
) where

import Control.Tracer (Tracer)
import Control.Tracer (Tracer, traceWith)
import Data.Typeable (Typeable)

import Ouroboros.Network.Block (Serialised (..))
Expand Down Expand Up @@ -76,7 +76,7 @@ blockFetchServer
-> NodeToNodeVersion
-> ResourceRegistry m
-> BlockFetchServer (Serialised blk) (Point blk) m ()
blockFetchServer _tracer chainDB _version registry = senderSide
blockFetchServer tracer chainDB _version registry = senderSide
where
senderSide :: BlockFetchServer (Serialised blk) (Point blk) m ()
senderSide = BlockFetchServer receiveReq' ()
Expand Down Expand Up @@ -114,7 +114,8 @@ blockFetchServer _tracer chainDB _version registry = senderSide
sendBlocks it = do
next <- ChainDB.iteratorNext it
case next of
IteratorResult blk ->
IteratorResult blk -> do
traceWith tracer TraceBlockFetchServerSendBlock
return $ SendMsgBlock (withoutPoint blk) (sendBlocks it)
IteratorExhausted -> do
ChainDB.iteratorClose it
Expand All @@ -129,7 +130,7 @@ blockFetchServer _tracer chainDB _version registry = senderSide
-------------------------------------------------------------------------------}

-- | Events traced by the Block Fetch Server.
data TraceBlockFetchServerEvent blk
-- TODO no events yet. Tracing the messages send/received over the network
-- might be all we need?
data TraceBlockFetchServerEvent blk =
-- | The server sent a block to the peer.
TraceBlockFetchServerSendBlock
deriving (Eq, Show)
@@ -1,6 +1,5 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}

Expand Down Expand Up @@ -118,32 +117,34 @@ chainSyncServerForFollower tracer chainDB flr =
tip <- atomically $ ChainDB.getCurrentTip chainDB
traceWith tracer $
TraceChainSyncServerRead tip (point <$> update)
return $ Left $ sendNext tip (withoutPoint <$> update)
Left <$> sendNext tip update
Nothing -> return $ Right $ do
-- Follower is at the head, we have to block and wait for the chain to
-- change.
update <- ChainDB.followerInstructionBlocking flr
tip <- atomically $ ChainDB.getCurrentTip chainDB
traceWith tracer $
TraceChainSyncServerReadBlocked tip (point <$> update)
return $ sendNext tip (withoutPoint <$> update)
sendNext tip update

sendNext :: Tip blk
-> ChainUpdate blk b
-> ServerStNext b (Point blk) (Tip blk) m ()
-> ChainUpdate blk (WithPoint blk b)
-> m (ServerStNext b (Point blk) (Tip blk) m ())
sendNext tip update = case update of
AddBlock hdr -> SendMsgRollForward hdr tip idle'
RollBack pt -> SendMsgRollBackward pt tip idle'
AddBlock hdr -> do
traceWith tracer (TraceChainSyncRollForward (point hdr))
return $ SendMsgRollForward (withoutPoint hdr) tip idle'
RollBack pt -> return $ SendMsgRollBackward pt tip idle'

handleFindIntersect :: [Point blk]
-> m (ServerStIntersect b (Point blk) (Tip blk) m ())
handleFindIntersect points = do
-- TODO guard number of points
changed <- ChainDB.followerForward flr points
tip <- atomically $ ChainDB.getCurrentTip chainDB
return $ case changed of
Just pt -> SendMsgIntersectFound pt tip idle'
Nothing -> SendMsgIntersectNotFound tip idle'
case changed of
Just pt -> return $ SendMsgIntersectFound pt tip idle'
Nothing -> return $ SendMsgIntersectNotFound tip idle'

{-------------------------------------------------------------------------------
Trace events
Expand All @@ -156,4 +157,6 @@ chainSyncServerForFollower tracer chainDB flr =
data TraceChainSyncServerEvent blk
= TraceChainSyncServerRead (Tip blk) (ChainUpdate blk (Point blk))
| TraceChainSyncServerReadBlocked (Tip blk) (ChainUpdate blk (Point blk))
| TraceChainSyncRollForward (Point blk)
| TraceChainSyncRollBackward (Point blk)
deriving (Eq, Show)
19 changes: 15 additions & 4 deletions ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound.hs
Expand Up @@ -31,7 +31,7 @@ import Control.Monad (unless)
import Control.Monad.Class.MonadSTM
import Control.Monad.Class.MonadSTM.Strict (checkInvariant)
import Control.Monad.Class.MonadThrow
import Control.Tracer (Tracer)
import Control.Tracer (Tracer, traceWith)

import Network.TypedProtocol.Pipelined (N, Nat (..))

Expand Down Expand Up @@ -63,7 +63,13 @@ data TxSubmissionMempoolWriter txid tx idx m =
mempoolAddTxs :: [tx] -> m [txid]
}

data TraceTxSubmissionInbound txid tx = TraceTxSubmissionInbound --TODO
data TraceTxSubmissionInbound txid tx =
-- | Transactions just about to be inserted.
TraceTxSubmissionCollected !Int
-- | Just accepted this many transactions.
| TraceTxSubmissionAccepted !Int
-- | Just rejected this many transactions.
| TraceTxSubmissionRejected !Int
deriving Show

data TxSubmissionProtocolError =
Expand Down Expand Up @@ -163,7 +169,7 @@ txSubmissionInbound
-> TxSubmissionMempoolWriter txid tx idx m
-> NodeToNodeVersion
-> TxSubmissionServerPipelined txid tx m ()
txSubmissionInbound _tracer maxUnacked mpReader mpWriter _version =
txSubmissionInbound tracer maxUnacked mpReader mpWriter _version =
TxSubmissionServerPipelined $
continueWithStateM (serverIdle Zero) initialServerState
where
Expand Down Expand Up @@ -324,8 +330,13 @@ txSubmissionInbound _tracer maxUnacked mpReader mpWriter _version =
bufferedTxs3 = forceElemsToWHNF $ bufferedTxs2 <>
(Map.fromList (zip live (repeat Nothing)))

let !collected = length txs

_writtenTxids <- mempoolAddTxs txsReady
txidsAccepted <- mempoolAddTxs txsReady

let !accepted = length txidsAccepted
traceWith tracer $ TraceTxSubmissionAccepted accepted
traceWith tracer $ TraceTxSubmissionRejected (collected - accepted)

continueWithStateM (serverIdle n) st {
bufferedTxs = bufferedTxs3,
Expand Down

0 comments on commit d1cbe85

Please sign in to comment.