Skip to content

Commit

Permalink
PLT-214 Refactored the Epoch-StakePoolDelegation indexer to the Marconi
Browse files Browse the repository at this point in the history
interface.

* Rewrote the Epoch-StakePoolDelegation using the marconi-core interface
  and to make it support rollbacks and resuming.

* Computed the LedgerState directly with functions in
  `ouroboros-network` instead of using `cardano-api` and
  `cardano-streaming` (allows more fined grained control of what to
  extract).

* Moved orphan instances of EpochStakepoolSize indexer to the Orphans
  module and added rountrip ToField/FromField tests.

* Added LedgerState serialization/deserialization functions in Orphans
  module.
  • Loading branch information
koslambrou committed Mar 15, 2023
1 parent ef102f5 commit a069152
Show file tree
Hide file tree
Showing 13 changed files with 1,335 additions and 111 deletions.
19 changes: 14 additions & 5 deletions cardano-streaming/src/Cardano/Streaming.hs
@@ -1,5 +1,5 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiWayIf #-}

module Cardano.Streaming
( withChainSyncEventStream
, CS.ChainSyncEvent (..)
Expand Down Expand Up @@ -180,15 +180,19 @@ ledgerStatesPipelined pipelineSize config socket validationMode = do
-- keeps waiting for more blocks when chainsync server and client are
-- fully synchronized.
foldLedgerState
:: C.Env -> LedgerStateHistory -> C.ValidationMode
:: C.Env
-> LedgerStateHistory
-> C.ValidationMode
-> S.Stream (S.Of (CS.ChainSyncEvent (C.BlockInMode C.CardanoMode))) IO r
-> S.Stream (S.Of C.LedgerState) IO r
foldLedgerState env initialLedgerStateHistory validationMode =
S.map (fst . snd) . foldLedgerStateEvents env initialLedgerStateHistory validationMode

-- | Like `foldLedgerState`, but also produces blocks and `C.LedgerEvent`s.
foldLedgerStateEvents
:: C.Env -> LedgerStateHistory -> C.ValidationMode
:: C.Env
-> LedgerStateHistory
-> C.ValidationMode
-> S.Stream (S.Of (CS.ChainSyncEvent (C.BlockInMode C.CardanoMode))) IO r
-> S.Stream (S.Of (C.BlockInMode C.CardanoMode, LedgerStateEvents)) IO r
foldLedgerStateEvents env initialLedgerStateHistory validationMode = loop initialLedgerStateHistory
Expand All @@ -204,7 +208,7 @@ foldLedgerStateEvents env initialLedgerStateHistory validationMode = loop initia
Left r -> pure r
Right (chainSyncEvent, source') -> do
ledgerStateHistory' <- case chainSyncEvent of
CS.RollForward (blockInMode@(C.BlockInMode block _)) _ct -> do
CS.RollForward blockInMode@(C.BlockInMode block _) _ct -> do
newLedgerState <- liftIO $ applyBlock_ (getLastLedgerState ledgerStateHistory) block
let (ledgerStateHistory', committedStates) = pushLedgerState env ledgerStateHistory (CS.bimSlotNo blockInMode) newLedgerState blockInMode
forM_ committedStates $ \(_, (ledgerState, ledgerEvents), currBlockMay) -> case currBlockMay of
Expand All @@ -224,7 +228,12 @@ getEnvAndInitialLedgerStateHistory configPath = do
return (env, initialLedgerStateHistory)


applyBlockThrow :: C.Env -> C.LedgerState -> C.ValidationMode -> C.Block era -> IO (C.LedgerState, [C.LedgerEvent])
applyBlockThrow
:: C.Env
-> C.LedgerState
-> C.ValidationMode
-> C.Block era
-> IO (C.LedgerState, [C.LedgerEvent])
applyBlockThrow env ledgerState validationMode block = case C.applyBlock env ledgerState validationMode block of
Left err -> IO.throw err
Right ls -> pure ls
Expand Down
4 changes: 2 additions & 2 deletions marconi-chain-index/app/Main.hs
Expand Up @@ -19,12 +19,12 @@ main = do
, (Indexers.mintBurnWorker (\_ -> pure ()), Cli.mintBurnDbPath o)
] <> case Cli.optionsNodeConfigPath o of
Just configPath ->
[(Indexers.epochStakepoolSizeWorker configPath, Cli.epochStakepoolSizeDbPath o)]
[(Indexers.epochStakepoolSizeWorker' configPath (const $ pure ()), Cli.epochStakepoolSizeDbPath o)]
Nothing -> []

Indexers.runIndexers
(Cli.optionsSocketPath o)
(Cli.optionsNetworkId o)
(Cli.optionsChainPoint o)
"marconi"
"marconi-chain-index"
indexers
17 changes: 17 additions & 0 deletions marconi-chain-index/marconi-chain-index.cabal
Expand Up @@ -53,13 +53,16 @@ library
Marconi.ChainIndex.Indexers
Marconi.ChainIndex.Indexers.AddressDatum
Marconi.ChainIndex.Indexers.Datum
Marconi.ChainIndex.Indexers.EpochSPD
Marconi.ChainIndex.Indexers.EpochStakepoolSize
Marconi.ChainIndex.Indexers.MintBurn
Marconi.ChainIndex.Indexers.ScriptTx
Marconi.ChainIndex.Indexers.Utxo
Marconi.ChainIndex.Logging
Marconi.ChainIndex.Node.Client.GenesisConfig
Marconi.ChainIndex.Orphans
Marconi.ChainIndex.Types
Marconi.ChainIndex.Utils

--------------------
-- Local components
Expand All @@ -74,12 +77,22 @@ library
build-depends:
, cardano-api
, cardano-binary
, cardano-crypto-class
, cardano-crypto-wrapper
, cardano-data
, cardano-ledger-alonzo
, cardano-ledger-babbage
, cardano-ledger-byron
, cardano-ledger-core
, cardano-ledger-shelley
, cardano-ledger-shelley-ma
, cardano-protocol-tpraos
, cardano-slotting
, iohk-monitoring
, ouroboros-consensus
, ouroboros-consensus-byron
, ouroboros-consensus-protocol
, ouroboros-network

------------------------
-- Non-IOG dependencies
Expand All @@ -90,7 +103,9 @@ library
, base
, base16-bytestring
, bytestring
, cborg
, containers
, directory
, filepath
, lens
, mwc-random
Expand All @@ -106,7 +121,9 @@ library
, text
, time
, transformers
, transformers-except
, vector-map
, yaml

library json-rpc
import: lang
Expand Down
55 changes: 55 additions & 0 deletions marconi-chain-index/performance/monitor-marconi-sync-all.sh
@@ -0,0 +1,55 @@
#!/usr/bin/env bash

CURRENT_SCRIPT_PATH=$0
CURRENT_SCRIPT_BASEPATH=$(dirname "$CURRENT_SCRIPT_PATH")

monitor_indexer () {
indexer=$1
network=$2

case $network in
mainnet)
networkid="764824073"
echo $networkid
indexers_cli="--disable-datum --disable-script-tx --disable-address-datum --disable-mintburn --disable-epoch-stakepool-size"
;;
preprod)
networkid="1"
echo $networkid
indexers_cli="--disable-datum --disable-script-tx --disable-utxo --disable-address-datum --disable-mintburn"
;;
esac

echo "Syncing ${indexer} indexer..."
case $indexer in
utxo)
indexers_cli="--disable-datum --disable-script-tx --disable-address-datum --disable-mintburn --disable-epoch-stakepool-size"
;;
addressdatum)
indexers_cli="--disable-datum --disable-script-tx --disable-utxo --disable-mintburn --disable-epoch-stakepool-size"
;;
mintburn)
indexers_cli="--disable-datum --disable-script-tx --disable-utxo --disable-address-datum --disable-epoch-stakepool-size"
;;
epochsdd)
indexers_cli="--disable-datum --disable-script-tx --disable-utxo --disable-address-datum --disable-mintburn"
;;
esac

