Skip to content

Commit

Permalink
Test an inefficient TxStatus index
Browse files Browse the repository at this point in the history
  • Loading branch information
raduom committed May 23, 2022
1 parent 1d7dc28 commit f6cbad9
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 7 deletions.
36 changes: 29 additions & 7 deletions plutus-streaming/app/Main.hs
Expand Up @@ -7,11 +7,17 @@ module Main where
import Cardano.Api (Block (Block), BlockInMode (BlockInMode), ChainPoint (ChainPoint, ChainPointAtGenesis),
NetworkId (Mainnet, Testnet), NetworkMagic (NetworkMagic), SlotNo (SlotNo))
import Cardano.Api.Extras ()
import Control.Concurrent.MVar (MVar, newMVar, putMVar, takeMVar)
import Data.Aeson.Text qualified as Aeson
import Data.Text.Lazy qualified as TL
import Index.Sqlite (SqliteIndex, insert)
import Index.TxIdStatus (openIx)
import Ledger.TxId (TxId)
import Options.Applicative (Alternative ((<|>)), Parser, auto, execParser, flag', help, helper, info, long, metavar,
option, str, strOption, (<**>))
import Plutus.Streaming (ChainSyncEvent (RollBackward, RollForward), withSimpleChainSyncEventStream)
import Plutus.ChainIndex.Types (TxConfirmedState)
import Plutus.Streaming (ChainSyncEvent (RollBackward, RollForward), SimpleChainSyncEvent,
withSimpleChainSyncEventStream)
import Streaming.Prelude qualified as S

--
Expand Down Expand Up @@ -66,20 +72,36 @@ chainPointParser =
-- Main
--

type TxIndex = SqliteIndex SimpleChainSyncEvent () TxId TxConfirmedState

-- | Process a chain sync event that we receive from the alonzo node client
processChainSyncEvent
:: SimpleChainSyncEvent
-> MVar TxIndex
-> IO ()
processChainSyncEvent event mix = do
currentIx <- takeMVar mix
nextIx <- insert event currentIx
putMVar mix nextIx

main :: IO ()
main = do
Options {optionsSocketPath, optionsNetworkId, optionsChainPoint} <-
execParser $ info (optionsParser <**> helper) mempty
ix <- openIx "txs.db"
mix <- newMVar ix

withSimpleChainSyncEventStream
optionsSocketPath
optionsNetworkId
optionsChainPoint
$ S.stdoutLn
. S.map
( \case
RollForward (BlockInMode (Block header _txs) _era) _ct ->
"RollForward, header: " <> TL.unpack (Aeson.encodeToLazyText header)
RollBackward cp _ct ->
"RollBackward, point: " <> TL.unpack (Aeson.encodeToLazyText cp)
. S.mapM
( \evt -> do
case evt of
RollForward (BlockInMode (Block header transactions) _era) _ct -> do
_ <- processChainSyncEvent evt mix
pure $ "RollForward, header: " <> TL.unpack (Aeson.encodeToLazyText header) <> " Transactions: " <> show (length transactions)
RollBackward cp _ct ->
pure $ "RollBackward, point: " <> TL.unpack (Aeson.encodeToLazyText cp)
)
10 changes: 10 additions & 0 deletions plutus-streaming/plutus-streaming.cabal
Expand Up @@ -19,6 +19,7 @@ common lang
LambdaCase
ScopedTypeVariables
StandaloneDeriving
GADTs
ghc-options:
-Wall
-Widentities
Expand All @@ -34,6 +35,7 @@ library
hs-source-dirs: src
exposed-modules:
Cardano.Api.Extras
Index.TxIdStatus
Plutus.Streaming
Plutus.Streaming.ChainIndex
Plutus.Streaming.LedgerState
Expand All @@ -44,7 +46,11 @@ library
bytestring,
cardano-api,
containers,
hysterical-screams,
plutus-chain-index-core,
plutus-ledger,
plutus-ledger-api,
sqlite-simple,
stm,
streaming,
ouroboros-network,
Expand All @@ -59,6 +65,10 @@ executable plutus-streaming-cli
base >=4.9 && <5,
aeson,
cardano-api,
hysterical-screams,
plutus-chain-index-core,
plutus-ledger,
plutus-ledger-api,
optparse-applicative,
streaming,
text
87 changes: 87 additions & 0 deletions plutus-streaming/src/Index/TxIdStatus.hs
@@ -0,0 +1,87 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}

