Skip to content

Commit

Permalink
Add configuration option that switches the indexer.
Browse files Browse the repository at this point in the history
  • Loading branch information
raduom committed Aug 8, 2022
1 parent c9ed83e commit 115a7ed
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 36 deletions.
1 change: 1 addition & 0 deletions plutus-pab-executables/plutus-pab.yaml.sample
Expand Up @@ -10,6 +10,7 @@ pabWebserverConfig:
# available. If this is not set, calls to unavailable endpoints fail
# immediately.
endpointTimeout: 5
enableMarconi: False

walletServerConfig:
tag: LocalWalletConfig
Expand Down
91 changes: 61 additions & 30 deletions plutus-pab/src/Plutus/PAB/Core/ContractInstance/BlockchainEnv.hs
Expand Up @@ -17,6 +17,7 @@ import Cardano.Protocol.Socket.Client (ChainSyncEvent (..))
import Cardano.Protocol.Socket.Client qualified as Client
import Cardano.Protocol.Socket.Mock.Client qualified as MockClient
import Control.Lens.Operators
import Control.Monad (when)
import Data.Either (isLeft)
import Data.IORef (newIORef, readIORef, writeIORef)
import Data.List (findIndex)
Expand All @@ -27,23 +28,25 @@ import Data.Text (unpack)
import Ledger (Block, Slot (..), TxId (..))
import Marconi.Index.TxConfirmationStatus (TxInfo (..))
import Marconi.Index.TxConfirmationStatus qualified as Ix
import Plutus.ChainIndex.TxIdState qualified as TxIdState
import Plutus.HystericalScreams.Index.VSqlite qualified as Ix
import Plutus.PAB.Core.ContractInstance.STM (BlockchainEnv (..), InstanceClientEnv (..), InstancesState,
OpenTxOutProducedRequest (..), OpenTxOutSpentRequest (..),
emptyBlockchainEnv, getIndexerTxChanges, getUtxoIndexTxChanges)
emptyBlockchainEnv, getUtxoIndexTxChanges)
import Plutus.PAB.Core.ContractInstance.STM qualified as S
import Plutus.Trace.Emulator.ContractInstance (IndexedBlock (..), indexBlock)

import Plutus.PAB.Types (Config (Config), DbConfig (DbConfig, dbConfigFile),
DevelopmentOptions (DevelopmentOptions, pabResumeFrom, pabRollbackHistory), dbConfig,
developmentOptions, nodeServerConfig)
DevelopmentOptions (DevelopmentOptions, pabResumeFrom, pabRollbackHistory),
WebserverConfig (WebserverConfig, enableMarconi), dbConfig, developmentOptions,
nodeServerConfig, pabWebserverConfig)

