Skip to content

Commit

Permalink
Provide benchmark progress report for all clients
Browse files Browse the repository at this point in the history
  • Loading branch information
abailly-iohk committed Sep 10, 2021
1 parent f789742 commit 6bbdbc1
Showing 1 changed file with 20 additions and 13 deletions.
33 changes: 20 additions & 13 deletions local-cluster/bench/Bench/EndToEnd.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import Control.Monad.Class.MonadAsync (mapConcurrently)
import Control.Monad.Class.MonadSTM (
MonadSTM (readTVarIO),
check,
lengthTBQueue,
modifyTVar,
newTBQueueIO,
newTVarIO,
Expand Down Expand Up @@ -49,6 +50,7 @@ import HydraNode (
)
import System.FilePath ((</>))
import Test.QuickCheck (generate, scale)
import Text.Printf (printf)

aliceSk, bobSk, carolSk :: SignKeyDSIGN MockDSIGN
aliceSk = 10
Expand Down Expand Up @@ -118,18 +120,31 @@ bench timeoutSeconds workDir dataset clusterSize =

processTransactions :: [HydraClient] -> [Dataset] -> IO (Map.Map (TxId CardanoTx) Event)
processTransactions clients dataset = do
let processors = zip dataset (cycle clients)
mconcat <$> mapConcurrently clientProcessTransactionsSequence processors
let processors = zip (zip dataset (cycle clients)) [1 ..]
mconcat <$> mapConcurrently (uncurry clientProcessTransactionsSequence) processors
where
clientProcessTransactionsSequence (Dataset{transactionsSequence}, client) = do
submissionQ <- newTBQueueIO (fromIntegral $ length transactionsSequence)
clientProcessTransactionsSequence (Dataset{transactionsSequence}, client) clientId = do
let numberOfTxs = length transactionsSequence
submissionQ <- newTBQueueIO (fromIntegral numberOfTxs)
registry <- newRegistry
withNewClient client $ \client' -> do
atomically $ forM_ transactionsSequence $ writeTBQueue submissionQ
submitTxs client' registry submissionQ
`concurrently_` waitForAllConfirmations client' registry submissionQ (Set.fromList $ map txId transactionsSequence)
`concurrently_` progressReport (hydraNodeId client') clientId numberOfTxs submissionQ
readTVarIO (processedTxs registry)

progressReport :: Int -> Int -> Int -> TBQueue IO CardanoTx -> IO ()
progressReport nodeId clientId queueSize queue = do
len <- atomically (lengthTBQueue queue)
if len == (0 :: Natural)
then pure ()
else do
let progress :: Double = (1 - fromIntegral len / fromIntegral queueSize) * 100.0
putStrLn $ printf "Client %d (node %d): %d/%d (%.02f%%)" clientId nodeId (queueSize - fromIntegral len) queueSize progress
threadDelay 5
progressReport nodeId clientId queueSize queue

--
-- Helpers
--
Expand Down Expand Up @@ -180,7 +195,6 @@ newTx registry client tx = do
, confirmedAt = Nothing
}
send client $ input "NewTx" ["transaction" .= tx]
putTextLn $ "Submitted tx " <> show (txId tx)

data WaitResult
= TxInvalid {transaction :: CardanoTx, reason :: Text}
Expand Down Expand Up @@ -236,22 +250,15 @@ waitForAllConfirmations n1 Registry{processedTxs} submissionQ allIds = do
validTx processedTxs (txId transaction)
go remainingIds
TxInvalid{transaction} -> do
putTextLn $ "TxInvalid: " <> show (txId transaction) <> ", resubmitting"
atomically $ writeTBQueue submissionQ transaction
go remainingIds
SnapshotConfirmed{transactions, snapshotNumber} -> do
-- TODO(SN): use a tracer for this
SnapshotConfirmed{transactions} -> do
confirmedIds <- mapM (confirmTx processedTxs) transactions
putTextLn $ "Snapshot confirmed: " <> show snapshotNumber
putTextLn $ "Transaction(s) confirmed: " <> fmtIds confirmedIds
go $ remainingIds \\ Set.fromList confirmedIds

waitForSnapshotConfirmation = waitMatch 20 n1 $ \v ->
maybeTxValid v <|> maybeTxInvalid v <|> maybeSnapshotConfirmed v

fmtIds =
toText . intercalate "" . fmap (("\n - " <>) . show)

maybeTxValid v = do
guard (v ^? key "tag" == Just "TxValid")
v ^? key "transaction" . to fromJSON >>= \case
Expand Down

0 comments on commit 6bbdbc1

Please sign in to comment.