cabal build marconi-chain-index
$(cabal exec -- which marconi-chain-index) \
-s "$HOME"/cardano-node/${network}/cardano-node.socket \
--testnet-magic ${networkid} \
${indexers_cli} \
-d ~/cardano-node/${network}/marconi-chain-index \
--node-config-path "$HOME"/cardano-node/${network}/${network}-config.json >> "${CURRENT_SCRIPT_BASEPATH}"/marconi-chain-index-"${indexer}".log &
pid=$!
"${CURRENT_SCRIPT_BASEPATH}"/monitor-marconi-sync.sh "${CURRENT_SCRIPT_BASEPATH}"/marconi-chain-index-"${indexer}".log $pid >> >(tee "${CURRENT_SCRIPT_BASEPATH}"/marconi-chain-index-"${indexer}"-monitor.log)
}

if [[ $1 ]] && [ "$1" == "utxo" ] || [ "$1" == "addressdatum" ] || [ "$1" == "mintburn" ] || [ "$1" == "epochsdd" ]; then indexer=$1; else echo "provide indexer to run: utxo, addressdatum, mintburn or epochsdd (arg0)" && exit 1; fi
if [[ $2 ]] && [ "$2" == "mainnet" ] || [ "$2" == "preprod" ]; then network=$2; else echo "provide network (mainnet, preprod) (arg1)" && exit 1; fi

monitor_indexer "$indexer" "$network"

trap 'kill $(jobs -p)' EXIT
15 changes: 7 additions & 8 deletions marconi-chain-index/performance/monitor-marconi-sync.sh
@@ -1,31 +1,30 @@
#!/usr/bin/env bash

# For monitoring time to sync marconi. Logs cpu, mem and sync %.
# For monitoring time to sync marconi-chain-index. Logs cpu, mem and sync %.

# Run marconi with "> >(tee marconi-log.txt)" and then run this script with marconi's log as arg
if [[ $1 ]]; then marconi_log=$1; else echo "provide marconi log file (arg0)" && exit 1; fi
# Run marconi-chain-index with "> >(tee marconi-chain-index-log.txt)" and then run this script with marconi-chain-index's log as arg
if [[ $1 ]]; then marconi_chain_index_log=$1; else echo "provide marconi-chain-index log file (arg0)" && exit 1; fi
if [[ $2 ]]; then pid=$2; else echo "provide marconi-chain-index PID (arg1)" && exit 1; fi

startMs=$(date +%s)

pid=$(pidof marconi)

if [[ -z $pid ]]
then
echo -e "pid empty, exiting.\n"
exit 1
else
echo -e "marconi pid=$pid\n"
echo -e "marconi-chain-index pid=$pid\n"
fi

exec > >(tee monitor-marconi-log.txt)
exec > >(tee monitor-marconi-chain-index-log.txt)

echo -e "Starting monitoring at $(date)"
echo -e "Process started at $(ps -p "$pid" -o start_time=)\n"
ps -p "$pid" -o etime,%cpu,rss,%mem

while true
do
last_log=$(tail -n1 "$marconi_log")
last_log=$(tail -n1 "$marconi_chain_index_log")
sync_percent=$(echo "$last_log" | cut -d '(' -f2 | cut -d ')' -f1)
echo -e "Synced: $sync_percent"
ps -p "$pid" -o etime=,%cpu=,rss=,%mem=
Expand Down

0 comments on commit a069152

Please sign in to comment.