Skip to content

Commit

Permalink
Remove Submission data type.
Browse files Browse the repository at this point in the history
This type was used to pass around lots of data and MVars
to places where they are not needed.
  • Loading branch information
MarcFontaine committed Jun 8, 2021
1 parent 6a3bb32 commit 80da745
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 96 deletions.
52 changes: 23 additions & 29 deletions bench/tx-generator/src/Cardano/Benchmarking/GeneratorTx.hs
Expand Up @@ -31,9 +31,11 @@ module Cardano.Benchmarking.GeneratorTx
import Cardano.Prelude
import Prelude (id, String)

import qualified Control.Concurrent.STM as STM
import Control.Monad (fail)
import Control.Monad.Trans.Except.Extra (left, newExceptT, right)
import Control.Tracer (Tracer, traceWith)
import qualified Data.Time.Clock as Clock

import qualified Data.List.NonEmpty as NE
import qualified Data.Map.Strict as Map
Expand Down Expand Up @@ -302,28 +304,27 @@ asyncBenchmark

traceDebug $ "******* Tx generator, launching Tx peers: " ++ show (NE.length remoteAddresses) ++ " of them"
liftIO $ do
submission :: Submission IO era <- mkSubmission traceSubmit $
SubmissionParams
{ spTps = tpsRate
, spTargets = numTargets
, spQueueLen = 32
, spErrorPolicy = errorPolicy
}
allAsyncs <- forM (zip [0..] $ NE.toList remoteAddresses) $
\(i, remoteAddr) ->
startTime <- Clock.getCurrentTime
txSendQueue <- STM.newTBQueueIO 32

reportRefs <- STM.atomically $ replicateM (fromIntegral numTargets) STM.newEmptyTMVar

allAsyncs <- forM (zip reportRefs $ NE.toList remoteAddresses) $
\(reportRef, remoteAddr) ->
launchTxPeer
traceSubmit
traceN2N
connectClient
remoteAddr
submission
i
tpsFeeder <- async $ tpsLimitedTxFeeder submission finalTransactions
txSendQueue
reportRef
errorPolicy
tpsFeeder <- async $ tpsLimitedTxFeeder traceSubmit numTargets txSendQueue tpsRate finalTransactions
let tpsFeederShutdown = do
cancel tpsFeeder
liftIO $ tpsLimitedTxFeederShutdown submission
liftIO $ tpsLimitedTxFeederShutdown numTargets txSendQueue

return (tpsFeeder, allAsyncs, mkSubmissionSummary threadName submission, tpsFeederShutdown)
return (tpsFeeder, allAsyncs, mkSubmissionSummary threadName startTime reportRefs, tpsFeederShutdown)

-- | At this moment 'sourceAddress' contains a huge amount of money (lets call it A).
-- Now we have to split this amount to N equal parts, as a result we'll have
Expand Down Expand Up @@ -439,36 +440,29 @@ txGenerator
-- Txs for submission.
---------------------------------------------------------------------------------------------------