import Cardano.Node.Types (NodeMode (..),
PABServerConfig (PABServerConfig, pscNetworkId, pscNodeMode, pscSlotConfig, pscSocketPath))
import Control.Concurrent.STM (STM)
import Control.Concurrent.STM qualified as STM
import Control.Lens
import Control.Monad (forM_, void, when)
import Control.Monad (forM_, void)
import Control.Tracer (nullTracer)
import Data.Foldable (foldl')
import Data.Maybe (catMaybes, maybeToList)
Expand Down Expand Up @@ -77,6 +80,8 @@ startNodeClient config instancesState = do
, pabResumeFrom = resumePoint
}
, dbConfig = DbConfig { dbConfigFile = dbFile }
, pabWebserverConfig =
WebserverConfig { enableMarconi = useMarconiIndexer }
} = config
params <- Params.fromPABServerConfig $ nodeServerConfig config
env <- STM.atomically $ emptyBlockchainEnv pabRollbackHistory params
Expand All @@ -86,8 +91,13 @@ startNodeClient config instancesState = do
(\block slot -> handleSyncAction $ processMockBlock instancesState env block slot
)
AlonzoNode -> do
utxoIx <- Ix.open (unpack dbFile) (Ix.Depth 2160) >>= newIORef
let env' = env { beTxChanges = Right utxoIx }
env' <-
if useMarconiIndexer
then do
utxoIx <- Ix.open (unpack dbFile) (Ix.Depth 2160) >>= newIORef
pure $ env { beTxChanges = Right utxoIx }
else do
pure env
let resumePoints = maybeToList $ toCardanoPoint resumePoint
void $ Client.runChainSync socket nullTracer slotConfig networkId resumePoints
(\block -> do
Expand Down Expand Up @@ -137,23 +147,26 @@ processChainSyncEvent
-> BlockchainEnv
-> ChainSyncEvent
-> IO (Either SyncActionFailure (Slot, BlockNumber))
processChainSyncEvent instancesState blockchainEnv event = do
processChainSyncEvent instancesState env@BlockchainEnv{beTxChanges} event = do
case event of
Resume _ -> STM.atomically $ Right <$> blockAndSlot blockchainEnv
Resume _ -> STM.atomically $ Right <$> blockAndSlot env
RollForward (BlockInMode (C.Block header transactions) era) _ ->
withIsCardanoEra era (processBlock instancesState header blockchainEnv transactions era)
withIsCardanoEra era (processBlock instancesState header env transactions era)
RollBackward chainPoint _ -> do
-- Rollback the index
ix' <- readIORef $ getIndexerTxChanges blockchainEnv
events <- concat <$> Ix.getEvents (ix' ^. Ix.storage)
-- TODO: Stop ignoring errors.
let nextIx = fromMaybe ix' $ do
slot <- chainPointToSlotNo chainPoint
offset <- findIndex (\(TxInfo _ _ sn) -> sn < slot) events
Ix.rewind offset ix'
writeIORef (getIndexerTxChanges blockchainEnv) nextIx

STM.atomically $ runRollback blockchainEnv chainPoint
either (const $ pure ())
(\ixRef -> do
-- Rollback the index
ix' <- readIORef ixRef
events <- concat <$> Ix.getEvents (ix' ^. Ix.storage)
-- TODO: Stop ignoring errors.
let nextIx = fromMaybe ix' $ do
slot <- chainPointToSlotNo chainPoint
offset <- findIndex (\(TxInfo _ _ sn) -> sn < slot) events
Ix.rewind offset ix'
writeIORef ixRef nextIx)
beTxChanges

STM.atomically $ runRollback env chainPoint

data SyncActionFailure
= RollbackFailure RollbackFailed
Expand All @@ -162,24 +175,38 @@ data SyncActionFailure

-- | Roll back the chain to the given ChainPoint and slot.
runRollback :: BlockchainEnv -> ChainPoint -> STM (Either SyncActionFailure (Slot, BlockNumber))
runRollback env@BlockchainEnv{beLastSyncedBlockSlot, beTxOutChanges} chainPoint = do
runRollback env@BlockchainEnv{beTxChanges, beLastSyncedBlockSlot, beTxOutChanges} chainPoint = do
currentSlot <- STM.readTVar beLastSyncedBlockSlot
txOutBalanceStateIndex <- STM.readTVar beTxOutChanges

let point = fromCardanoPoint chainPoint
rs = TxOutBalance.rollback point txOutBalanceStateIndex
rs' = TxOutBalance.rollback point txOutBalanceStateIndex
-- Check to see if the rollback is just through a sequence of empty blocks ending at the tip.
emptyRollBack =
point > tipAsPoint (viewTip txOutBalanceStateIndex)
&& pointSlot point <= currentSlot

rs <- case beTxChanges of
Left ix' -> do
txIdStateIndex <- STM.readTVar ix'
pure $ TxIdState.rollback point txIdStateIndex
Right _ ->
pure $ Right RollbackResult { newTip = TipAtGenesis
, rolledBackIndex = mempty
}
if emptyRollBack
then Right <$> blockAndSlot env
else case rs of
Left e' -> pure $ Left (RollbackFailure e')
Right RollbackResult{rolledBackIndex=rolledBackTxOutBalanceStateIndex} -> do
STM.writeTVar beTxOutChanges rolledBackTxOutBalanceStateIndex
Right <$> blockAndSlot env
Left e -> pure $ Left (RollbackFailure e)
Right RollbackResult{rolledBackIndex=rolledBackTxIdStateIndex} -> do
case rs' of
Left e' -> pure $ Left (RollbackFailure e')
Right RollbackResult{rolledBackIndex=rolledBackTxOutBalanceStateIndex} -> do
STM.writeTVar beTxOutChanges rolledBackTxOutBalanceStateIndex
either (\ix' -> STM.writeTVar ix' rolledBackTxIdStateIndex)
(const $ pure ())
beTxChanges
Right <$> blockAndSlot env

-- | Get transaction ID and validity from a transaction.
txEvent :: ChainIndexTx -> (TxId, TxOutBalance, TxValidity)
Expand All @@ -197,7 +224,7 @@ processBlock :: forall era. C.IsCardanoEra era
-> [C.Tx era]
-> C.EraInMode era C.CardanoMode
-> IO (Either SyncActionFailure (Slot, BlockNumber))
processBlock instancesState header env transactions era = do
processBlock instancesState header env@BlockchainEnv{beTxChanges} transactions era = do
let C.BlockHeader (C.SlotNo slot) _ _ = header
tip = fromCardanoBlockHeader header
-- We ignore cardano transactions that we couldn't convert to
Expand All @@ -213,9 +240,13 @@ processBlock instancesState header env transactions era = do
updateInstances (indexBlock ciTxs) instEnv
updateEmulatorTransactionState tip env (txEvent <$> ciTxs)

ix' <- readIORef $ getIndexerTxChanges env
nextIx <- Ix.insert (mkEvent tip <$> ciTxs) ix'
writeIORef (getIndexerTxChanges env) nextIx
either (const $ pure ())
(\ixRef -> do
ix' <- readIORef ixRef
nextIx <- Ix.insert (mkEvent tip <$> ciTxs) ix'
writeIORef ixRef nextIx)
beTxChanges

pure stmResult

mkEvent :: Tip -> ChainIndexTx -> TxInfo
Expand Down
7 changes: 1 addition & 6 deletions plutus-pab/src/Plutus/PAB/Core/ContractInstance/STM.hs
Expand Up @@ -18,7 +18,6 @@ module Plutus.PAB.Core.ContractInstance.STM(
, currentSlot
, lastSyncedBlockSlot
, getUtxoIndexTxChanges
, getIndexerTxChanges
-- * State of a contract instance
, InstanceState(..)
, emptyInstanceState
Expand Down Expand Up @@ -57,7 +56,7 @@ import Control.Concurrent.STM (STM, TMVar, TVar)
import Control.Concurrent.STM qualified as STM
import Control.Monad (guard, (<=<))
import Data.Aeson (Value)
import Data.Either (fromLeft, fromRight)
import Data.Either (fromLeft)
import Data.Foldable (fold)
import Data.IORef (IORef)
import Data.List.NonEmpty (NonEmpty)
Expand Down Expand Up @@ -167,10 +166,6 @@ getUtxoIndexTxChanges :: BlockchainEnv -> TVar (UtxoIndex TxIdState)
getUtxoIndexTxChanges BlockchainEnv{beTxChanges} =
fromLeft (error "Changes use an indexer for storage.") beTxChanges

getIndexerTxChanges :: BlockchainEnv -> IORef TCSIndex
getIndexerTxChanges BlockchainEnv{beTxChanges} =
fromRight (error "Changes use in-memory storage, not an indexer.") beTxChanges

-- | Initialise an empty 'BlockchainEnv' value
emptyBlockchainEnv :: Maybe Int -> Params -> STM BlockchainEnv
emptyBlockchainEnv rollbackHistory params =
Expand Down
2 changes: 2 additions & 0 deletions plutus-pab/src/Plutus/PAB/Types.hs
Expand Up @@ -162,6 +162,7 @@ data WebserverConfig =
, staticDir :: Maybe FilePath
, permissiveCorsPolicy :: Bool -- ^ If true; use a very permissive CORS policy (any website can interact.)
, endpointTimeout :: Maybe Second
, enableMarconi :: Bool
}
deriving (Show, Eq, Generic)
deriving anyclass (FromJSON, ToJSON)
Expand All @@ -175,6 +176,7 @@ defaultWebServerConfig =
, staticDir = Nothing
, permissiveCorsPolicy = False
, endpointTimeout = Nothing
, enableMarconi = False
}

instance Default WebserverConfig where
Expand Down

0 comments on commit 115a7ed

Please sign in to comment.