module Index.TxIdStatus where

import Cardano.Api (Block (Block), BlockHeader (BlockHeader), BlockInMode (BlockInMode), BlockNo (BlockNo), CardanoMode)
import Cardano.Api qualified as C
import Control.Monad (forM_)
import Data.Map (Map)
import Data.Map qualified as Map
import Data.Maybe (catMaybes, fromJust)
import Data.Monoid (Last (Last), Sum (Sum, getSum))
import Database.SQLite.Simple (execute, execute_)
import Index.Split (SplitIndex (SplitIndex, siBuffered, siHandle))
import Index.Sqlite (SqliteIndex)
import Index.Sqlite qualified as Ix
import Ledger.TxId (TxId)
import Plutus.ChainIndex.Tx (ChainIndexTx (_citxTxId))
import Plutus.ChainIndex.Types (BlockNumber (BlockNumber),
TxConfirmedState (TxConfirmedState, blockAdded, timesConfirmed, validity),
TxValidity (TxValid))
import Plutus.Contract.CardanoAPI (fromCardanoTx)
import Plutus.Streaming (ChainSyncEvent (RollForward), SimpleChainSyncEvent)

type TxStatusIndex = SqliteIndex SimpleChainSyncEvent () TxId TxConfirmedState

openIx :: FilePath -> IO TxStatusIndex
openIx path =
fromJust <$> Ix.new query onInsert store 3000 path

-- Ignore notifications for now
onInsert :: SimpleChainSyncEvent -> TxStatusIndex -> IO [()]
onInsert _ _ = pure []

-- No one will query this for now.
query :: TxStatusIndex -> TxId -> [SimpleChainSyncEvent] -> IO TxConfirmedState
query = undefined

store :: TxStatusIndex -> IO ()
store SplitIndex{siHandle, siBuffered} = do
let bufferedTxs = foldTxs $ getTxs . getBlocks <$> siBuffered
execute_ siHandle "CREATE TABLE IF NOT EXISTS tx_state (txid TEXT PRIMARY KEY, confirmations INTEGER)"
execute_ siHandle "BEGIN"
forM_ (Map.assocs bufferedTxs) $
\(txid, v) -> execute siHandle "INSERT INTO tx_state (txid, confirmations) VALUES (?, ?)" (show txid, getSum $ timesConfirmed v)
-- This will really work your SSD to death, and it is not very useful, since
-- all txs that are persisted are settled.
-- execute siHandle "UPDATE tx_state SET confirmations = confirmations + ?" (Only $ Map.size bufferedTxs)
execute_ siHandle "COMMIT"

getBlocks :: SimpleChainSyncEvent -> BlockInMode CardanoMode
getBlocks (RollForward block _tip) = block
getBlocks _ = error "This should never happen"

-- We won't have any rollbacks in the buffered events since those
-- blocks have settled
getTxs :: BlockInMode CardanoMode -> (BlockNo, [TxId])
getTxs (BlockInMode (Block header transactions) era) =
case era of
C.ByronEraInCardanoMode -> go header transactions era
C.ShelleyEraInCardanoMode -> go header transactions era
C.AllegraEraInCardanoMode -> go header transactions era
C.MaryEraInCardanoMode -> go header transactions era
C.AlonzoEraInCardanoMode -> go header transactions era
where
go :: forall era. C.IsCardanoEra era
=> C.BlockHeader
-> [C.Tx era]
-> C.EraInMode era C.CardanoMode
-> (BlockNo, [TxId])
go (BlockHeader _ _ blockNo) txs era' =
(blockNo, _citxTxId <$> catMaybes (either (const Nothing) Just . fromCardanoTx era' <$> txs))

foldTxs :: [(BlockNo, [TxId])] -> Map TxId TxConfirmedState
foldTxs bs = snd $ foldl go (0, Map.empty) bs
where
go :: (Int, Map TxId TxConfirmedState)
-> (BlockNo, [TxId])
-> (Int, Map TxId TxConfirmedState)
go (confirmations, acc) (_, []) = (confirmations, acc)
go (confirmations, acc) (blockNo@(BlockNo no), tx : txs) =
let acc' = Map.insert tx (TxConfirmedState { timesConfirmed = Sum confirmations
, blockAdded = Last (Just $ BlockNumber no)
, validity = Last (Just TxValid)
})
acc
in go (confirmations + 1, acc') (blockNo, txs)

0 comments on commit f6cbad9

Please sign in to comment.