Skip to content

Commit

Permalink
PLT-9014 Benchmark for BulkSync protocol.
Browse files Browse the repository at this point in the history
  • Loading branch information
bwbush committed Jan 4, 2024
1 parent 1c8558a commit 064c7d7
Show file tree
Hide file tree
Showing 8 changed files with 252 additions and 36 deletions.
56 changes: 47 additions & 9 deletions marlowe-benchmark/app/Language/Marlowe/Runtime/Benchmark.hs
Original file line number Diff line number Diff line change
@@ -1,27 +1,39 @@
{-# LANGUAGE RecordWildCards #-}

-- | Benchmark for protocols.
module Language.Marlowe.Runtime.Benchmark (
-- * Benchmarking
BenchmarkConfig (..),
measure,
) where

import Control.Monad (forM_)
import Control.Monad (forM_, when)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Marlowe (MarloweT)
import Data.Aeson (FromJSON, ToJSON)
import Data.Default (Default (..))
import Data.Word (Word8)
import GHC.Generics (Generic)

import qualified Data.Aeson as A (encode)
import qualified Data.ByteString.Lazy.Char8 as LBS8 (putStrLn)
import qualified Language.Marlowe.Runtime.Benchmark.BulkSync as Bulk (measure)
import qualified Language.Marlowe.Runtime.Benchmark.HeaderSync as HeaderSync (measure)
import qualified Language.Marlowe.Runtime.Benchmark.Query as Query (measure)
import qualified Language.Marlowe.Runtime.Benchmark.Sync as Sync (measure)

-- | Benchmark configuration.
data BenchmarkConfig = BenchmarkConfig
{ headerSyncParallelism :: Int
-- ^ Number of parallel clients for `HeaderSync` protocol.
, maxContracts :: Int
, headerMaxContracts :: Int
-- ^ Maximum number of contracts to be read by each `HeaderSync` client.
, bulkParallelism :: Int
-- ^ Number of parallel clients for the `BulkSync` protocol.
, bulkPageSize :: Word8
-- ^ Number of blocks to fetch at a time for the `BulkSync` clients.
, bulkMaxBlocks :: Int
-- ^ Maximum number of blocks to fetch for each `BulkSync` client.
, syncParallelism :: Int
-- ^ Number of parallel clients for `Sync` protocol.
, syncBatchSize :: Int
Expand All @@ -35,16 +47,42 @@ data BenchmarkConfig = BenchmarkConfig
}
deriving (Eq, FromJSON, Generic, Ord, Show, ToJSON)

instance Default BenchmarkConfig where
def =
BenchmarkConfig
{ headerSyncParallelism = 4
, headerMaxContracts = maxBound
, bulkParallelism = 4
, bulkPageSize = 128
, bulkMaxBlocks = maxBound
, syncParallelism = 4
, syncBatchSize = 512
, queryParallelism = 4
, queryBatchSize = 16
, queryPageSize = 256
}

-- | Run the benchmarks.
measure
:: BenchmarkConfig
-> MarloweT IO ()
measure BenchmarkConfig{..} =
do
(headerSyncResults, contractIds) <- HeaderSync.measure headerSyncParallelism maxContracts
when (headerSyncParallelism == 0) $
error "At least one `HeaderSync` client must be run."
(headerSyncResults, contractIds) <- HeaderSync.measure headerSyncParallelism headerMaxContracts
liftIO . forM_ headerSyncResults $ LBS8.putStrLn . A.encode
syncResults <- Sync.measure syncParallelism syncBatchSize contractIds
liftIO . forM_ syncResults $ LBS8.putStrLn . A.encode
queryResults <-
Query.measure queryParallelism queryBatchSize queryPageSize "No policy ID" $
replicate (queryParallelism * queryBatchSize) mempty
liftIO . forM_ queryResults $ LBS8.putStrLn . A.encode
when (bulkParallelism > 0) $
do
bulkResults <- Bulk.measure bulkParallelism bulkPageSize bulkMaxBlocks
liftIO . forM_ bulkResults $ LBS8.putStrLn . A.encode
when (syncParallelism > 0) $
do
syncResults <- Sync.measure syncParallelism syncBatchSize contractIds
liftIO . forM_ syncResults $ LBS8.putStrLn . A.encode
when (queryParallelism > 0) $
do
queryResults <-
Query.measure queryParallelism queryBatchSize queryPageSize "No policy ID" $
replicate (queryParallelism * queryBatchSize) mempty
liftIO . forM_ queryResults $ LBS8.putStrLn . A.encode
116 changes: 116 additions & 0 deletions marlowe-benchmark/app/Language/Marlowe/Runtime/Benchmark/BulkSync.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
{-# LANGUAGE RecordWildCards #-}

-- | Benchmark for the `BulkSync` protocol.
module Language.Marlowe.Runtime.Benchmark.BulkSync (
-- * Benchmarking
Benchmark (..),
measure,
) where

import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Marlowe (MarloweT)
import Data.Aeson (ToJSON)
import Data.Default (Default (..))
import Data.Time.Clock (NominalDiffTime)
import Data.Time.Clock.POSIX (getPOSIXTime)
import Data.Word (Word8)
import GHC.Generics (Generic)
import Language.Marlowe.Protocol.BulkSync.Client (
ClientStIdle (..),
ClientStNext (..),
ClientStPoll (..),
MarloweBulkSyncClient (MarloweBulkSyncClient),
)
import Language.Marlowe.Runtime.Client (runMarloweBulkSyncClient)
import Language.Marlowe.Runtime.History.Api (MarloweBlock (..), MarloweCreateTransaction (newContracts))
import UnliftIO (replicateConcurrently)

data Benchmark = Benchmark
{ metric :: String
, blocksPerSecond :: Double
, createsPerSecond :: Double
, applyInputsPerSecond :: Double
, withdrawsPerSecond :: Double
, seconds :: Double
}
deriving (Eq, Generic, Ord, Show, ToJSON)

data Statistics = Statistics
{ blocks :: Int
, creates :: Int
, applyInputs :: Int
, withdraws :: Int
, duration :: NominalDiffTime
}
deriving (Eq, Generic, Ord, Show, ToJSON)

instance Default Statistics where
def = Statistics def def def def 0

-- | Measure the performance of the protocol.
measure
:: Int
-- ^ Number of parallel clients for the `BulkSync` protocol.
-> Word8
-- ^ Number of blocks to fetch at a time for the `BulkSync` clients.
-> Int
-- ^ Maximum number of blocks to fetch for each `BulkSync` client.
-> MarloweT IO [Benchmark]
-- ^ Action for running the benchmark.
measure parallelism pageSize maxBlocks =
replicateConcurrently parallelism $
run "BulkSync" pageSize maxBlocks

-- | Run the benchmarking client.
run
:: String
-- ^ Label for the benchmark.
-> Word8
-- ^ Number of blocks to fetch at a time for the `BulkSync` clients.
-> Int
-- ^ Maximum number of blocks to fetch for each `BulkSync` client.
-> MarloweT IO Benchmark
-- ^ Action for running the benchmark.
run metric pageSize maxBlocks =
do
Statistics{..} <- runMarloweBulkSyncClient . benchmark pageSize maxBlocks =<< liftIO getPOSIXTime
let seconds = realToFrac duration
blocksPerSecond = realToFrac blocks / seconds
createsPerSecond = realToFrac creates / seconds
applyInputsPerSecond = realToFrac applyInputs / seconds
withdrawsPerSecond = realToFrac withdraws / seconds
pure Benchmark{..}

-- | Run a benchmark.
benchmark
:: (MonadIO m)
=> Word8
-- ^ Number of blocks to fetch at a time for the `BulkSync` clients.
-> Int
-- ^ Maximum number of blocks to fetch for each `BulkSync` client.
-> NominalDiffTime
-- ^ When the benchmark started.
-> MarloweBulkSyncClient m Statistics
-- ^ Action for running the benchmark.
benchmark pageSize maxBlocks start =
let idle = SendMsgRequestNext pageSize . next
next stats@Statistics{..} =
ClientStNext
{ recvMsgRollForward = \blocks' _tip ->
if blocks >= maxBlocks
then pure $ SendMsgDone stats
else do
now <- liftIO getPOSIXTime
pure $
idle
stats
{ blocks = blocks + length blocks'
, creates = creates + sum (length . mconcat . fmap newContracts . createTransactions <$> blocks')
, applyInputs = applyInputs + sum (length . applyInputsTransactions <$> blocks')
, withdraws = withdraws + sum (length . withdrawTransactions <$> blocks')
, duration = now - start
}
, recvMsgRollBackward = \_point _tip -> pure $ idle stats
, recvMsgWait = pure . SendMsgCancel $ SendMsgDone stats
}
in MarloweBulkSyncClient . pure $ idle def
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
{-# LANGUAGE RecordWildCards #-}

-- | Benchmark for the `HeaderSync` protocol.
module Language.Marlowe.Runtime.Benchmark.HeaderSync (
-- * Benchmarking
Benchmark (..),
measure,
) where
Expand Down Expand Up @@ -35,7 +37,7 @@ data Benchmark = Benchmark
deriving (Eq, Generic, Ord, Show, ToJSON)

data Statistics = Statistics
{ blocks :: Integer
{ blocks :: Int
, contracts :: S.Set ContractId
, duration :: NominalDiffTime
}
Expand All @@ -44,31 +46,42 @@ data Statistics = Statistics
instance Default Statistics where
def = Statistics def mempty 0

-- | Measure the performance of the protocol.
measure
:: Int
-- ^ Number of parallel clients for `HeaderSync` protocol.
-> Int
-- ^ Maximum number of contracts to be read by each `HeaderSync` client.
-> MarloweT IO ([Benchmark], S.Set ContractId)
measure parallelism maxContracts =
second head . unzip
<$> replicateConcurrently parallelism (run "HeaderSync" maxContracts)

-- | Run a benchmark client.
run
:: String
-- ^ Label for the benchmark.
-> Int
-- ^ Maximum number of contracts to be read by each `HeaderSync` client.
-> MarloweT IO (Benchmark, S.Set ContractId)
-- ^ Action for running the benchmark.
run metric maxContracts =
do
Statistics{..} <- runMarloweHeaderSyncClient . benchmark maxContracts =<< liftIO getPOSIXTime
let seconds = realToFrac duration
blocksPerSecond = fromInteger blocks / seconds
blocksPerSecond = realToFrac blocks / seconds
contractsPerSecond = fromIntegral (S.size contracts) / seconds
pure (Benchmark{..}, contracts)

-- | Run the benchmark.
benchmark
:: (MonadIO m)
=> Int
-- ^ Maximum number of contracts to be read by each `HeaderSync` client.
-> NominalDiffTime
-- ^ When the benchmark started.
-> MarloweHeaderSyncClient m Statistics
-- ^ Action for running the benchmark.
benchmark maxContracts start =
let clientIdle = SendMsgRequestNext . clientNext
clientWait = pure . SendMsgCancel . SendMsgDone
Expand Down
36 changes: 28 additions & 8 deletions marlowe-benchmark/app/Language/Marlowe/Runtime/Benchmark/Query.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
{-# LANGUAGE RecordWildCards #-}

-- | Benchmark for the `Query` protocol.
module Language.Marlowe.Runtime.Benchmark.Query (
-- * Benchmarking
Benchmark (..),
measure,
) where
Expand Down Expand Up @@ -32,51 +34,69 @@ data Benchmark = Benchmark
deriving (Eq, Generic, Ord, Show, ToJSON)

data Statistics = Statistics
{ queries :: Integer
, pages :: Integer
, contracts :: Integer
{ queries :: Int
, pages :: Int
, contracts :: Int
, duration :: NominalDiffTime
}
deriving (Eq, Generic, Ord, Show, ToJSON)

instance Default Statistics where
def = Statistics def def def 0

-- | Measure the performance of the protocol.
measure
:: Int
-- ^ Number of parallel clients for `Query` protocol.
-> Int
-- ^ Number of queries to be executed by each `Query` client.
-> Int
-- ^ Page size for each `Query` client.
-> String
-- ^ Label for the contract filter.
-> [ContractFilter]
-- ^ Contract filters to query.
-> MarloweT IO [Benchmark]
-- ^ Action to run the benchmark.
measure parallelism batchSize pageSize query filters =
let batches = take parallelism $ chunksOf batchSize filters
in forConcurrently batches $
run "Query" pageSize query

-- | Run a benchmark client.
run
:: String
-- ^ Label for the benchmark.
-> Int
-- ^ Page size for each `Query` client.
-> String
-- ^ Label for the contract filter.
-> [ContractFilter]
-- ^ Contract filters to query.
-> MarloweT IO Benchmark
-- ^ Action to run the benchmark.
run metric pageSize query filters =
do
start <- liftIO getPOSIXTime
Statistics{..} <- foldlM ((runMarloweQueryClient .) . benchmark start pageSize) def filters
let seconds = realToFrac duration
queriesPerSecond = fromInteger queries / seconds
pagesPerSecond = fromInteger pages / seconds
contractsPerSecond = fromInteger contracts / seconds
queriesPerSecond = realToFrac queries / seconds
pagesPerSecond = realToFrac pages / seconds
contractsPerSecond = realToFrac contracts / seconds
pure Benchmark{..}

-- | Run the benchmark.
benchmark
:: (MonadIO m)
=> NominalDiffTime
=> NominalDiffTime -- When the benchmark started.
-> Int
-- ^ Page size for each `Query` client.
-> Statistics
-- ^ The statistics so far.
-> ContractFilter
-- ^ The contract filter.
-> MarloweQueryClient m Statistics
-- ^ Action to run the benchmark.
benchmark start pageSize initial@Statistics{queries} cFilter =
let accumulate = (. Query.getContractHeaders cFilter) . (=<<) . handleNextPage
handleNextPage stats Nothing = pure stats
Expand All @@ -86,7 +106,7 @@ benchmark start pageSize initial@Statistics{queries} cFilter =
let stats' =
stats
{ pages = pages + 1
, contracts = contracts + toInteger (length items)
, contracts = contracts + length items
, duration = now - start
}
case nextRange of
Expand Down

0 comments on commit 064c7d7

Please sign in to comment.