Skip to content

Commit

Permalink
Add indexer to BlockchainEnv.
Browse files Browse the repository at this point in the history
  • Loading branch information
raduom committed Aug 8, 2022
1 parent 80f003a commit 75d8362
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 29 deletions.
47 changes: 24 additions & 23 deletions plutus-pab/src/Plutus/PAB/Core/ContractInstance/BlockchainEnv.hs
Expand Up @@ -17,19 +17,19 @@ import Cardano.Protocol.Socket.Client (ChainSyncEvent (..))
import Cardano.Protocol.Socket.Client qualified as Client
import Cardano.Protocol.Socket.Mock.Client qualified as MockClient
import Control.Lens.Operators
import Data.FingerTree qualified as FT
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.Either (isLeft)
import Data.IORef (newIORef, readIORef, writeIORef)
import Data.List (findIndex)
import Data.Map qualified as Map
import Data.Maybe (fromMaybe)
import Data.Monoid (Last (..), Sum (..))
import Index.VSqlite qualified as Ix
import Ledger (Block, Slot (..), TxId (..))
import Marconi.Index.TxConfirmationStatus (TCSIndex, TxInfo (..))
import Marconi.Index.TxConfirmationStatus (TxInfo (..))
import Marconi.Index.TxConfirmationStatus qualified as Ix
import Plutus.PAB.Core.ContractInstance.STM (BlockchainEnv (..), InstanceClientEnv (..), InstancesState,
OpenTxOutProducedRequest (..), OpenTxOutSpentRequest (..),
emptyBlockchainEnv)
emptyBlockchainEnv, getIndexerTxChanges, getUtxoIndexTxChanges)
import Plutus.PAB.Core.ContractInstance.STM qualified as S
import Plutus.Trace.Emulator.ContractInstance (IndexedBlock (..), indexBlock)

Expand Down Expand Up @@ -84,6 +84,7 @@ startNodeClient config instancesState = do
)
AlonzoNode -> do
utxoIx <- Ix.open "./utxos.sqlite3" (Ix.Depth 2160) >>= newIORef
let env' = env { beTxChanges = Right utxoIx }
let resumePoints = maybeToList $ toCardanoPoint resumePoint
void $ Client.runChainSync socket nullTracer slotConfig networkId resumePoints
(\block -> do
Expand All @@ -93,8 +94,7 @@ startNodeClient config instancesState = do
-- useful/necessary for blocking contract actions like `awaitSlot`.
slot <- TimeSlot.currentSlot slotConfig
STM.atomically $ STM.writeTVar (beCurrentSlot env) slot

processChainSyncEvent utxoIx instancesState env block >>= handleSyncAction'
processChainSyncEvent instancesState env' block >>= handleSyncAction'
)
pure env

Expand Down Expand Up @@ -130,26 +130,25 @@ blockAndSlot BlockchainEnv{beLastSyncedBlockNo, beLastSyncedBlockSlot} =

