Skip to content


Stop using STM for storing txIdStatuses.
Browse files Browse the repository at this point in the history
  • Loading branch information
raduom committed Aug 8, 2022
1 parent 2c95b00 commit 2f04bb5
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 38 deletions.
2 changes: 0 additions & 2 deletions plutus-chain-index-core/src/Plutus/ChainIndex/UtxoState.hs
Expand Up @@ -133,8 +133,6 @@ rollbackWith
rollbackWith f PointAtGenesis after =
Right (RollbackResult TipAtGenesis (f mempty after))
-- Partial synchronisation, starting from a given block id.
-- TODO: After we implement persistent storage this should return
-- Left RollbackNoTip.
rollbackWith f _ after@(viewTip -> TipAtGenesis) =
Right (RollbackResult TipAtGenesis (f mempty after))
rollbackWith f targetPoint idx@(viewTip -> currentTip)
Expand Down
1 change: 1 addition & 0 deletions plutus-pab/plutus-pab.cabal
Expand Up @@ -176,6 +176,7 @@ library
, either
, exceptions
, filepath
, fingertree
, freer-simple
, generic-arbitrary
, http-client
Expand Down
75 changes: 39 additions & 36 deletions plutus-pab/src/Plutus/PAB/Core/ContractInstance/BlockchainEnv.hs
Expand Up @@ -16,6 +16,7 @@ import Cardano.Node.Params qualified as Params
import Cardano.Protocol.Socket.Client (ChainSyncEvent (..))
import Cardano.Protocol.Socket.Client qualified as Client
import Cardano.Protocol.Socket.Mock.Client qualified as MockClient
import Data.FingerTree qualified as FT
import Data.Map qualified as Map
import Data.Monoid (Last (..), Sum (..))
import Ledger (Block, Slot (..), TxId (..))
Expand Down Expand Up @@ -44,7 +45,6 @@ import Plutus.ChainIndex (BlockNumber (..), ChainIndexTx (..), ChainIndexTxOutpu
TxOutBalance, TxValidity (..), UtxoIndex, UtxoState (..), blockId, citxTxId, fromOnChainTx,
insert, reduceBlockCount, tipAsPoint, utxoState)
import Plutus.ChainIndex.Compatibility (fromCardanoBlockHeader, fromCardanoPoint, toCardanoPoint)
import Plutus.ChainIndex.TxIdState qualified as TxIdState
import Plutus.ChainIndex.TxOutBalance qualified as TxOutBalance
import Plutus.ChainIndex.UtxoState (viewTip)
import Plutus.Contract.CardanoAPI (fromCardanoTx, withIsCardanoEra)
Expand Down Expand Up @@ -86,22 +86,25 @@ startNodeClient config instancesState = do
slot <- TimeSlot.currentSlot slotConfig
STM.atomically $ STM.writeTVar (beCurrentSlot env) slot

handleSyncAction $ processChainSyncEvent instancesState env block
processChainSyncEvent instancesState env block >>= handleSyncAction'
pure env

-- | Deal with sync action failures from running this STM action. For now, we
-- deal with them by simply calling `error`; i.e. the application exits.
handleSyncAction :: STM (Either SyncActionFailure (Slot, BlockNumber)) -> IO ()
handleSyncAction action = do
result <- STM.atomically action
case result of
STM.atomically action >>= handleSyncAction'

handleSyncAction' :: Either SyncActionFailure (Slot, BlockNumber) -> IO ()
handleSyncAction' action = do
case action of
Left err -> putStrLn $ "handleSyncAction failed with: " <> show err
Right (Slot s, BlockNumber n) -> do
stdGen <- newStdGen
when (fst (randomR (0 :: Int, 10_000) stdGen) == 0) $
putStrLn $ "Current synced block: " <> show n <> ". Current synced slot: " <> show s
either (error . show) (const $ pure ()) result
either (error . show) (const $ pure ()) action

updateInstances :: IndexedBlock -> InstanceClientEnv -> STM ()
Expand All @@ -122,13 +125,15 @@ processChainSyncEvent
:: InstancesState
-> BlockchainEnv
-> ChainSyncEvent
-> STM (Either SyncActionFailure (Slot, BlockNumber))
-> IO (Either SyncActionFailure (Slot, BlockNumber))
processChainSyncEvent instancesState blockchainEnv event = do
case event of
Resume _ -> Right <$> blockAndSlot blockchainEnv
Resume _ -> STM.atomically $ Right <$> blockAndSlot blockchainEnv
RollForward (BlockInMode (C.Block header transactions) era) _ ->
withIsCardanoEra era (processBlock instancesState header blockchainEnv transactions era)
RollBackward chainPoint _ -> runRollback blockchainEnv chainPoint
RollBackward chainPoint _ -> do
-- TODO: Index rollback
STM.atomically $ runRollback blockchainEnv chainPoint

