Benchmark can run an arbitrary number of nodes
abailly-iohk committed Sep 10, 2021
1 parent 5a29446 commit ff7b2f4
Showing 4 changed files with 97 additions and 67 deletions.
67 changes: 33 additions & 34 deletions local-cluster/bench/Bench/EndToEnd.hs
@@ -43,7 +43,7 @@ import HydraNode (
waitFor,
waitForNodesConnected,
waitMatch,
withHydraNode,
withHydraCluster,
withMockChain,
withNewClient,
)
@@ -79,43 +79,42 @@ data Event = Event
}
deriving (Generic, Eq, Show, ToJSON)

bench :: DiffTime -> FilePath -> [Dataset] -> Spec
bench timeoutSeconds workDir dataset =
bench :: DiffTime -> FilePath -> [Dataset] -> Word64 -> Spec
bench timeoutSeconds workDir dataset clusterSize =
specify ("Load test on three local nodes (" <> workDir <> ")") $ do
showLogsOnFailure $ \tracer ->
failAfter timeoutSeconds $ do
withMockChain $ \chainPorts ->
withHydraNode tracer workDir chainPorts 1 aliceSk [bobVk, carolVk] $ \n1 ->
withHydraNode tracer workDir chainPorts 2 bobSk [aliceVk, carolVk] $ \n2 ->
withHydraNode tracer workDir chainPorts 3 carolSk [aliceVk, bobVk] $ \n3 -> do
waitForNodesConnected tracer [n1, n2, n3]
let contestationPeriod = 10 :: Natural
send n1 $ input "Init" ["contestationPeriod" .= contestationPeriod]
waitFor tracer 3 [n1, n2, n3] $
output "ReadyToCommit" ["parties" .= [int 10, 20, 30]]

expectedUtxo <- commit [n1, n2, n3] dataset

waitFor tracer 3 [n1, n2, n3] $ output "HeadIsOpen" ["utxo" .= expectedUtxo]

processedTransactions <- processTransactions [n1, n2, n3] dataset

putTextLn "Closing the Head..."
send n1 $ input "Close" []
waitMatch (contestationPeriod + 3) n1 $ \v ->
guard (v ^? key "tag" == Just "HeadIsFinalized")

let res = mapMaybe analyze . Map.toList $ processedTransactions
writeResultsCsv (workDir </> "results.csv") res
-- TODO: Create a proper summary
let confTimes = map (\(_, _, a) -> a) res
below1Sec = filter (< 1) confTimes
avgConfirmation = double (nominalDiffTimeToSeconds $ sum confTimes) / double (length confTimes)
percentBelow1Sec = double (length below1Sec) / double (length confTimes) * 100
putTextLn $ "Confirmed txs: " <> show (length confTimes)
putTextLn $ "Average confirmation time: " <> show avgConfirmation
putTextLn $ "Confirmed below 1 sec: " <> show percentBelow1Sec <> "%"
percentBelow1Sec `shouldSatisfy` (> 90)
withHydraCluster tracer workDir chainPorts clusterSize $ \(leader :| followers) -> do
let nodes = leader : followers
waitForNodesConnected tracer [1 .. fromIntegral clusterSize] nodes
let contestationPeriod = 100 :: Natural
send leader $ input "Init" ["contestationPeriod" .= contestationPeriod]
waitFor tracer 3 nodes $
output "ReadyToCommit" ["parties" .= map int [1 .. fromIntegral clusterSize]]

expectedUtxo <- commit nodes dataset

waitFor tracer 3 nodes $ output "HeadIsOpen" ["utxo" .= expectedUtxo]

processedTransactions <- processTransactions nodes dataset

putTextLn "Closing the Head..."
send leader $ input "Close" []
waitMatch (contestationPeriod + 3) leader $ \v ->
guard (v ^? key "tag" == Just "HeadIsFinalized")

let res = mapMaybe analyze . Map.toList $ processedTransactions
writeResultsCsv (workDir </> "results.csv") res
-- TODO: Create a proper summary
let confTimes = map (\(_, _, a) -> a) res
below1Sec = filter (< 1) confTimes
avgConfirmation = double (nominalDiffTimeToSeconds $ sum confTimes) / double (length confTimes)
percentBelow1Sec = double (length below1Sec) / double (length confTimes) * 100
putTextLn $ "Confirmed txs: " <> show (length confTimes)
putTextLn $ "Average confirmation time: " <> show avgConfirmation
putTextLn $ "Confirmed below 1 sec: " <> show percentBelow1Sec <> "%"
percentBelow1Sec `shouldSatisfy` (> 90)

processTransactions :: [HydraClient] -> [Dataset] -> IO (Map.Map (TxId CardanoTx) Event)
processTransactions clients dataset = do
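Note on the pattern above: withHydraCluster hands the callback a NonEmpty HydraClient, and the benchmark singles out the head of that list as the leader which drives Init and Close. A minimal, standalone sketch of the leader :| followers destructuring, using plain Ints instead of HydraClient (describeCluster is an illustrative name, not repository code):

import Data.List.NonEmpty (NonEmpty (..))

-- The head of the NonEmpty list plays the leader role; the rest follow.
describeCluster :: NonEmpty Int -> String
describeCluster (leader :| followers) =
  "leader: " <> show leader <> ", followers: " <> show followers

-- >>> describeCluster (1 :| [2, 3])
-- "leader: 1, followers: [2,3]"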
31 changes: 20 additions & 11 deletions local-cluster/bench/Main.hs
@@ -31,6 +31,7 @@ data Options = Options
, scalingFactor :: Int
, concurrency :: Int
, timeoutSeconds :: DiffTime
, clusterSize :: Word64
}

benchOptionsParser :: Parser Options
@@ -73,6 +74,14 @@ benchOptionsParser =
<> help
"The timeout for the run, in seconds (default: '600s')"
)
<*> option
auto
( long "cluster-size"
<> value 3
<> metavar "INT"
<> help
"The number of Hydra nodes to start and connect (default: 3)"
)

benchOptions :: ParserInfo Options
benchOptions =
@@ -90,27 +99,27 @@ benchOptions =
main :: IO ()
main =
execParser benchOptions >>= \case
Options{outputDirectory = Just benchDir, scalingFactor, concurrency, timeoutSeconds} -> do
Options{outputDirectory = Just benchDir, scalingFactor, concurrency, timeoutSeconds, clusterSize} -> do
existsDir <- doesDirectoryExist benchDir
if existsDir
then replay timeoutSeconds benchDir
else createDirectory benchDir >> play scalingFactor concurrency timeoutSeconds benchDir
Options{scalingFactor, concurrency, timeoutSeconds} ->
createSystemTempDirectory "bench" >>= play scalingFactor concurrency timeoutSeconds
then replay timeoutSeconds clusterSize benchDir
else createDirectory benchDir >> play scalingFactor concurrency timeoutSeconds clusterSize benchDir
Options{scalingFactor, concurrency, timeoutSeconds, clusterSize} ->
createSystemTempDirectory "bench" >>= play scalingFactor concurrency timeoutSeconds clusterSize
where
replay timeoutSeconds benchDir = do
replay timeoutSeconds clusterSize benchDir = do
datasets <- either die pure =<< eitherDecodeFileStrict' (benchDir </> "dataset.json")
putStrLn $ "Using UTxO and Transactions from: " <> benchDir
run timeoutSeconds benchDir datasets
run timeoutSeconds benchDir datasets clusterSize

play scalingFactor concurrency timeoutSeconds benchDir = do
play scalingFactor concurrency timeoutSeconds clusterSize benchDir = do
dataset <- replicateM concurrency (generateDataset scalingFactor)
saveDataset benchDir dataset
run timeoutSeconds benchDir dataset
run timeoutSeconds benchDir dataset clusterSize

-- TODO(SN): Ideally we would like to say "to re-run use ... " on errors
run timeoutSeconds benchDir datasets =
withArgs [] . hspec $ bench timeoutSeconds benchDir datasets
run timeoutSeconds benchDir datasets clusterSize =
withArgs [] . hspec $ bench timeoutSeconds benchDir datasets clusterSize

saveDataset tmpDir dataset = do
let txsFile = tmpDir </> "dataset.json"
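For orientation, here is a self-contained sketch of how the new --cluster-size flag could be declared and read back with optparse-applicative (the library the parser above already uses); the Opts type and main below are illustrative only and not the repository's code:

import Options.Applicative

newtype Opts = Opts {optClusterSize :: Word}

optsParser :: Parser Opts
optsParser =
  Opts
    <$> option
      auto
      ( long "cluster-size"
          <> value 3
          <> metavar "INT"
          <> help "The number of Hydra nodes to start and connect (default: 3)"
      )

main :: IO ()
main = do
  opts <- execParser (info (optsParser <**> helper) fullDesc)
  putStrLn $ "Would start a cluster of " <> show (optClusterSize opts) <> " nodes"

Running such an executable without the flag falls back to the default of 3 nodes, matching the behaviour added above.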
47 changes: 33 additions & 14 deletions local-cluster/src/HydraNode.hs
@@ -21,15 +21,16 @@ module HydraNode (
withTempDir,
waitNext,
withNewClient,
withHydraCluster,
) where

import Hydra.Prelude hiding (delete)

import Cardano.BM.Tracing (ToObject)
import Cardano.Crypto.DSIGN (
DSIGNAlgorithm (..),
SignKeyDSIGN,
VerKeyDSIGN,
SignKeyDSIGN (SignKeyMockDSIGN),
VerKeyDSIGN (VerKeyMockDSIGN),
)
import Control.Concurrent.Async (
forConcurrently_,
@@ -179,6 +180,27 @@ data EndToEndLog
| EndWaiting Int
deriving (Eq, Show, Generic, ToJSON, FromJSON, ToObject)

withHydraCluster ::
Tracer IO EndToEndLog ->
FilePath ->
(Int, Int, Int) ->
Word64 ->
(NonEmpty HydraClient -> IO ()) ->
IO ()
withHydraCluster tracer workDir mockChainPorts clusterSize action =
case clusterSize of
0 -> error "Cannot run a cluster with 0 number of nodes"
n -> go n [] [1 .. n]
where
go n clients = \case
[] -> action (fromList clients)
(nodeId : rest) ->
let vKeys = map VerKeyMockDSIGN $ filter (/= nodeId) allNodeIds
key = SignKeyMockDSIGN nodeId
in withHydraNode tracer workDir mockChainPorts (fromIntegral nodeId) key vKeys (map fromIntegral allNodeIds) (\c -> go n (c : clients) rest)
where
allNodeIds = [1 .. n]

withHydraNode ::
forall alg.
DSIGNAlgorithm alg =>
@@ -188,9 +210,10 @@ withHydraNode ::
Int ->
SignKeyDSIGN alg ->
[VerKeyDSIGN alg] ->
[Int] ->
(HydraClient -> IO ()) ->
IO ()
withHydraNode tracer workDir mockChainPorts hydraNodeId sKey vKeys action = do
withHydraNode tracer workDir mockChainPorts hydraNodeId sKey vKeys allNodeIds action = do
let logFile = workDir </> show hydraNodeId
withFile' logFile $ \out -> do
withSystemTempDirectory "hydra-node" $ \dir -> do
@@ -201,7 +224,7 @@ withHydraNode tracer workDir mockChainPorts hydraNodeId sKey vKeys action = do
filepath <$ BS.writeFile filepath (rawSerialiseVerKeyDSIGN vKey)

let p =
(hydraNodeProcess $ defaultArguments hydraNodeId sKeyPath vKeysPaths mockChainPorts)
(hydraNodeProcess $ defaultArguments hydraNodeId sKeyPath vKeysPaths mockChainPorts allNodeIds)
{ std_out = UseHandle out
}
withCreateProcess p $
@@ -257,8 +280,9 @@ defaultArguments ::
FilePath ->
[FilePath] ->
(Int, Int, Int) ->
[Int] ->
[String]
defaultArguments nodeId sKey vKeys ports =
defaultArguments nodeId sKey vKeys ports allNodeIds =
[ "--quiet"
, "--node-id"
, show nodeId
@@ -302,16 +326,11 @@ checkProcessHasNotDied name processHandle =
ExitSuccess -> pure ()
ExitFailure exit -> failure $ "Process " <> show name <> " exited with failure code: " <> show exit

-- HACK(SN): These functions here are hard-coded for three nodes, but the tests
-- are somewhat parameterized -> make it all or nothing hard-coded
allNodeIds :: [Int]
allNodeIds = [1 .. 3]

waitForNodesConnected :: HasCallStack => Tracer IO EndToEndLog -> [HydraClient] -> IO ()
waitForNodesConnected tracer = mapM_ (waitForNodeConnected tracer)
waitForNodesConnected :: HasCallStack => Tracer IO EndToEndLog -> [Int] -> [HydraClient] -> IO ()
waitForNodesConnected tracer allNodeIds = mapM_ (waitForNodeConnected tracer allNodeIds)

waitForNodeConnected :: HasCallStack => Tracer IO EndToEndLog -> HydraClient -> IO ()
waitForNodeConnected tracer n@HydraClient{hydraNodeId} =
waitForNodeConnected :: HasCallStack => Tracer IO EndToEndLog -> [Int] -> HydraClient -> IO ()
waitForNodeConnected tracer allNodeIds n@HydraClient{hydraNodeId} =
-- HACK(AB): This is gross, we hijack the node ids and because we know
-- keys are just integers we can compute them but that's ugly -> use property
-- party identifiers everywhere
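The withHydraCluster definition added above works by nesting one withHydraNode continuation inside the next, so every started node stays alive until the final action runs over the whole cluster. A generic sketch of that recursion pattern under assumed names (withMany and withOne are not from the repository):

-- Turn a bracketed "with one resource" action into "with many",
-- keeping every acquired resource alive for the final action.
withMany :: (a -> (b -> IO r) -> IO r) -> [a] -> ([b] -> IO r) -> IO r
withMany withOne = go []
  where
    go acc [] action = action (reverse acc)
    go acc (x : xs) action = withOne x (\b -> go (b : acc) xs action)

-- Example use: withMany (\path -> withFile path ReadMode) paths $ \handles -> ...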
19 changes: 11 additions & 8 deletions local-cluster/test/Test/EndToEndSpec.hs
@@ -93,6 +93,9 @@ import HydraNode (
import Text.Regex.TDFA ((=~))
import Text.Regex.TDFA.Text ()

allNodeIds :: [Int]
allNodeIds = [1 .. 3]

spec :: Spec
spec = around showLogsOnFailure $
describe "End-to-end test using a mocked chain though" $ do
@@ -101,10 +104,10 @@ spec = around showLogsOnFailure $
failAfter 30 $
withTempDir "end-to-end-inits-and-closes" $ \tmpDir ->
withMockChain $ \chainPorts ->
withHydraNode tracer tmpDir chainPorts 1 aliceSk [bobVk, carolVk] $ \n1 ->
withHydraNode tracer tmpDir chainPorts 2 bobSk [aliceVk, carolVk] $ \n2 ->
withHydraNode tracer tmpDir chainPorts 3 carolSk [aliceVk, bobVk] $ \n3 -> do
waitForNodesConnected tracer [n1, n2, n3]
withHydraNode tracer tmpDir chainPorts 1 aliceSk [bobVk, carolVk] allNodeIds $ \n1 ->
withHydraNode tracer tmpDir chainPorts 2 bobSk [aliceVk, carolVk] allNodeIds $ \n2 ->
withHydraNode tracer tmpDir chainPorts 3 carolSk [aliceVk, bobVk] allNodeIds $ \n3 -> do
waitForNodesConnected tracer allNodeIds [n1, n2, n3]
let contestationPeriod = 10 :: Natural
send n1 $ input "Init" ["contestationPeriod" .= contestationPeriod]
waitFor tracer 3 [n1, n2, n3] $
@@ -173,10 +176,10 @@ spec = around showLogsOnFailure $
withTempDir "end-to-end-prometheus-metrics" $ \tmpDir ->
failAfter 20 $
withMockChain $ \mockPorts ->
withHydraNode tracer tmpDir mockPorts 1 aliceSk [bobVk, carolVk] $ \n1 ->
withHydraNode tracer tmpDir mockPorts 2 bobSk [aliceVk, carolVk] $ \_n2 ->
withHydraNode tracer tmpDir mockPorts 3 carolSk [aliceVk, bobVk] $ \_n3 -> do
waitForNodesConnected tracer [n1]
withHydraNode tracer tmpDir mockPorts 1 aliceSk [bobVk, carolVk] allNodeIds $ \n1 ->
withHydraNode tracer tmpDir mockPorts 2 bobSk [aliceVk, carolVk] allNodeIds $ \_n2 ->
withHydraNode tracer tmpDir mockPorts 3 carolSk [aliceVk, bobVk] allNodeIds $ \_n3 -> do
waitForNodesConnected tracer allNodeIds [n1]
send n1 $ input "Init" ["contestationPeriod" .= int 10]
waitFor tracer 3 [n1] $ output "ReadyToCommit" ["parties" .= [int 10, 20, 30]]
metrics <- getMetrics n1
