From 064c7d7b67dbca2bbff4674281a07d85953ccbaf Mon Sep 17 00:00:00 2001 From: Brian W Bush Date: Thu, 4 Jan 2024 15:01:17 -0700 Subject: [PATCH] PLT-9014 Benchmark for `BulkSync` protocol. --- .../app/Language/Marlowe/Runtime/Benchmark.hs | 56 +++++++-- .../Marlowe/Runtime/Benchmark/BulkSync.hs | 116 ++++++++++++++++++ .../Marlowe/Runtime/Benchmark/HeaderSync.hs | 17 ++- .../Marlowe/Runtime/Benchmark/Query.hs | 36 ++++-- .../Marlowe/Runtime/Benchmark/Sync.hs | 25 +++- marlowe-benchmark/app/Main.hs | 33 +++-- .../20240104_144839_brian.bush_PLT_9014.rst | 4 + marlowe-benchmark/marlowe-benchmark.cabal | 1 + 8 files changed, 252 insertions(+), 36 deletions(-) create mode 100644 marlowe-benchmark/app/Language/Marlowe/Runtime/Benchmark/BulkSync.hs create mode 100644 marlowe-benchmark/changelog.d/20240104_144839_brian.bush_PLT_9014.rst diff --git a/marlowe-benchmark/app/Language/Marlowe/Runtime/Benchmark.hs b/marlowe-benchmark/app/Language/Marlowe/Runtime/Benchmark.hs index 017cf3ba66..e24bb2af2c 100644 --- a/marlowe-benchmark/app/Language/Marlowe/Runtime/Benchmark.hs +++ b/marlowe-benchmark/app/Language/Marlowe/Runtime/Benchmark.hs @@ -1,27 +1,39 @@ {-# LANGUAGE RecordWildCards #-} +-- | Benchmark for protocols. module Language.Marlowe.Runtime.Benchmark ( + -- * Benchmarking BenchmarkConfig (..), measure, ) where -import Control.Monad (forM_) +import Control.Monad (forM_, when) import Control.Monad.IO.Class (liftIO) import Control.Monad.Trans.Marlowe (MarloweT) import Data.Aeson (FromJSON, ToJSON) +import Data.Default (Default (..)) +import Data.Word (Word8) import GHC.Generics (Generic) import qualified Data.Aeson as A (encode) import qualified Data.ByteString.Lazy.Char8 as LBS8 (putStrLn) +import qualified Language.Marlowe.Runtime.Benchmark.BulkSync as Bulk (measure) import qualified Language.Marlowe.Runtime.Benchmark.HeaderSync as HeaderSync (measure) import qualified Language.Marlowe.Runtime.Benchmark.Query as Query (measure) import qualified Language.Marlowe.Runtime.Benchmark.Sync as Sync (measure) +-- | Benchmark configuration. data BenchmarkConfig = BenchmarkConfig { headerSyncParallelism :: Int -- ^ Number of parallel clients for `HeaderSync` protocol. - , maxContracts :: Int + , headerMaxContracts :: Int -- ^ Maximum number of contracts to be read by each `HeaderSync` client. + , bulkParallelism :: Int + -- ^ Number of parallel clients for the `BulkSync` protocol. + , bulkPageSize :: Word8 + -- ^ Number of blocks to fetch at a time for the `BulkSync` clients. + , bulkMaxBlocks :: Int + -- ^ Maximum number of blocks to fetch for each `BulkSync` client. , syncParallelism :: Int -- ^ Number of parallel clients for `Sync` protocol. , syncBatchSize :: Int @@ -35,16 +47,42 @@ data BenchmarkConfig = BenchmarkConfig } deriving (Eq, FromJSON, Generic, Ord, Show, ToJSON) +instance Default BenchmarkConfig where + def = + BenchmarkConfig + { headerSyncParallelism = 4 + , headerMaxContracts = maxBound + , bulkParallelism = 4 + , bulkPageSize = 128 + , bulkMaxBlocks = maxBound + , syncParallelism = 4 + , syncBatchSize = 512 + , queryParallelism = 4 + , queryBatchSize = 16 + , queryPageSize = 256 + } + +-- | Run the benchmarks. measure :: BenchmarkConfig -> MarloweT IO () measure BenchmarkConfig{..} = do - (headerSyncResults, contractIds) <- HeaderSync.measure headerSyncParallelism maxContracts + when (headerSyncParallelism == 0) $ + error "At least one `HeaderSync` client must be run." + (headerSyncResults, contractIds) <- HeaderSync.measure headerSyncParallelism headerMaxContracts liftIO . forM_ headerSyncResults $ LBS8.putStrLn . A.encode - syncResults <- Sync.measure syncParallelism syncBatchSize contractIds - liftIO . forM_ syncResults $ LBS8.putStrLn . A.encode - queryResults <- - Query.measure queryParallelism queryBatchSize queryPageSize "No policy ID" $ - replicate (queryParallelism * queryBatchSize) mempty - liftIO . forM_ queryResults $ LBS8.putStrLn . A.encode + when (bulkParallelism > 0) $ + do + bulkResults <- Bulk.measure bulkParallelism bulkPageSize bulkMaxBlocks + liftIO . forM_ bulkResults $ LBS8.putStrLn . A.encode + when (syncParallelism > 0) $ + do + syncResults <- Sync.measure syncParallelism syncBatchSize contractIds + liftIO . forM_ syncResults $ LBS8.putStrLn . A.encode + when (queryParallelism > 0) $ + do + queryResults <- + Query.measure queryParallelism queryBatchSize queryPageSize "No policy ID" $ + replicate (queryParallelism * queryBatchSize) mempty + liftIO . forM_ queryResults $ LBS8.putStrLn . A.encode diff --git a/marlowe-benchmark/app/Language/Marlowe/Runtime/Benchmark/BulkSync.hs b/marlowe-benchmark/app/Language/Marlowe/Runtime/Benchmark/BulkSync.hs new file mode 100644 index 0000000000..0473090359 --- /dev/null +++ b/marlowe-benchmark/app/Language/Marlowe/Runtime/Benchmark/BulkSync.hs @@ -0,0 +1,116 @@ +{-# LANGUAGE RecordWildCards #-} + +-- | Benchmark for the `BulkSync` protocol. +module Language.Marlowe.Runtime.Benchmark.BulkSync ( + -- * Benchmarking + Benchmark (..), + measure, +) where + +import Control.Monad.IO.Class (MonadIO, liftIO) +import Control.Monad.Trans.Marlowe (MarloweT) +import Data.Aeson (ToJSON) +import Data.Default (Default (..)) +import Data.Time.Clock (NominalDiffTime) +import Data.Time.Clock.POSIX (getPOSIXTime) +import Data.Word (Word8) +import GHC.Generics (Generic) +import Language.Marlowe.Protocol.BulkSync.Client ( + ClientStIdle (..), + ClientStNext (..), + ClientStPoll (..), + MarloweBulkSyncClient (MarloweBulkSyncClient), + ) +import Language.Marlowe.Runtime.Client (runMarloweBulkSyncClient) +import Language.Marlowe.Runtime.History.Api (MarloweBlock (..), MarloweCreateTransaction (newContracts)) +import UnliftIO (replicateConcurrently) + +data Benchmark = Benchmark + { metric :: String + , blocksPerSecond :: Double + , createsPerSecond :: Double + , applyInputsPerSecond :: Double + , withdrawsPerSecond :: Double + , seconds :: Double + } + deriving (Eq, Generic, Ord, Show, ToJSON) + +data Statistics = Statistics + { blocks :: Int + , creates :: Int + , applyInputs :: Int + , withdraws :: Int + , duration :: NominalDiffTime + } + deriving (Eq, Generic, Ord, Show, ToJSON) + +instance Default Statistics where + def = Statistics def def def def 0 + +-- | Measure the performance of the protocol. +measure + :: Int + -- ^ Number of parallel clients for the `BulkSync` protocol. + -> Word8 + -- ^ Number of blocks to fetch at a time for the `BulkSync` clients. + -> Int + -- ^ Maximum number of blocks to fetch for each `BulkSync` client. + -> MarloweT IO [Benchmark] + -- ^ Action for running the benchmark. +measure parallelism pageSize maxBlocks = + replicateConcurrently parallelism $ + run "BulkSync" pageSize maxBlocks + +-- | Run the benchmarking client. +run + :: String + -- ^ Label for the benchmark. + -> Word8 + -- ^ Number of blocks to fetch at a time for the `BulkSync` clients. + -> Int + -- ^ Maximum number of blocks to fetch for each `BulkSync` client. + -> MarloweT IO Benchmark + -- ^ Action for running the benchmark. +run metric pageSize maxBlocks = + do + Statistics{..} <- runMarloweBulkSyncClient . benchmark pageSize maxBlocks =<< liftIO getPOSIXTime + let seconds = realToFrac duration + blocksPerSecond = realToFrac blocks / seconds + createsPerSecond = realToFrac creates / seconds + applyInputsPerSecond = realToFrac applyInputs / seconds + withdrawsPerSecond = realToFrac withdraws / seconds + pure Benchmark{..} + +-- | Run a benchmark. +benchmark + :: (MonadIO m) + => Word8 + -- ^ Number of blocks to fetch at a time for the `BulkSync` clients. + -> Int + -- ^ Maximum number of blocks to fetch for each `BulkSync` client. + -> NominalDiffTime + -- ^ When the benchmark started. + -> MarloweBulkSyncClient m Statistics + -- ^ Action for running the benchmark. +benchmark pageSize maxBlocks start = + let idle = SendMsgRequestNext pageSize . next + next stats@Statistics{..} = + ClientStNext + { recvMsgRollForward = \blocks' _tip -> + if blocks >= maxBlocks + then pure $ SendMsgDone stats + else do + now <- liftIO getPOSIXTime + pure $ + idle + stats + { blocks = blocks + length blocks' + , creates = creates + sum (length . mconcat . fmap newContracts . createTransactions <$> blocks') + , applyInputs = applyInputs + sum (length . applyInputsTransactions <$> blocks') + , withdraws = withdraws + sum (length . withdrawTransactions <$> blocks') + , duration = now - start + } + , recvMsgRollBackward = \_point _tip -> pure $ idle stats + , recvMsgWait = pure . SendMsgCancel $ SendMsgDone stats + } + in MarloweBulkSyncClient . pure $ idle def diff --git a/marlowe-benchmark/app/Language/Marlowe/Runtime/Benchmark/HeaderSync.hs b/marlowe-benchmark/app/Language/Marlowe/Runtime/Benchmark/HeaderSync.hs index 88f4648ce0..d8ab5ce5f6 100644 --- a/marlowe-benchmark/app/Language/Marlowe/Runtime/Benchmark/HeaderSync.hs +++ b/marlowe-benchmark/app/Language/Marlowe/Runtime/Benchmark/HeaderSync.hs @@ -1,6 +1,8 @@ {-# LANGUAGE RecordWildCards #-} +-- | Benchmark for the `HeaderSync` protocol. module Language.Marlowe.Runtime.Benchmark.HeaderSync ( + -- * Benchmarking Benchmark (..), measure, ) where @@ -35,7 +37,7 @@ data Benchmark = Benchmark deriving (Eq, Generic, Ord, Show, ToJSON) data Statistics = Statistics - { blocks :: Integer + { blocks :: Int , contracts :: S.Set ContractId , duration :: NominalDiffTime } @@ -44,31 +46,42 @@ data Statistics = Statistics instance Default Statistics where def = Statistics def mempty 0 +-- | Measure the performance of the protocol. measure :: Int + -- ^ Number of parallel clients for `HeaderSync` protocol. -> Int + -- ^ Maximum number of contracts to be read by each `HeaderSync` client. -> MarloweT IO ([Benchmark], S.Set ContractId) measure parallelism maxContracts = second head . unzip <$> replicateConcurrently parallelism (run "HeaderSync" maxContracts) +-- | Run a benchmark client. run :: String + -- ^ Label for the benchmark. -> Int + -- ^ Maximum number of contracts to be read by each `HeaderSync` client. -> MarloweT IO (Benchmark, S.Set ContractId) + -- ^ Action for running the benchmark. run metric maxContracts = do Statistics{..} <- runMarloweHeaderSyncClient . benchmark maxContracts =<< liftIO getPOSIXTime let seconds = realToFrac duration - blocksPerSecond = fromInteger blocks / seconds + blocksPerSecond = realToFrac blocks / seconds contractsPerSecond = fromIntegral (S.size contracts) / seconds pure (Benchmark{..}, contracts) +-- | Run the benchmark. benchmark :: (MonadIO m) => Int + -- ^ Maximum number of contracts to be read by each `HeaderSync` client. -> NominalDiffTime + -- ^ When the benchmark started. -> MarloweHeaderSyncClient m Statistics + -- ^ Action for running the benchmark. benchmark maxContracts start = let clientIdle = SendMsgRequestNext . clientNext clientWait = pure . SendMsgCancel . SendMsgDone diff --git a/marlowe-benchmark/app/Language/Marlowe/Runtime/Benchmark/Query.hs b/marlowe-benchmark/app/Language/Marlowe/Runtime/Benchmark/Query.hs index 347db12029..08a54bbd5d 100644 --- a/marlowe-benchmark/app/Language/Marlowe/Runtime/Benchmark/Query.hs +++ b/marlowe-benchmark/app/Language/Marlowe/Runtime/Benchmark/Query.hs @@ -1,6 +1,8 @@ {-# LANGUAGE RecordWildCards #-} +-- | Benchmark for the `Query` protocol. module Language.Marlowe.Runtime.Benchmark.Query ( + -- * Benchmarking Benchmark (..), measure, ) where @@ -32,9 +34,9 @@ data Benchmark = Benchmark deriving (Eq, Generic, Ord, Show, ToJSON) data Statistics = Statistics - { queries :: Integer - , pages :: Integer - , contracts :: Integer + { queries :: Int + , pages :: Int + , contracts :: Int , duration :: NominalDiffTime } deriving (Eq, Generic, Ord, Show, ToJSON) @@ -42,41 +44,59 @@ data Statistics = Statistics instance Default Statistics where def = Statistics def def def 0 +-- | Measure the performance of the protocol. measure :: Int + -- ^ Number of parallel clients for `Query` protocol. -> Int + -- ^ Number of queries to be executed by each `Query` client. -> Int + -- ^ Page size for each `Query` client. -> String + -- ^ Label for the contract filter. -> [ContractFilter] + -- ^ Contract filters to query. -> MarloweT IO [Benchmark] + -- ^ Action to run the benchmark. measure parallelism batchSize pageSize query filters = let batches = take parallelism $ chunksOf batchSize filters in forConcurrently batches $ run "Query" pageSize query +-- | Run a benchmark client. run :: String + -- ^ Label for the benchmark. -> Int + -- ^ Page size for each `Query` client. -> String + -- ^ Label for the contract filter. -> [ContractFilter] + -- ^ Contract filters to query. -> MarloweT IO Benchmark + -- ^ Action to run the benchmark. run metric pageSize query filters = do start <- liftIO getPOSIXTime Statistics{..} <- foldlM ((runMarloweQueryClient .) . benchmark start pageSize) def filters let seconds = realToFrac duration - queriesPerSecond = fromInteger queries / seconds - pagesPerSecond = fromInteger pages / seconds - contractsPerSecond = fromInteger contracts / seconds + queriesPerSecond = realToFrac queries / seconds + pagesPerSecond = realToFrac pages / seconds + contractsPerSecond = realToFrac contracts / seconds pure Benchmark{..} +-- | Run the benchmark. benchmark :: (MonadIO m) - => NominalDiffTime + => NominalDiffTime -- When the benchmark started. -> Int + -- ^ Page size for each `Query` client. -> Statistics + -- ^ The statistics so far. -> ContractFilter + -- ^ The contract filter. -> MarloweQueryClient m Statistics + -- ^ Action to run the benchmark. benchmark start pageSize initial@Statistics{queries} cFilter = let accumulate = (. Query.getContractHeaders cFilter) . (=<<) . handleNextPage handleNextPage stats Nothing = pure stats @@ -86,7 +106,7 @@ benchmark start pageSize initial@Statistics{queries} cFilter = let stats' = stats { pages = pages + 1 - , contracts = contracts + toInteger (length items) + , contracts = contracts + length items , duration = now - start } case nextRange of diff --git a/marlowe-benchmark/app/Language/Marlowe/Runtime/Benchmark/Sync.hs b/marlowe-benchmark/app/Language/Marlowe/Runtime/Benchmark/Sync.hs index 3700f4404a..b617fa450f 100644 --- a/marlowe-benchmark/app/Language/Marlowe/Runtime/Benchmark/Sync.hs +++ b/marlowe-benchmark/app/Language/Marlowe/Runtime/Benchmark/Sync.hs @@ -3,7 +3,9 @@ {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeFamilies #-} +-- | Benchmark for the `Sync` protocol. module Language.Marlowe.Runtime.Benchmark.Sync ( + -- * Benchmarking Benchmark (..), measure, ) where @@ -47,8 +49,8 @@ data Benchmark = Benchmark deriving (Eq, Generic, Ord, Show, ToJSON) data Statistics v = Statistics - { contracts :: Integer - , steps :: Integer + { contracts :: Int + , steps :: Int , duration :: NominalDiffTime , version :: MarloweVersion v } @@ -57,37 +59,50 @@ data Statistics v = Statistics instance (IsMarloweVersion v) => Default (Statistics v) where def = Statistics def def 0 marloweVersion +-- | Measure the performance of the protocol. measure :: Int + -- ^ Number of parallel clients for `Sync` protocol. -> Int + -- ^ Number of contracts to be read by each `Sync` client. -> S.Set ContractId -> MarloweT IO [Benchmark] + -- ^ Action for running the benchmark. measure parallelism batchSize contractIds = let batches = take parallelism . chunksOf batchSize $ toList contractIds in forConcurrently batches $ run "Sync" +-- | Run a benchmark client. run :: String + -- ^ Label for the benchmark. -> [ContractId] + -- ^ Contracts to be synced. -> MarloweT IO Benchmark + -- ^ Action for running the benchmark. run metric contractIds = do start <- liftIO getPOSIXTime Statistics{..} <- foldlM ((runMarloweSyncClient .) . benchmark start) (def :: Statistics 'V1) contractIds let seconds = realToFrac duration - contractsPerSecond = fromInteger contracts / seconds - stepsPerSecond = fromInteger steps / seconds + contractsPerSecond = realToFrac contracts / seconds + stepsPerSecond = realToFrac steps / seconds pure Benchmark{..} +-- | Run the benchmark. benchmark :: forall v m . (IsMarloweVersion v) => (MonadIO m) => NominalDiffTime + -- ^ When the benchmark started. -> Statistics v + -- ^ The statistics so far. -> ContractId + -- ^ The contract to be synced. -> MarloweSyncClient m (Statistics v) + -- ^ Action for running the benchmark. benchmark start initial@Statistics{contracts} contractId = let clientInit = SendMsgFollowContract @@ -118,7 +133,7 @@ benchmark start initial@Statistics{contracts} contractId = pure . clientIdle version $ stats - { steps = toInteger (length steps') + steps + { steps = steps + length steps' , duration = now - start } , recvMsgWait = diff --git a/marlowe-benchmark/app/Main.hs b/marlowe-benchmark/app/Main.hs index 44c3052ee9..a2bc5d3cca 100644 --- a/marlowe-benchmark/app/Main.hs +++ b/marlowe-benchmark/app/Main.hs @@ -1,20 +1,29 @@ +{-# LANGUAGE LambdaCase #-} + +-- | Execute Benchmarks. module Main ( + -- * Entry point main, ) where -import Language.Marlowe.Runtime.Benchmark (BenchmarkConfig (..), measure) +import Data.Aeson (eitherDecodeFileStrict') +import Data.Default (def) +import Language.Marlowe.Runtime.Benchmark (measure) import Language.Marlowe.Runtime.Client (connectToMarloweRuntime) +import System.Environment (getArgs) +-- | Execute the benchmarks. main :: IO () main = - connectToMarloweRuntime "localhost" 13700 - . measure - $ BenchmarkConfig - { headerSyncParallelism = 3 - , maxContracts = 100 - , syncParallelism = 3 - , syncBatchSize = 20 - , queryParallelism = 3 - , queryBatchSize = 1 - , queryPageSize = 50 - } + do + (host, port, config) <- + getArgs >>= \case + [] -> pure ("localhost", 3700, def) + [h] -> pure (h, 3700, def) + [h, p] -> pure (h, read p, def) + [h, p, c] -> + eitherDecodeFileStrict' c >>= \case + Right c' -> pure (h, read p, c') + Left e -> error e + _ -> error "USAGE: marlowe-benchmark [host [port [configfile]]]" + connectToMarloweRuntime host port $ measure config diff --git a/marlowe-benchmark/changelog.d/20240104_144839_brian.bush_PLT_9014.rst b/marlowe-benchmark/changelog.d/20240104_144839_brian.bush_PLT_9014.rst new file mode 100644 index 0000000000..b17de916d5 --- /dev/null +++ b/marlowe-benchmark/changelog.d/20240104_144839_brian.bush_PLT_9014.rst @@ -0,0 +1,4 @@ +Added +----- + +- Implemented basic benchmarking for Marlowe Runtime sync and query protocols. diff --git a/marlowe-benchmark/marlowe-benchmark.cabal b/marlowe-benchmark/marlowe-benchmark.cabal index 8575675c4a..ae2cf0f15e 100644 --- a/marlowe-benchmark/marlowe-benchmark.cabal +++ b/marlowe-benchmark/marlowe-benchmark.cabal @@ -62,6 +62,7 @@ executable marlowe-benchmark main-is: Main.hs other-modules: Language.Marlowe.Runtime.Benchmark + Language.Marlowe.Runtime.Benchmark.BulkSync Language.Marlowe.Runtime.Benchmark.HeaderSync Language.Marlowe.Runtime.Benchmark.Query Language.Marlowe.Runtime.Benchmark.Sync