Skip to content

Commit

Permalink
mini-protocols: trace termination message
Browse files Browse the repository at this point in the history
  • Loading branch information
coot authored and karknu committed Jan 26, 2021
1 parent 215e6b1 commit fb3aca0
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 13 deletions.
Expand Up @@ -473,7 +473,7 @@ chainSyncClient mkPipelineDecision0 tracer cfg
continueWithState uis $
intersectFound (castPoint i) (Their theirTip')
, recvMsgIntersectNotFound = \theirTip' ->
return $ terminate $
terminate $
mkResult
(ourTipFromChain ourFrag)
(Their theirTip')
Expand Down Expand Up @@ -884,15 +884,17 @@ chainSyncClient mkPipelineDecision0 tracer cfg

-- | Gracefully terminate the connection with the upstream node with the
-- given result.
terminate :: ChainSyncClientResult -> Consensus (ClientPipelinedStIdle 'Z) blk m
terminate = SendMsgDone
terminate :: ChainSyncClientResult -> m (Consensus (ClientPipelinedStIdle 'Z) blk m)
terminate res = do
traceWith tracer (TraceTermination res)
pure (SendMsgDone res)

-- | Same as 'terminate', but first 'drainThePipe'.
terminateAfterDrain :: Nat n -> ChainSyncClientResult -> m (Consensus (ClientPipelinedStIdle n) blk m)
terminateAfterDrain n result =
continueWithState ()
$ drainThePipe n
$ Stateful $ const $ return $ terminate result
$ Stateful $ const $ terminate result

-- | Disconnect from the upstream node by throwing the given exception.
-- The cleanup is handled in 'bracketChainSyncClient'.
Expand Down Expand Up @@ -1233,6 +1235,8 @@ data TraceChainSyncClientEvent blk
-- candidate's chain.
| TraceException ChainSyncClientException
-- ^ An exception was thrown by the Chain Sync Client.
| TraceTermination ChainSyncClientResult
-- ^ The client has terminated.

deriving instance ( BlockSupportsProtocol blk
, Eq (ValidationErr (BlockProtocol blk))
Expand Down
Expand Up @@ -3,6 +3,7 @@
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}

module Ouroboros.Network.Protocol.TxSubmission.Direct (
directPipelined
Expand Down Expand Up @@ -39,8 +40,7 @@ directPipelined (TxSubmissionServerPipelined mserver)
SendMsgReplyTxIds (BlockingReply txids) client' -> do
server' <- serverNext txids
directSender q server' client'
SendMsgDone b ->
return (a, b)
SendMsgDone b -> (,b) <$> a

directSender q (SendMsgRequestTxIdsPipelined ackNo reqNo serverNext)
ClientStIdle{recvMsgRequestTxIds} = do
Expand Down
Expand Up @@ -250,7 +250,7 @@ txSubmissionServer tracer txId maxUnacked maxTxIdsToRequest maxTxToRequest =
SendMsgRequestTxIdsBlocking
(numTxsToAcknowledge st)
numTxIdsToRequest
accum -- result if the client reports we're done
(pure accum) -- result if the client reports we're done
(\txids -> do
traceWith tracer (EventRequestTxIdsBlocking st (numTxsToAcknowledge st) numTxIdsToRequest)
handleReply accum Zero st {
Expand Down
Expand Up @@ -64,7 +64,7 @@ data ServerStIdle (n :: N) txid tx m a where
SendMsgRequestTxIdsBlocking
:: Word16 -- ^ number of txids to acknowledge
-> Word16 -- ^ number of txids to request
-> a -- ^ Result if done
-> m a -- ^ Result if done
-> (NonEmpty (txid, TxSizeInBytes)
-> m (ServerStIdle Z txid tx m a))
-> ServerStIdle Z txid tx m a
Expand Down Expand Up @@ -121,7 +121,7 @@ txSubmissionServerPeerPipelined (TxSubmissionServerPipelined server) =
(ClientAgency (TokTxIds TokBlocking)) $ \msg ->
case msg of
MsgDone ->
SenderDone TokDone kDone
SenderEffect (SenderDone TokDone <$> kDone)

MsgReplyTxIds (BlockingReply txids) ->
SenderEffect (go <$> k txids)
Expand Down
12 changes: 8 additions & 4 deletions ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound.hs
Expand Up @@ -31,7 +31,7 @@ import Control.Monad (unless)
import Control.Monad.Class.MonadSTM
import Control.Monad.Class.MonadSTM.Strict (checkInvariant)
import Control.Monad.Class.MonadThrow
import Control.Tracer (Tracer)
import Control.Tracer (Tracer, traceWith)

import Network.TypedProtocol.Pipelined (N, Nat (..))

Expand Down Expand Up @@ -63,7 +63,10 @@ data TxSubmissionMempoolWriter txid tx idx m =
mempoolAddTxs :: [tx] -> m [txid]
}

data TraceTxSubmissionInbound txid tx = TraceTxSubmissionInbound --TODO
-- TODO: extend tracing issue #2615
data TraceTxSubmissionInbound txid tx
-- | Server received 'MsgDone'
= ClientTerminated
deriving Show

data TxSubmissionProtocolError =
Expand Down Expand Up @@ -163,7 +166,7 @@ txSubmissionInbound
-> TxSubmissionMempoolWriter txid tx idx m
-> NodeToNodeVersion
-> TxSubmissionServerPipelined txid tx m ()
txSubmissionInbound _tracer maxUnacked mpReader mpWriter _version =
txSubmissionInbound tracer maxUnacked mpReader mpWriter _version =
TxSubmissionServerPipelined $
continueWithStateM (serverIdle Zero) initialServerState
where
Expand Down Expand Up @@ -204,7 +207,8 @@ txSubmissionInbound _tracer maxUnacked mpReader mpWriter _version =
SendMsgRequestTxIdsBlocking
(numTxsToAcknowledge st)
numTxIdsToRequest
() -- Our result if the client terminates the protocol
-- Our result if the client terminates the protocol
(traceWith tracer ClientTerminated)
( collectAndContinueWithState (handleReply Zero) st {
numTxsToAcknowledge = 0,
requestedTxIdsInFlight = numTxIdsToRequest
Expand Down

0 comments on commit fb3aca0

Please sign in to comment.