-- | To get higher performance we need to hide latency of getting and
-- forwarding (in sufficient numbers) transactions.
--
-- TODO: transform comments into haddocks.
--
launchTxPeer
:: forall era
. IsShelleyBasedEra era
=> Tracer IO (TraceBenchTxSubmit TxId)
-> Tracer IO NodeToNodeSubmissionTrace
-> ConnectClient
-> Network.Socket.AddrInfo
-- Remote address
-> Submission IO era
-- Mutable state shared between submission threads
-> Natural
-- Thread index
-> TxSendQueue era
-> ReportRef
-> SubmissionErrorPolicy
-> IO (Async ())
launchTxPeer traceSubmit traceN2N connectClient remoteAddr sub tix =
launchTxPeer traceSubmit traceN2N connectClient remoteAddr txSendQueue reportRef errorPolicy =
async $
handle
(\(SomeException err) -> do
let errDesc = mconcat
[ "Exception while talking to peer #", show tix
[ "Exception while talking to peer "
, " (", show (addrAddress remoteAddr), "): "
, show err]
submitThreadReport sub tix (Left errDesc)
case spErrorPolicy $ sParams sub of
submitThreadReport reportRef (Left errDesc)
case errorPolicy of
FailOnError -> throwIO err
LogErrors -> traceWith traceSubmit $
TraceBenchTxSubError (pack errDesc))
$ connectClient remoteAddr
(txSubmissionClient traceN2N traceSubmit sub tix)
(txSubmissionClient traceN2N traceSubmit txSendQueue reportRef)
120 changes: 53 additions & 67 deletions bench/tx-generator/src/Cardano/Benchmarking/GeneratorTx/Submission.hs
Expand Up @@ -22,9 +22,9 @@

module Cardano.Benchmarking.GeneratorTx.Submission
( SubmissionParams(..)
, Submission(sParams)
, SubmissionThreadReport
, mkSubmission
, TxSendQueue
, ReportRef
, mkSubmissionSummary
, submitThreadReport
, txSubmissionClient
Expand Down Expand Up @@ -88,39 +88,18 @@ data SubmissionParams
= SubmissionParams
{ spTps :: !TPSRate
, spTargets :: !Natural
, spQueueLen :: !Natural
, spErrorPolicy :: !SubmissionErrorPolicy
}

data Submission (m :: Type -> Type) (era :: Type)
= Submission
{ sParams :: !SubmissionParams
, sStartTime :: !UTCTime
, sThreads :: !Natural
, sTxSendQueue :: !(TBQueue (Maybe (Tx era)))
, sReportsRefs :: ![STM.TMVar (Either String SubmissionThreadReport)]
, sTrace :: !(Tracer m (TraceBenchTxSubmit TxId))
}

mkSubmission
:: MonadIO m
=> Tracer m (TraceBenchTxSubmit TxId)
-> SubmissionParams
-> m (Submission m era)
mkSubmission sTrace sParams@SubmissionParams{spTargets=sThreads, spQueueLen} = liftIO $ do
sStartTime <- Clock.getCurrentTime
sTxSendQueue <- STM.newTBQueueIO spQueueLen
sReportsRefs <- STM.atomically $ replicateM (fromIntegral sThreads) STM.newEmptyTMVar
pure Submission{..}
type ReportRef = STM.TMVar (Either String SubmissionThreadReport)
type TxSendQueue era = TBQueue (Maybe (Tx era))

submitThreadReport
:: MonadIO m
=> Submission m era
-> Natural
=> ReportRef
-> Either String SubmissionThreadReport
-> m ()
submitThreadReport Submission{sReportsRefs} threadIx =
liftIO . STM.atomically . STM.putTMVar (sReportsRefs L.!! fromIntegral threadIx)
submitThreadReport ref report
= liftIO $ STM.atomically $ STM.putTMVar ref report

{-------------------------------------------------------------------------------
Results
Expand All @@ -135,20 +114,20 @@ data SubmissionThreadStats
data SubmissionThreadReport
= SubmissionThreadReport
{ strStats :: !SubmissionThreadStats
-- , strThreadIndex :: !Natural
, strEndOfProtocol :: !UTCTime
}

mkSubmissionSummary ::
String
-> Submission IO tx
-> UTCTime
-> [ReportRef]
-> IO SubmissionSummary
mkSubmissionSummary ssThreadName Submission{ sStartTime, sReportsRefs}
mkSubmissionSummary ssThreadName startTime reportsRefs
= do
results <- sequence (STM.atomically . STM.readTMVar <$> sReportsRefs)
results <- sequence (STM.atomically . STM.readTMVar <$> reportsRefs)
let (failures, reports) = partitionEithers results
now <- Clock.getCurrentTime
let ssElapsed = Clock.diffUTCTime now sStartTime
let ssElapsed = Clock.diffUTCTime now startTime
ssTxSent@(Sent sent) = sum $ stsSent . strStats <$> reports
ssTxUnavailable = sum $ stsUnavailable . strStats <$> reports
ssEffectiveTps = txDiffTimeTPS sent ssElapsed
Expand All @@ -164,53 +143,60 @@ mkSubmissionSummary ssThreadName Submission{ sStartTime, sReportsRefs}
threadReportTps
SubmissionThreadReport
{ strStats=SubmissionThreadStats{stsAcked=Ack ack}, strEndOfProtocol } =
txDiffTimeTPS ack (Clock.diffUTCTime strEndOfProtocol sStartTime)
txDiffTimeTPS ack (Clock.diffUTCTime strEndOfProtocol startTime)

{-------------------------------------------------------------------------------
Submission queue: feeding and consumption
-------------------------------------------------------------------------------}
simpleTxFeeder
:: forall m era
. (MonadIO m)
=> Submission m era -> [Tx era] -> m ()
simpleTxFeeder
Submission{sTrace, sThreads, sTxSendQueue} txs = do
simpleTxFeeder :: forall m era .
MonadIO m
=> Tracer m (TraceBenchTxSubmit TxId)
-> Natural
-> TxSendQueue era
-> [Tx era]
-> m ()
simpleTxFeeder tracer threads txSendQueue txs = do
forM_ (zip txs [0..]) feedTx
-- Issue the termination notifications.
replicateM_ (fromIntegral sThreads) $
liftIO $ STM.atomically $ STM.writeTBQueue sTxSendQueue Nothing
replicateM_ (fromIntegral threads) $
liftIO $ STM.atomically $ STM.writeTBQueue txSendQueue Nothing
where
feedTx :: (Tx era, Int) -> m ()
feedTx (tx, ix) = do
liftIO $ STM.atomically $ STM.writeTBQueue sTxSendQueue (Just tx)
traceWith sTrace $ TraceBenchTxSubServFed [getTxId $ getTxBody tx] ix

tpsLimitedTxFeederShutdown :: Submission m era -> IO ()
tpsLimitedTxFeederShutdown Submission{sThreads, sTxSendQueue }
liftIO $ STM.atomically $ STM.writeTBQueue txSendQueue (Just tx)
traceWith tracer $ TraceBenchTxSubServFed [getTxId $ getTxBody tx] ix

tpsLimitedTxFeederShutdown ::
Natural
-> TxSendQueue era
-> IO ()
tpsLimitedTxFeederShutdown threads txSendQueue
= STM.atomically $
replicateM_ (fromIntegral sThreads)
$ STM.writeTBQueue sTxSendQueue Nothing
replicateM_ (fromIntegral threads)
$ STM.writeTBQueue txSendQueue Nothing

tpsLimitedTxFeeder
:: forall m era . MonadIO m => Submission m era -> [Tx era] -> m ()
tpsLimitedTxFeeder submission txs = do
tpsLimitedTxFeeder :: forall m era .
MonadIO m
=> Tracer m (TraceBenchTxSubmit TxId)
-> Natural
-> TxSendQueue era
-> TPSRate
-> [Tx era] -> m ()
tpsLimitedTxFeeder tracer threads txSendQueue (TPSRate rate) txs = do
-- It would be nice to catch an AsyncException here and do a clean shutdown.
-- However this would require extra machineries because we are in MonadIO m not in IO ().
-- TODO: Move everything to IO () and avoid problems from over-polymorphism.
now <- liftIO Clock.getCurrentTime
foldM_ feedTx (now, 0) (zip txs [0..])
liftIO $ tpsLimitedTxFeederShutdown submission
liftIO $ tpsLimitedTxFeederShutdown threads txSendQueue
where
Submission{ sParams=SubmissionParams{spTps=TPSRate rate}
, sTrace
, sTxSendQueue } = submission

feedTx :: (UTCTime, NominalDiffTime)
-> (Tx era, Int)
-> m (UTCTime, NominalDiffTime)
feedTx (lastPreDelay, lastDelay) (tx, ix) = do
liftIO . STM.atomically $ STM.writeTBQueue sTxSendQueue (Just tx)
traceWith sTrace $ TraceBenchTxSubServFed [getTxId $ getTxBody tx] ix
liftIO . STM.atomically $ STM.writeTBQueue txSendQueue (Just tx)
traceWith tracer $ TraceBenchTxSubServFed [getTxId $ getTxBody tx] ix
now <- liftIO Clock.getCurrentTime
let targetDelay = realToFrac $ 1.0 / rate
loopCost = (now `Clock.diffUTCTime` lastPreDelay) - lastDelay
Expand All @@ -221,17 +207,17 @@ tpsLimitedTxFeeder submission txs = do
consumeTxs
:: forall m blk era
. (MonadIO m)
=> Submission m era -> TokBlockingStyle blk -> Req -> m (Bool, UnReqd (Tx era))
consumeTxs Submission{sTxSendQueue} blk req
=> TxSendQueue era -> TokBlockingStyle blk -> Req -> m (Bool, UnReqd (Tx era))
consumeTxs txSendQueue blk req
= liftIO . STM.atomically $ go blk req []
where
go :: TokBlockingStyle a -> Req -> [Tx era] -> STM (Bool, UnReqd (Tx era))
go _ 0 acc = pure (False, UnReqd acc)
go TokBlocking n acc = STM.readTBQueue sTxSendQueue >>=
go TokBlocking n acc = STM.readTBQueue txSendQueue >>=
\case
Nothing -> pure (True, UnReqd acc)
Just tx -> go TokBlocking (n - 1) (tx:acc)
go TokNonBlocking _ _ = STM.tryReadTBQueue sTxSendQueue >>=
go TokNonBlocking _ _ = STM.tryReadTBQueue txSendQueue >>=
\case
Nothing -> pure (False, UnReqd [])
Just Nothing -> pure (True, UnReqd [])
Expand All @@ -248,11 +234,11 @@ txSubmissionClient
)
=> Tracer m NodeToNodeSubmissionTrace
-> Tracer m (TraceBenchTxSubmit txid)
-> Submission m era
-> Natural
-> TxSendQueue era
-> ReportRef
-- This return type is forced by Ouroboros.Network.NodeToNode.connectTo
-> TxSubmissionClient gentxid gentx m ()
txSubmissionClient tr bmtr sub threadIx =
txSubmissionClient tr bmtr txSendQueue reportRef =
TxSubmissionClient $
pure $ client False (UnAcked []) (SubmissionThreadStats 0 0 0)
where
Expand Down Expand Up @@ -283,7 +269,7 @@ txSubmissionClient tr bmtr sub threadIx =

(exhausted, unReqd) <-
if done then pure (True, UnReqd [])
else consumeTxs sub blocking req
else consumeTxs txSendQueue blocking req

r' <- decideAnnouncement blocking ack unReqd unAcked
(ann@(ToAnnce annNow), newUnacked@(UnAcked outs), Acked acked)
Expand Down Expand Up @@ -345,7 +331,7 @@ txSubmissionClient tr bmtr sub threadIx =
submitReport strStats = do
strEndOfProtocol <- liftIO Clock.getCurrentTime
let report = SubmissionThreadReport{..}
submitThreadReport (sReportsRefs sub) threadIx (Right report)
submitThreadReport reportRef (Right report)
pure report

txToIdSize :: tx -> (gentxid, TxSizeInBytes)
Expand Down

0 comments on commit 80da745

Please sign in to comment.