-- | Process a chain sync event that we receive from the alonzo node client
processChainSyncEvent
:: IORef TCSIndex
-> InstancesState
:: InstancesState
-> BlockchainEnv
-> ChainSyncEvent
-> IO (Either SyncActionFailure (Slot, BlockNumber))
processChainSyncEvent utxoIx instancesState blockchainEnv event = do
processChainSyncEvent instancesState blockchainEnv event = do
case event of
Resume _ -> STM.atomically $ Right <$> blockAndSlot blockchainEnv
RollForward (BlockInMode (C.Block header transactions) era) _ ->
withIsCardanoEra era (processBlock utxoIx instancesState header blockchainEnv transactions era)
withIsCardanoEra era (processBlock instancesState header blockchainEnv transactions era)
RollBackward chainPoint _ -> do
-- Rollback the index
ix' <- readIORef utxoIx
ix' <- readIORef $ getIndexerTxChanges blockchainEnv
events <- concat <$> Ix.getEvents (ix' ^. Ix.storage)
-- TODO: Stop ignoring errors.
let nextIx = fromMaybe ix' $ do
slot <- chainPointToSlotNo chainPoint
offset <- findIndex (\(TxInfo _ _ sn) -> sn < slot) events
Ix.rewind offset ix'
writeIORef utxoIx nextIx
writeIORef (getIndexerTxChanges blockchainEnv) nextIx

STM.atomically $ runRollback blockchainEnv chainPoint

Expand Down Expand Up @@ -189,14 +188,13 @@ txEvent tx =
-- | Update the blockchain env. with changes from a new block of cardano
-- transactions in any era
processBlock :: forall era. C.IsCardanoEra era
=> IORef TCSIndex
-> InstancesState
=> InstancesState
-> C.BlockHeader
-> BlockchainEnv
-> [C.Tx era]
-> C.EraInMode era C.CardanoMode
-> IO (Either SyncActionFailure (Slot, BlockNumber))
processBlock utxoIx instancesState header env transactions era = do
processBlock instancesState header env transactions era = do
let C.BlockHeader (C.SlotNo slot) _ _ = header
tip = fromCardanoBlockHeader header
-- We ignore cardano transactions that we couldn't convert to
Expand All @@ -210,13 +208,11 @@ processBlock utxoIx instancesState header env transactions era = do
else do
instEnv <- S.instancesClientEnv instancesState
updateInstances (indexBlock ciTxs) instEnv
r <- updateEmulatorTransactionState tip env (txEvent <$> ciTxs)
STM.writeTVar (beTxChanges env) FT.empty
pure r
updateEmulatorTransactionState tip env (txEvent <$> ciTxs)

ix' <- readIORef utxoIx
ix' <- readIORef $ getIndexerTxChanges env
nextIx <- Ix.insert (mkEvent tip <$> ciTxs) ix'
writeIORef utxoIx nextIx
writeIORef (getIndexerTxChanges env) nextIx
pure stmResult

mkEvent :: Tip -> ChainIndexTx -> TxInfo
Expand Down Expand Up @@ -248,8 +244,13 @@ updateEmulatorTransactionState
}
xs = do

txIdStateIndex <- STM.readTVar beTxChanges
let useOldIndex = isLeft beTxChanges
txIdStateIndex <- case beTxChanges of
Left c -> STM.readTVar c
Right _ -> pure mempty

let txIdState = _usTxUtxoData $ utxoState txIdStateIndex

txUtxoBalanceIndex <- STM.readTVar beTxOutChanges
let txUtxoBalance = _usTxUtxoData $ utxoState txUtxoBalanceIndex
blockNumber <- STM.readTVar beLastSyncedBlockNo
Expand All @@ -261,8 +262,8 @@ updateEmulatorTransactionState
case (txIdStateInsert, txUtxoBalanceInsert) of
(Right InsertUtxoSuccess{newIndex=newTxIdState}
, Right InsertUtxoSuccess{newIndex=newTxOutBalance}) -> do -- TODO: Get tx out status another way

STM.writeTVar beTxChanges $ trimIx beRollbackHistory newTxIdState
when useOldIndex $
STM.writeTVar (getUtxoIndexTxChanges env) $ trimIx beRollbackHistory newTxIdState
STM.writeTVar beTxOutChanges $ trimIx beRollbackHistory newTxOutBalance
STM.writeTVar beLastSyncedBlockNo (succ blockNumber)
Right <$> blockAndSlot env
Expand Down
25 changes: 19 additions & 6 deletions plutus-pab/src/Plutus/PAB/Core/ContractInstance/STM.hs
Expand Up @@ -17,6 +17,8 @@ module Plutus.PAB.Core.ContractInstance.STM(
, waitForTxOutStatusChange
, currentSlot
, lastSyncedBlockSlot
, getUtxoIndexTxChanges
, getIndexerTxChanges
-- * State of a contract instance
, InstanceState(..)
, emptyInstanceState
Expand Down Expand Up @@ -55,14 +57,17 @@ import Control.Concurrent.STM (STM, TMVar, TVar)
import Control.Concurrent.STM qualified as STM
import Control.Monad (guard, (<=<))
import Data.Aeson (Value)
import Data.Either (fromLeft, fromRight)
import Data.Foldable (fold)
import Data.IORef (IORef)
import Data.List.NonEmpty (NonEmpty)
import Data.Map (Map)
import Data.Map qualified as Map
import Data.Set (Set)
import Ledger (Address, Params (pSlotConfig), Slot, TxId, TxOutRef)
import Ledger.Time (POSIXTime)
import Ledger.TimeSlot qualified as TimeSlot
import Marconi.Index.TxConfirmationStatus (TCSIndex)
import Plutus.ChainIndex (BlockNumber (BlockNumber), ChainIndexTx, TxIdState, TxOutBalance, TxOutStatus, TxStatus,
transactionStatus)
import Plutus.ChainIndex.TxOutBalance (transactionOutputStatus)
Expand Down Expand Up @@ -153,19 +158,27 @@ data BlockchainEnv =
, beCurrentSlot :: TVar Slot -- ^ Actual current slot
, beLastSyncedBlockSlot :: TVar Slot -- ^ Slot of the last synced block from 'startNodeClient'
, beLastSyncedBlockNo :: TVar BlockNumber -- ^ Last synced block number from 'startNodeClient'.
, beTxChanges :: TVar (UtxoIndex TxIdState) -- ^ Map holding metadata which determines the status of transactions.
, beTxChanges :: Either (TVar (UtxoIndex TxIdState)) (IORef TCSIndex)-- ^ Map holding metadata which determines the status of transactions.
, beTxOutChanges :: TVar (UtxoIndex TxOutBalance) -- ^ Map holding metadata which determines the status of transaction outputs.
, beParams :: Params -- ^ The set of parameters, like protocol parameters and slot configuration.
}