data SyncActionFailure
= RollbackFailure RollbackFailed
Expand All @@ -137,30 +142,24 @@ data SyncActionFailure

-- | Roll back the chain to the given ChainPoint and slot.
runRollback :: BlockchainEnv -> ChainPoint -> STM (Either SyncActionFailure (Slot, BlockNumber))
runRollback env@BlockchainEnv{beLastSyncedBlockSlot, beTxChanges, beTxOutChanges} chainPoint = do
runRollback env@BlockchainEnv{beLastSyncedBlockSlot, beTxOutChanges} chainPoint = do
currentSlot <- STM.readTVar beLastSyncedBlockSlot
txIdStateIndex <- STM.readTVar beTxChanges
txOutBalanceStateIndex <- STM.readTVar beTxOutChanges

let point = fromCardanoPoint chainPoint
rs = TxIdState.rollback point txIdStateIndex
rs' = TxOutBalance.rollback point txOutBalanceStateIndex
rs = TxOutBalance.rollback point txOutBalanceStateIndex
-- Check to see if the rollback is just through a sequence of empty blocks ending at the tip.
emptyRollBack =
point > tipAsPoint (viewTip txIdStateIndex)
point > tipAsPoint (viewTip txOutBalanceStateIndex)
&& pointSlot point <= currentSlot

if emptyRollBack
then Right <$> blockAndSlot env
else case rs of
Left e -> pure $ Left (RollbackFailure e)
Right RollbackResult{rolledBackIndex=rolledBackTxIdStateIndex} ->
case rs' of
Left e' -> pure $ Left (RollbackFailure e')
Right RollbackResult{rolledBackIndex=rolledBackTxOutBalanceStateIndex} -> do
STM.writeTVar beTxChanges rolledBackTxIdStateIndex
STM.writeTVar beTxOutChanges rolledBackTxOutBalanceStateIndex
Right <$> blockAndSlot env
Left e' -> pure $ Left (RollbackFailure e')
Right RollbackResult{rolledBackIndex=rolledBackTxOutBalanceStateIndex} -> do
STM.writeTVar beTxOutChanges rolledBackTxOutBalanceStateIndex
Right <$> blockAndSlot env

-- | Get transaction ID and validity from a transaction.
txEvent :: ChainIndexTx -> (TxId, TxOutBalance, TxValidity)
Expand All @@ -177,32 +176,36 @@ processBlock :: forall era. C.IsCardanoEra era
-> BlockchainEnv
-> [C.Tx era]
-> C.EraInMode era C.CardanoMode
-> STM (Either SyncActionFailure (Slot, BlockNumber))
-> IO (Either SyncActionFailure (Slot, BlockNumber))
processBlock instancesState header env transactions era = do
let C.BlockHeader (C.SlotNo slot) _ _ = header
STM.writeTVar (beLastSyncedBlockSlot env) (fromIntegral slot)
if null transactions
then Right <$> blockAndSlot env
else do
let tip = fromCardanoBlockHeader header
-- We ignore cardano transactions that we couldn't convert to
-- our 'ChainIndexTx'.
ciTxs = catMaybes (either (const Nothing) Just . fromCardanoTx era <$> transactions)
STM.atomically $ do
STM.writeTVar (beLastSyncedBlockSlot env) (fromIntegral slot)
if null transactions
then Right <$> blockAndSlot env
else do
let tip = fromCardanoBlockHeader header
-- We ignore cardano transactions that we couldn't convert to
-- our 'ChainIndexTx'.
ciTxs = catMaybes (either (const Nothing) Just . fromCardanoTx era <$> transactions)

instEnv <- S.instancesClientEnv instancesState
updateInstances (indexBlock ciTxs) instEnv

instEnv <- S.instancesClientEnv instancesState
updateInstances (indexBlock ciTxs) instEnv
r <- updateEmulatorTransactionState tip env (txEvent <$> ciTxs)
STM.writeTVar (beTxChanges env) FT.empty
pure r

updateTransactionState tip env (txEvent <$> ciTxs)

-- | For the given transactions, perform the updates in the 'TxIdState', and
-- also record that a new block has been processed.
:: Foldable t
=> Tip
-> BlockchainEnv
-> t (TxId, TxOutBalance, TxValidity)
-> STM (Either SyncActionFailure (Slot, BlockNumber))
env@BlockchainEnv{ beRollbackHistory
, beTxChanges
Expand Down Expand Up @@ -291,4 +294,4 @@ processMockBlock
, tipBlockNo = blockNumber

updateTransactionState tip env (txEvent <$> fmap fromOnChainTx transactions)
updateEmulatorTransactionState tip env (txEvent <$> fmap fromOnChainTx transactions)

0 comments on commit 2f04bb5

Please sign in to comment.