Skip to content

Commit

Permalink
Merge pull request #81 from input-output-hk/abailly-iohk/make-benchma…
Browse files Browse the repository at this point in the history
…rk-concurrent

Make benchmark concurrent
  • Loading branch information
KtorZ committed Sep 10, 2021
2 parents e2d1995 + ce5c4f1 commit a648f07
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 69 deletions.
135 changes: 97 additions & 38 deletions local-cluster/bench/Bench/EndToEnd.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE TypeApplications #-}

module Bench.EndToEnd where
Expand All @@ -13,11 +14,11 @@ import Cardano.Crypto.DSIGN (
VerKeyDSIGN,
)
import Control.Lens (to, (^?))
import Control.Monad.Class.MonadAsync (mapConcurrently)
import Control.Monad.Class.MonadSTM (
MonadSTM (readTVarIO),
check,
modifyTVar,
modifyTVar',
newTBQueueIO,
newTVarIO,
tryReadTBQueue,
Expand All @@ -31,10 +32,11 @@ import Data.Set ((\\))
import qualified Data.Set as Set
import Data.Time (nominalDiffTimeToSeconds)
import Hydra.Ledger (Tx, TxId, Utxo, txId)
import Hydra.Ledger.Cardano (CardanoTx)
import Hydra.Ledger.Cardano (CardanoTx, genSequenceOfValidTransactions, genUtxo)
import Hydra.Logging (showLogsOnFailure)
import HydraNode (
HydraClient,
hydraNodeId,
input,
output,
send,
Expand All @@ -45,6 +47,7 @@ import HydraNode (
withMockChain,
)
import System.FilePath ((</>))
import Test.QuickCheck (generate, scale)

aliceSk, bobSk, carolSk :: SignKeyDSIGN MockDSIGN
aliceSk = 10
Expand All @@ -56,18 +59,30 @@ aliceVk = deriveVerKeyDSIGN aliceSk
bobVk = deriveVerKeyDSIGN bobSk
carolVk = deriveVerKeyDSIGN carolSk

data Dataset = Dataset
{ initialUtxo :: Utxo CardanoTx
, transactionsSequence :: [CardanoTx]
}
deriving (Eq, Show, Generic, ToJSON, FromJSON)

generateDataset :: Int -> IO Dataset
generateDataset scalingFactor = do
initialUtxo <- generate genUtxo
transactionsSequence <- generate $ scale (* scalingFactor) $ genSequenceOfValidTransactions initialUtxo
pure Dataset{initialUtxo, transactionsSequence}

data Event = Event
{ submittedAt :: UTCTime
, validAt :: Maybe UTCTime
, confirmedAt :: Maybe UTCTime
}
deriving (Generic, Eq, Show, ToJSON)

bench :: FilePath -> Utxo CardanoTx -> [CardanoTx] -> Spec
bench workDir initialUtxo txs =
bench :: DiffTime -> FilePath -> [Dataset] -> Spec
bench timeoutSeconds workDir dataset =
specify ("Load test on three local nodes (" <> workDir <> ")") $ do
registry <- newRegistry txs
showLogsOnFailure $ \tracer ->
failAfter 600 $ do
failAfter timeoutSeconds $ do
withMockChain $ \chainPorts ->
withHydraNode tracer workDir chainPorts 1 aliceSk [bobVk, carolVk] $ \n1 ->
withHydraNode tracer workDir chainPorts 2 bobSk [aliceVk, carolVk] $ \n2 ->
Expand All @@ -78,36 +93,62 @@ bench workDir initialUtxo txs =
waitFor tracer 3 [n1, n2, n3] $
output "ReadyToCommit" ["parties" .= [int 10, 20, 30]]

send n1 $ input "Commit" ["utxo" .= initialUtxo]
send n2 $ input "Commit" ["utxo" .= noUtxos]
send n3 $ input "Commit" ["utxo" .= noUtxos]
expectedUtxo <- commit [n1, n2, n3] dataset

waitFor tracer 3 [n1, n2, n3] $ output "HeadIsOpen" ["utxo" .= initialUtxo]
waitFor tracer 3 [n1, n2, n3] $ output "HeadIsOpen" ["utxo" .= expectedUtxo]

submitTxs n1 registry
`concurrently_` waitForAllConfirmations n1 registry (Set.fromList . map txId $ txs)
processedTransactions <- processTransactions [n1, n2, n3] dataset

putTextLn "Closing the Head..."
send n1 $ input "Close" []
waitMatch (contestationPeriod + 3) n1 $ \v ->
guard (v ^? key "tag" == Just "HeadIsFinalized")

res <- mapMaybe analyze . Map.toList <$> readTVarIO (processedTxs registry)
writeResultsCsv (workDir </> "results.csv") res
-- TODO: Create a proper summary
let confTimes = map snd res
below1Sec = filter (< 1) confTimes
avgConfirmation = double (nominalDiffTimeToSeconds $ sum confTimes) / double (length confTimes)
percentBelow1Sec = double (length below1Sec) / double (length confTimes) * 100
putTextLn $ "Confirmed txs: " <> show (length confTimes)
putTextLn $ "Average confirmation time: " <> show avgConfirmation
putTextLn $ "Confirmed below 1 sec: " <> show percentBelow1Sec <> "%"
percentBelow1Sec `shouldSatisfy` (> 90)
let res = mapMaybe analyze . Map.toList $ processedTransactions
writeResultsCsv (workDir </> "results.csv") res
-- TODO: Create a proper summary
let confTimes = map (\(_, _, a) -> a) res
below1Sec = filter (< 1) confTimes
avgConfirmation = double (nominalDiffTimeToSeconds $ sum confTimes) / double (length confTimes)
percentBelow1Sec = double (length below1Sec) / double (length confTimes) * 100
putTextLn $ "Confirmed txs: " <> show (length confTimes)
putTextLn $ "Average confirmation time: " <> show avgConfirmation
putTextLn $ "Confirmed below 1 sec: " <> show percentBelow1Sec <> "%"
percentBelow1Sec `shouldSatisfy` (> 90)

processTransactions :: [HydraClient] -> [Dataset] -> IO (Map.Map (TxId CardanoTx) Event)
processTransactions clients dataset = do
let processors = zip dataset (cycle clients)
mconcat <$> mapConcurrently clientProcessTransactionsSequence processors
where
clientProcessTransactionsSequence (Dataset{transactionsSequence}, client) = do
submissionQ <- newTBQueueIO (fromIntegral $ length transactionsSequence)
registry <- newRegistry
atomically $ forM_ transactionsSequence $ writeTBQueue submissionQ
submitTxs client registry submissionQ
`concurrently_` waitForAllConfirmations client registry submissionQ (Set.fromList $ map txId transactionsSequence)
readTVarIO (processedTxs registry)

--
-- Helpers
--

commit :: [HydraClient] -> [Dataset] -> IO (Utxo CardanoTx)
commit clients dataset = do
let initialMap = Map.fromList $ map (\node -> (hydraNodeId node, (node, noUtxos))) clients
distributeUtxo = zip (initialUtxo <$> dataset) (cycle $ hydraNodeId <$> clients)
clientsToUtxo = foldr assignUtxo initialMap distributeUtxo

forM_ (Map.elems clientsToUtxo) $ \(n, utxo) ->
send n $ input "Commit" ["utxo" .= utxo]

pure $ mconcat $ snd <$> Map.elems clientsToUtxo

assignUtxo :: (Utxo CardanoTx, Int) -> Map.Map Int (HydraClient, Utxo CardanoTx) -> Map.Map Int (HydraClient, Utxo CardanoTx)
assignUtxo (utxo, clientId) = Map.adjust appendUtxo clientId
where
appendUtxo (client, utxo') = (client, utxo <> utxo')

noUtxos :: Utxo CardanoTx
noUtxos = mempty

Expand All @@ -134,42 +175,41 @@ newTx registry client tx = do
Map.insert (txId tx) $
Event
{ submittedAt = now
, validAt = Nothing
, confirmedAt = Nothing
}
send client $ input "NewTx" ["transaction" .= tx]
putTextLn $ "Submitted tx " <> show (txId tx)

data WaitResult
= TxInvalid {transaction :: CardanoTx, reason :: Text}
| TxValid {transaction :: CardanoTx}
| SnapshotConfirmed {transactions :: [Value], snapshotNumber :: Scientific}

data Registry tx = Registry
{ processedTxs :: TVar IO (Map.Map (TxId tx) Event)
, submissionQ :: TBQueue IO CardanoTx
, latestSnapshot :: TVar IO Scientific
}

newRegistry ::
[CardanoTx] ->
IO (Registry CardanoTx)
newRegistry txs = do
newRegistry = do
processedTxs <- newTVarIO mempty
submissionQ <- newTBQueueIO (fromIntegral $ length txs)
atomically $ mapM_ (writeTBQueue submissionQ) txs
latestSnapshot <- newTVarIO 0
pure $ Registry{processedTxs, submissionQ, latestSnapshot}
pure $ Registry{processedTxs, latestSnapshot}

submitTxs ::
HydraClient ->
Registry CardanoTx ->
TBQueue IO CardanoTx ->
IO ()
submitTxs client registry@Registry{processedTxs, submissionQ} = do
submitTxs client registry@Registry{processedTxs} submissionQ = do
txToSubmit <- atomically $ tryReadTBQueue submissionQ
case txToSubmit of
Just tx -> do
newTx processedTxs client tx
waitTxIsConfirmed (txId tx)
submitTxs client registry
submitTxs client registry submissionQ
Nothing -> pure ()
where
waitTxIsConfirmed txid =
Expand All @@ -180,34 +220,43 @@ submitTxs client registry@Registry{processedTxs, submissionQ} = do
waitForAllConfirmations ::
HydraClient ->
Registry CardanoTx ->
TBQueue IO CardanoTx ->
Set (TxId CardanoTx) ->
IO ()
waitForAllConfirmations n1 Registry{processedTxs, submissionQ, latestSnapshot} allIds = do
waitForAllConfirmations n1 Registry{processedTxs} submissionQ allIds = do
go allIds
where
go remainingIds
| Set.null remainingIds = do
putStrLn "All transactions confirmed. Sweet!"
| otherwise = do
waitForSnapshotConfirmation >>= \case
TxValid{transaction} -> do
validTx processedTxs (txId transaction)
go remainingIds
TxInvalid{transaction} -> do
putTextLn $ "TxInvalid: " <> show (txId transaction) <> ", resubmitting"
atomically $ writeTBQueue submissionQ transaction
go remainingIds
SnapshotConfirmed{transactions, snapshotNumber} -> do
-- TODO(SN): use a tracer for this
atomically $ modifyTVar' latestSnapshot (max snapshotNumber)
confirmedIds <- mapM (confirmTx processedTxs) transactions
putTextLn $ "Snapshot confirmed: " <> show snapshotNumber
putTextLn $ "Transaction(s) confirmed: " <> fmtIds confirmedIds
go $ remainingIds \\ Set.fromList confirmedIds

waitForSnapshotConfirmation = waitMatch 20 n1 $ \v ->
maybeTxInvalid v <|> maybeSnapshotConfirmed v
maybeTxValid v <|> maybeTxInvalid v <|> maybeSnapshotConfirmed v

fmtIds =
toText . intercalate "" . fmap (("\n - " <>) . show)

maybeTxValid v = do
guard (v ^? key "tag" == Just "TxValid")
v ^? key "transaction" . to fromJSON >>= \case
Error _ -> Nothing
Success tx -> pure $ TxValid tx

maybeTxInvalid v = do
guard (v ^? key "tag" == Just "TxInvalid")
v ^? key "transaction" . to fromJSON >>= \case
Expand Down Expand Up @@ -236,16 +285,26 @@ confirmTx registry tx = do
pure identifier
_ -> error $ "incorrect Txid" <> show tx

analyze :: (TxId CardanoTx, Event) -> Maybe (UTCTime, NominalDiffTime)
validTx ::
TVar IO (Map.Map (TxId CardanoTx) Event) ->
TxId CardanoTx ->
IO ()
validTx registry txid = do
now <- getCurrentTime
atomically $
modifyTVar registry $
Map.adjust (\e -> e{validAt = Just now}) txid

analyze :: (TxId CardanoTx, Event) -> Maybe (UTCTime, NominalDiffTime, NominalDiffTime)
analyze = \case
(_, Event{submittedAt, confirmedAt = Just conf}) -> Just (submittedAt, conf `diffUTCTime` submittedAt)
(_, Event{submittedAt, validAt = Just valid, confirmedAt = Just conf}) -> Just (submittedAt, valid `diffUTCTime` submittedAt, conf `diffUTCTime` submittedAt)
_ -> Nothing

writeResultsCsv :: FilePath -> [(UTCTime, NominalDiffTime)] -> IO ()
writeResultsCsv :: FilePath -> [(UTCTime, NominalDiffTime, NominalDiffTime)] -> IO ()
writeResultsCsv fp res = do
putStrLn $ "Writing results to: " <> fp
writeFileLBS fp $ headers <> "\n" <> foldMap toCsv res
where
headers = "txId,confirmationTime"

toCsv (a, b) = show a <> "," <> encode b <> "\n"
toCsv (a, b, c) = show a <> "," <> encode b <> "," <> encode c <> "\n"
68 changes: 39 additions & 29 deletions local-cluster/bench/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ module Main where
import Hydra.Prelude
import Test.Hydra.Prelude

import Bench.EndToEnd (bench)
import Bench.EndToEnd (bench, generateDataset)
import Data.Aeson (eitherDecodeFileStrict', encodeFile)
import Hydra.Ledger.Cardano (genSequenceOfValidTransactions, genUtxo)
import Options.Applicative (
Parser,
ParserInfo,
Expand All @@ -26,11 +25,12 @@ import Options.Applicative (
import System.Directory (createDirectory, doesDirectoryExist)
import System.Environment (withArgs)
import System.FilePath ((</>))
import Test.QuickCheck (generate, scale)

data Options = Options
{ outputDirectory :: Maybe FilePath
, scalingFactor :: Int
, concurrency :: Int
, timeoutSeconds :: DiffTime
}

benchOptionsParser :: Parser Options
Expand All @@ -55,6 +55,24 @@ benchOptionsParser =
<> metavar "INT"
<> help "The scaling factor to apply to transactions generator (default: 100)"
)
<*> option
auto
( long "concurrency"
<> value 1
<> metavar "INT"
<> help
"The concurrency level in the generated dataset. This number is used to \
\ define how many independent UTXO set and transaction sequences will be \
\ generated and concurrently submitted to the nodes (default: 1)"
)
<*> option
auto
( long "timeout"
<> value 600.0
<> metavar "SECONDS"
<> help
"The timeout for the run, in seconds (default: '600s')"
)

benchOptions :: ParserInfo Options
benchOptions =
Expand All @@ -72,37 +90,29 @@ benchOptions =
main :: IO ()
main =
execParser benchOptions >>= \case
Options{outputDirectory = Just benchDir, scalingFactor} -> do
Options{outputDirectory = Just benchDir, scalingFactor, concurrency, timeoutSeconds} -> do
existsDir <- doesDirectoryExist benchDir
if existsDir
then replay benchDir
else createDirectory benchDir >> play scalingFactor benchDir
Options{scalingFactor} ->
createSystemTempDirectory "bench" >>= play scalingFactor
then replay timeoutSeconds benchDir
else createDirectory benchDir >> play scalingFactor concurrency timeoutSeconds benchDir
Options{scalingFactor, concurrency, timeoutSeconds} ->
createSystemTempDirectory "bench" >>= play scalingFactor concurrency timeoutSeconds
where
replay benchDir = do
txs <- either die pure =<< eitherDecodeFileStrict' (benchDir </> "txs.json")
utxo <- either die pure =<< eitherDecodeFileStrict' (benchDir </> "utxo.json")
replay timeoutSeconds benchDir = do
datasets <- either die pure =<< eitherDecodeFileStrict' (benchDir </> "dataset.json")
putStrLn $ "Using UTxO and Transactions from: " <> benchDir
run benchDir utxo txs
run timeoutSeconds benchDir datasets

play scalingFactor benchDir = do
initialUtxo <- generate genUtxo
txs <- generate $ scale (* scalingFactor) $ genSequenceOfValidTransactions initialUtxo
saveTransactions benchDir txs
saveUtxos benchDir initialUtxo
run benchDir initialUtxo txs
play scalingFactor concurrency timeoutSeconds benchDir = do
dataset <- replicateM concurrency (generateDataset scalingFactor)
saveDataset benchDir dataset
run timeoutSeconds benchDir dataset

-- TODO(SN): Ideally we would like to say "to re-run use ... " on errors
run fp utxo txs =
withArgs [] . hspec $ bench fp utxo txs

saveTransactions tmpDir txs = do
let txsFile = tmpDir </> "txs.json"
putStrLn $ "Writing transactions to: " <> txsFile
encodeFile txsFile txs
run timeoutSeconds benchDir datasets =
withArgs [] . hspec $ bench timeoutSeconds benchDir datasets

saveUtxos tmpDir utxos = do
let utxosFile = tmpDir </> "utxo.json"
putStrLn $ "Writing UTxO set to: " <> utxosFile
encodeFile utxosFile utxos
saveDataset tmpDir dataset = do
let txsFile = tmpDir </> "dataset.json"
putStrLn $ "Writing dataset to: " <> txsFile
encodeFile txsFile dataset

0 comments on commit a648f07

Please sign in to comment.