getUtxoIndexTxChanges :: BlockchainEnv -> TVar (UtxoIndex TxIdState)
getUtxoIndexTxChanges BlockchainEnv{beTxChanges} =
fromLeft (error "Changes use an indexer for storage.") beTxChanges

getIndexerTxChanges :: BlockchainEnv -> IORef TCSIndex
getIndexerTxChanges BlockchainEnv{beTxChanges} =
fromRight (error "Changes use in-memory storage, not an indexer.") beTxChanges

-- | Initialise an empty 'BlockchainEnv' value
emptyBlockchainEnv :: Maybe Int -> Params -> STM BlockchainEnv
emptyBlockchainEnv rollbackHistory params =
BlockchainEnv rollbackHistory
<$> STM.newTVar 0
<*> STM.newTVar 0
<*> STM.newTVar (BlockNumber 0)
<*> STM.newTVar mempty
<*> (Left <$> STM.newTVar mempty)
<*> STM.newTVar mempty
<*> pure params

Expand Down Expand Up @@ -395,8 +408,8 @@ insertInstance instanceID state (InstancesState m) = STM.modifyTVar m (Map.inser

-- | Wait for the status of a transaction to change.
waitForTxStatusChange :: TxStatus -> TxId -> BlockchainEnv -> STM TxStatus
waitForTxStatusChange oldStatus tx BlockchainEnv{beTxChanges, beLastSyncedBlockNo} = do
txIdState <- _usTxUtxoData . utxoState <$> STM.readTVar beTxChanges
waitForTxStatusChange oldStatus tx env@BlockchainEnv{beLastSyncedBlockNo} = do
txIdState <- _usTxUtxoData . utxoState <$> STM.readTVar (getUtxoIndexTxChanges env)
blockNumber <- STM.readTVar beLastSyncedBlockNo
let txStatus = transactionStatus blockNumber txIdState tx
-- Succeed only if we _found_ a status and it was different; if
Expand All @@ -408,8 +421,8 @@ waitForTxStatusChange oldStatus tx BlockchainEnv{beTxChanges, beLastSyncedBlockN

-- | Wait for the status of a transaction output to change.
waitForTxOutStatusChange :: TxOutStatus -> TxOutRef -> BlockchainEnv -> STM TxOutStatus
waitForTxOutStatusChange oldStatus txOutRef BlockchainEnv{beTxChanges, beTxOutChanges, beLastSyncedBlockNo} = do
txIdState <- _usTxUtxoData . utxoState <$> STM.readTVar beTxChanges
waitForTxOutStatusChange oldStatus txOutRef env@BlockchainEnv{beTxOutChanges, beLastSyncedBlockNo} = do
txIdState <- _usTxUtxoData . utxoState <$> STM.readTVar (getUtxoIndexTxChanges env)
txOutBalance <- _usTxUtxoData . utxoState <$> STM.readTVar beTxOutChanges
blockNumber <- STM.readTVar beLastSyncedBlockNo
let txOutStatus = transactionOutputStatus blockNumber txIdState txOutBalance txOutRef
Expand Down

0 comments on commit 75d8362

Please sign in to comment.