Skip to content

Commit

Permalink
Support resuming from db
Browse files Browse the repository at this point in the history
  • Loading branch information
sjoerdvisscher committed Sep 24, 2021
1 parent b189017 commit a9b6eb6
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 33 deletions.
51 changes: 33 additions & 18 deletions plutus-chain-index/app/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ import Plutus.ChainIndex.Compatibility (fromCardanoBlock, fromCardan
import Plutus.ChainIndex.DbStore (DbStoreEffect, checkedSqliteDb, handleDbStore)
import Plutus.ChainIndex.Effects (ChainIndexControlEffect (..), ChainIndexQueryEffect (..),
appendBlock, rollback)
import Plutus.ChainIndex.Handlers (ChainIndexState, handleControl, handleQuery)
import Plutus.ChainIndex.Handlers (ChainIndexState, getResumePoints, handleControl, handleQuery,
restoreStateFromDb)
import Plutus.Monitoring.Util (runLogEffects)

type ChainIndexEffects
Expand All @@ -66,12 +67,12 @@ runChainIndex
-> STM.TVar ChainIndexState
-> Sqlite.Connection
-> Eff ChainIndexEffects a
-> IO ()
runChainIndex trace emulatorState conn effect = do
-> IO a
runChainIndex trace mState conn effect = do
-- First run the STM block capturing all log messages emited on a
-- successful STM transaction.
oldEmulatorState <- STM.atomically $ STM.readTVar emulatorState
(result, logMessages') <-
oldEmulatorState <- STM.atomically $ STM.readTVar mState
(errOrResult, logMessages') <-
effect
& interpret handleControl
& interpret handleQuery
Expand All @@ -83,15 +84,16 @@ runChainIndex trace emulatorState conn effect = do
@(Seq (LogMessage ChainIndexLog)) $ unto pure)
& runWriter @(Seq (LogMessage ChainIndexLog))
& runM
logMessages <- case result of
(result, logMessages) <- case errOrResult of
Left err ->
pure $ LogMessage Error (Err err) <| logMessages'
Right (_, newState) -> do
STM.atomically $ STM.writeTVar emulatorState newState
pure logMessages'
pure (fail $ show err, LogMessage Error (Err err) <| logMessages')
Right (result, newState) -> do
STM.atomically $ STM.writeTVar mState newState
pure (pure result, logMessages')
-- Log all previously captured messages
traverse_ (send . LMessage) logMessages
& runLogEffects trace
result

chainSyncHandler
:: Trace IO ChainIndexLog
Expand All @@ -107,15 +109,24 @@ chainSyncHandler trace mState conn
Left err ->
logError trace (ConversionFailed err)
Right txs ->
runChainIndex trace mState conn $ appendBlock (tipFromCardanoBlock block) txs
void $ runChainIndex trace mState conn $ appendBlock (tipFromCardanoBlock block) txs
chainSyncHandler trace mState conn
(RollBackward point _) _ = do
putStr "Rolling back to "
print point
-- Do we really want to pass the tip of the new blockchain to the
-- rollback function (rather than the point where the chains diverge)?
runChainIndex trace mState conn $ rollback (fromCardanoPoint point)
-- On resume we do nothing, for now.
chainSyncHandler _ _ _ (Resume _) _ = do
pure ()
void $ runChainIndex trace mState conn $ rollback (fromCardanoPoint point)
chainSyncHandler trace mState conn
(Resume point) _ = do
putStr "Resuming from "
print point
newState <- runChainIndex trace mState conn $ restoreStateFromDb $ fromCardanoPoint point
putStrLn "New state:"
print newState
STM.atomically $ STM.writeTVar mState newState



main :: IO ()
main = do
Expand Down Expand Up @@ -150,19 +161,23 @@ main = do
putStrLn "\nChain Index config:"
print (pretty config)

appState <- STM.newTVarIO mempty

Sqlite.withConnection (Config.cicDbPath config) $ \conn -> do

Sqlite.runBeamSqliteDebug (logDebug trace . SqlLog) conn $ do
autoMigrate Sqlite.migrationBackend checkedSqliteDb

appState <- STM.newTVarIO mempty
resumePoints <- runChainIndex trace appState conn getResumePoints

putStr "\nNumber of possible resume points: "
print $ length resumePoints

putStrLn $ "Connecting to the node using socket: " <> Config.cicSocketPath config
void $ runChainSync (Config.cicSocketPath config)
nullTracer
(Config.cicSlotConfig config)
(Config.cicNetworkId config)
[]
resumePoints
(chainSyncHandler trace appState conn)

putStrLn $ "Starting webserver on port " <> show (Config.cicPort config)
Expand Down
8 changes: 4 additions & 4 deletions plutus-chain-index/src/Plutus/ChainIndex/DbStore.hs
Original file line number Diff line number Diff line change
Expand Up @@ -111,17 +111,17 @@ instance Table AddressRowT where
primaryKey (AddressRow c o) = AddressRowId c o

data UtxoRowT f = UtxoRow
{ _utxoRowBalance :: Columnar f ByteString
, _utxoRowSlot :: Columnar f ByteString
{ _utxoRowSlot :: Columnar f Word64 -- In Plutus Slot is Integer, but in the Cardano API it is Word64, so this is safe
, _utxoRowBlockId :: Columnar f ByteString
, _utxoRowBlockNumber :: Columnar f Word64
, _utxoRowBalance :: Columnar f ByteString
} deriving (Generic, Beamable)

type UtxoStateRow = UtxoRowT Identity
type UtxoRow = UtxoRowT Identity

instance Table UtxoRowT where
data PrimaryKey UtxoRowT f = UtxoRowId (Columnar f Word64) deriving (Generic, Beamable)
primaryKey = UtxoRowId . _utxoRowBlockNumber
primaryKey = UtxoRowId . _utxoRowSlot

data Db f = Db
{ datumRows :: f (TableEntity DatumRowT)
Expand Down
44 changes: 33 additions & 11 deletions plutus-chain-index/src/Plutus/ChainIndex/Handlers.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
module Plutus.ChainIndex.Handlers
( handleQuery
, handleControl
, restoreStateFromDb
, getResumePoints
, ChainIndexState
) where

import qualified Cardano.Api as C
import Codec.Serialise (Serialise, deserialiseOrFail, serialise)
import Control.Applicative (Const (..))
import Control.Lens (Lens', _Just, ix, view, (^?))
Expand All @@ -28,14 +31,16 @@ import Data.ByteString (ByteString)
import qualified Data.ByteString.Lazy as BSL
import Data.Default (def)
import Data.Either (fromRight)
import qualified Data.FingerTree as FT
import qualified Data.Map as Map
import Data.Maybe (catMaybes, fromMaybe)
import Data.Maybe (catMaybes, fromMaybe, mapMaybe)
import Data.Monoid (Ap (..))
import Data.Proxy (Proxy (..))
import qualified Data.Set as Set
import Data.Word (Word64)
import Database.Beam (Identity, SqlSelect, TableEntity, aggregate_, all_, countAll_,
delete, filter_, limit_, nub_, select, val_)
import Database.Beam.Query (desc_, orderBy_, (==.), (>.))
import Database.Beam.Query (asc_, desc_, orderBy_, (==.), (>.))
import Database.Beam.Schema.Tables (zipTables)
import Database.Beam.Sqlite (Sqlite)
import Ledger (Address (..), ChainIndexTxOut (..), Datum, DatumHash (..),
Expand All @@ -47,7 +52,7 @@ import Plutus.ChainIndex.DbStore
import Plutus.ChainIndex.Effects (ChainIndexControlEffect (..), ChainIndexQueryEffect (..))
import Plutus.ChainIndex.Tx
import Plutus.ChainIndex.Types (BlockId (BlockId), BlockNumber (BlockNumber), Diagnostics (..),
Tip (..), pageOf)
Point (..), Tip (..), pageOf, tipAsPoint)
import Plutus.ChainIndex.UtxoState (InsertUtxoSuccess (..), RollbackResult (..), TxUtxoBalance,
UtxoIndex)
import qualified Plutus.ChainIndex.UtxoState as UtxoState
Expand All @@ -56,8 +61,25 @@ import PlutusTx.Builtins.Internal (BuiltinByteString (..))

type ChainIndexState = UtxoIndex TxUtxoBalance

getResumePoints :: Member DbStoreEffect effs => Eff effs [C.ChainPoint]
getResumePoints = do
rows <- selectList . select $ fmap (\row -> (_utxoRowSlot row, _utxoRowBlockId row)) $ orderBy_ (desc_ . _utxoRowSlot) (all_ (utxoRows db))
pure $ mapMaybe toChainPoint rows ++ [C.ChainPointAtGenesis]
where
toChainPoint :: (Word64, ByteString) -> Maybe C.ChainPoint
toChainPoint (slot, bi) = C.ChainPoint (C.SlotNo slot) <$> C.deserialiseFromRawBytes (C.AsHash (C.proxyToAsType (Proxy :: Proxy C.BlockHeader))) bi

restoreStateFromDb :: Member DbStoreEffect effs => Point -> Eff effs ChainIndexState
restoreStateFromDb point = do
rollbackUtxoDb point
rows <- selectList . select $ orderBy_ (asc_ . _utxoRowSlot) (all_ (utxoRows db))
pure $ FT.fromList $ fmap toUtxoState rows
where
toUtxoState :: UtxoRow -> UtxoState.UtxoState TxUtxoBalance
toUtxoState (UtxoRow slot bi bn balance)
= UtxoState.UtxoState (fromByteString balance) (Tip (fromIntegral slot) (BlockId bi) (BlockNumber bn))

handleQuery ::
forall effs.
( Member (State ChainIndexState) effs
, Member DbStoreEffect effs
, Member (Error ChainIndexError) effs
Expand Down Expand Up @@ -90,10 +112,10 @@ handleQuery = \case

getTip :: Member DbStoreEffect effs => Eff effs Tip
getTip = do
row <- selectOne . select $ limit_ 1 (orderBy_ (desc_ . _utxoRowBlockNumber) (all_ (utxoRows db)))
row <- selectOne . select $ limit_ 1 (orderBy_ (desc_ . _utxoRowSlot) (all_ (utxoRows db)))
pure $ case row of
Nothing -> TipAtGenesis
Just (UtxoRow _ slot bi bn) -> Tip (fromByteString slot) (BlockId bi) (BlockNumber bn)
Just (UtxoRow slot bi bn _) -> Tip (fromIntegral slot) (BlockId bi) (BlockNumber bn)

getDatumFromHash :: Member DbStoreEffect effs => DatumHash -> Eff effs (Maybe Datum)
getDatumFromHash (DatumHash (BuiltinByteString dh)) =
Expand Down Expand Up @@ -195,7 +217,7 @@ handleControl = \case
throwError reason
Right RollbackResult{newTip, rolledBackIndex} -> do
put rolledBackIndex
rollbackUtxoDb newTip
rollbackUtxoDb $ tipAsPoint newTip
logDebug $ RollbackSuccess newTip
CollectGarbage -> do
-- Rebuild the index using only transactions that still have at
Expand Down Expand Up @@ -225,11 +247,11 @@ insertUtxoDb ::
-> Eff effs ()
insertUtxoDb (UtxoState.UtxoState _ TipAtGenesis) = throwError $ InsertionFailed UtxoState.InsertUtxoNoTip
insertUtxoDb (UtxoState.UtxoState balance (Tip sl (BlockId bi) (BlockNumber bn)))
= addRows (utxoRows db) [UtxoRow (toByteString balance) (toByteString sl) bi bn]
= addRows (utxoRows db) [UtxoRow (fromIntegral sl) bi bn (toByteString balance)]

rollbackUtxoDb :: Member DbStoreEffect effs => Tip -> Eff effs ()
rollbackUtxoDb TipAtGenesis = deleteRows $ delete (utxoRows db) (const (val_ True))
rollbackUtxoDb (Tip _ _ (BlockNumber bn)) = deleteRows $ delete (utxoRows db) (\row -> _utxoRowBlockNumber row >. val_ bn)
rollbackUtxoDb :: Member DbStoreEffect effs => Point -> Eff effs ()
rollbackUtxoDb PointAtGenesis = deleteRows $ delete (utxoRows db) (const (val_ True))
rollbackUtxoDb (Point slot _) = deleteRows $ delete (utxoRows db) (\row -> _utxoRowSlot row >. val_ (fromIntegral slot))

data InsertRows te where
InsertRows :: BeamableSqlite t => [t Identity] -> InsertRows (TableEntity t)
Expand Down
1 change: 1 addition & 0 deletions plutus-chain-index/src/Plutus/ChainIndex/UtxoState.hs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ viewTip = tip . measure
rollback :: Point
-> UtxoIndex TxUtxoBalance
-> Either RollbackFailed (RollbackResult TxUtxoBalance)
rollback PointAtGenesis (viewTip -> TipAtGenesis) = Right (RollbackResult TipAtGenesis mempty)
rollback _ (viewTip -> TipAtGenesis) = Left RollbackNoTip
rollback targetPoint idx@(viewTip -> currentTip)
-- The rollback happened sometime after the current tip.
Expand Down

0 comments on commit a9b6eb6

Please sign in to comment.