Skip to content

Commit

Permalink
Insert into scripts table
Browse files Browse the repository at this point in the history
  • Loading branch information
jhbertra authored and nhenin committed Mar 28, 2024
1 parent 94cb0a4 commit 122eb9c
Show file tree
Hide file tree
Showing 14 changed files with 245 additions and 27 deletions.
Expand Up @@ -152,6 +152,16 @@ renderDatabaseSelectorOTel dbName dbUser host port = \case
CopyTxIns -> renderCopy "txIn"
CopyAssetOuts -> renderCopy "assetOut"
CopyAssetMints -> renderCopy "assetMint"
CopyScripts ->
OTelRendered
{ eventName = "INSERT INTO chain.script"
, eventKind = Internal
, renderField = \rows ->
standardAttributes
<> [ ("db.statement", "INSERT INTO chain.script VALUES (?,?,?) ON CONFLICT (id) DO NOTHING")
, ("db.rowsAffected", toAttribute rows)
]
}
where
standardAttributes =
catMaybes
Expand Down
Expand Up @@ -57,17 +57,21 @@ import Data.ByteString.Short (fromShort, toShort)
import Data.Csv (ToRecord)
import Data.Csv.Incremental (Builder, encode, encodeRecord)
import Data.Foldable (traverse_)
import Data.Function (on)
import Data.Int (Int64)
import Data.List (nubBy)
import Data.Profunctor (rmap)
import qualified Data.Set as Set
import Data.String (IsString (..))
import Data.Text (Text)
import qualified Data.Text as T
import Data.Vector (Vector)
import qualified Data.Vector as V
import Database.PostgreSQL.Simple (executeMany)
import qualified Database.PostgreSQL.Simple as PS
import Database.PostgreSQL.Simple.Copy (copy, putCopyData, putCopyEnd, putCopyError)
import qualified Database.PostgreSQL.Simple.Internal as PS
import Database.PostgreSQL.Simple.SqlQQ (sql)
import Database.PostgreSQL.Simple.Transaction (withTransactionSerializable)
import qualified Database.PostgreSQL.Simple.Types as PS
import Hasql.Connection (withLibPQConnection)
Expand Down Expand Up @@ -117,6 +121,7 @@ data QuerySelector f where
CopyTxIns :: QuerySelector Int64
CopyAssetOuts :: QuerySelector Int64
CopyAssetMints :: QuerySelector Int64
CopyScripts :: QuerySelector Int64

data QueryField
= SqlStatement ByteString
Expand Down Expand Up @@ -331,7 +336,7 @@ commitBlocks
commitBlocks runInIO = CommitBlocks \blocks -> do
liftIO $ runInIO $ logInfo $ "Saving " <> T.pack (show $ length blocks) <> " blocks"
let blockGroups = blockToRows <$> blocks
let (blockRows, txRows, txOutRows, txInRows, assetOutRows, assetMintRows) = flattenBlockGroups blockGroups
let (blockRows, txRows, txOutRows, txInRows, assetOutRows, assetMintRows, scripts) = flattenBlockGroups blockGroups
sessionConnection <- ask
liftIO $ withLibPQConnection sessionConnection \libPqConnection -> do
connectionHandle <- newMVar libPqConnection
Expand All @@ -345,26 +350,34 @@ commitBlocks runInIO = CommitBlocks \blocks -> do
runInIO $ copyTxIns connection txInRows
runInIO $ copyAssetOuts connection assetOutRows
runInIO $ copyAssetMints connection assetMintRows
runInIO $ copyScripts connection $ nubBy (on (==) scriptHash) scripts

flattenBlockGroups :: [BlockRowGroup] -> ([BlockRow], [TxRow], [TxOutRow], [TxInRow], [AssetOutRow], [AssetMintRow])
flattenBlockGroups = foldr foldBlockGroup ([], [], [], [], [], [])
flattenBlockGroups
:: [BlockRowGroup] -> ([BlockRow], [TxRow], [TxOutRow], [TxInRow], [AssetOutRow], [AssetMintRow], [ScriptRow])
flattenBlockGroups = foldr foldBlockGroup ([], [], [], [], [], [], [])
where
foldBlockGroup
:: BlockRowGroup
-> ([BlockRow], [TxRow], [TxOutRow], [TxInRow], [AssetOutRow], [AssetMintRow])
-> ([BlockRow], [TxRow], [TxOutRow], [TxInRow], [AssetOutRow], [AssetMintRow])
foldBlockGroup (blockRow, txGroups) (blockRows, txRows, txOutRows, txInRows, assetOutRows, assetMintRows) =
(blockRow : blockRows, txRows', txOutRows', txInRows', assetOutRows', assetMintRows')
-> ([BlockRow], [TxRow], [TxOutRow], [TxInRow], [AssetOutRow], [AssetMintRow], [ScriptRow])
-> ([BlockRow], [TxRow], [TxOutRow], [TxInRow], [AssetOutRow], [AssetMintRow], [ScriptRow])
foldBlockGroup (blockRow, txGroups) (blockRows, txRows, txOutRows, txInRows, assetOutRows, assetMintRows, scriptRows) =
(blockRow : blockRows, txRows', txOutRows', txInRows', assetOutRows', assetMintRows', scriptRows')
where
(txRows', txOutRows', txInRows', assetOutRows', assetMintRows') =
foldr foldTxGroup (txRows, txOutRows, txInRows, assetOutRows, assetMintRows) txGroups
(txRows', txOutRows', txInRows', assetOutRows', assetMintRows', scriptRows') =
foldr foldTxGroup (txRows, txOutRows, txInRows, assetOutRows, assetMintRows, scriptRows) txGroups

foldTxGroup
:: TxRowGroup
-> ([TxRow], [TxOutRow], [TxInRow], [AssetOutRow], [AssetMintRow])
-> ([TxRow], [TxOutRow], [TxInRow], [AssetOutRow], [AssetMintRow])
foldTxGroup (txRow, txInRows, txOutGroups, assetMintRows) (txRows, txOutRows, txInRows', assetOutRows, assetMintRows') =
(txRow : txRows, txOutRows', foldr (:) txInRows' txInRows, assetOutRows', foldr (:) assetMintRows' assetMintRows)
-> ([TxRow], [TxOutRow], [TxInRow], [AssetOutRow], [AssetMintRow], [ScriptRow])
-> ([TxRow], [TxOutRow], [TxInRow], [AssetOutRow], [AssetMintRow], [ScriptRow])
foldTxGroup (txRow, txInRows, txOutGroups, assetMintRows, scripts) (txRows, txOutRows, txInRows', assetOutRows, assetMintRows', scripts') =
( txRow : txRows
, txOutRows'
, foldr (:) txInRows' txInRows
, assetOutRows'
, foldr (:) assetMintRows' assetMintRows
, foldr (:) scripts' scripts
)
where
(txOutRows', assetOutRows') =
foldr foldTxOutGroup (txOutRows, assetOutRows) txOutGroups
Expand Down Expand Up @@ -414,6 +427,18 @@ copyAssetMints conn =
copyBuilder CopyAssetMints conn "assetMint (txId, slotNo, policyId, name, quantity)"
. foldMap encodeRecord

copyScripts
:: (MonadInjectEvent r QuerySelector s m, MonadUnliftIO m)
=> PS.Connection
-> [ScriptRow]
-> m ()
copyScripts conn rows = do
let query = [sql| INSERT INTO chain.script VALUES (?,?,?) ON CONFLICT (id) DO NOTHING |]
withEvent CopyScripts \ev -> do
count <- liftIO $ executeMany conn query rows
addField ev count
pure ()

copyBuilder
:: ( MonadInjectEvent r QuerySelector s m
, MonadUnliftIO m
Expand Down
2 changes: 1 addition & 1 deletion marlowe-chain-sync/deploy/scripts.sql
Expand Up @@ -12,7 +12,7 @@ TRUNCATE chain.txOut CASCADE;
TRUNCATE chain.assetOut;
TRUNCATE chain.assetMint;

CREATE TYPE chain.SCRIPTLANG as ENUM ('SimpleScript', 'PlutusV1', 'PlutusV2', 'PlutusV3');
CREATE TYPE chain.SCRIPTLANG as ENUM ('MultiSig', 'Timelock', 'PlutusV1', 'PlutusV2', 'PlutusV3');

CREATE TABLE chain.script
( id BYTEA PRIMARY KEY
Expand Down
26 changes: 22 additions & 4 deletions marlowe-chain-sync/marlowe-chain-copy/Main.hs
Expand Up @@ -30,7 +30,7 @@ import Cardano.Api (
getBlockHeader,
)
import Cardano.Api.ChainSync.Client (ClientStIdle (..), ClientStNext (..))
import Control.Monad (join, when)
import Control.Monad (guard, join, unless, when)
import Data.ByteString.Lazy (toChunks)
import Data.Csv (ToRecord)
import Data.Csv.Incremental (encode, encodeRecord)
Expand All @@ -39,7 +39,7 @@ import Data.Functor (void)
import Data.Int (Int64)
import Data.String (IsString (..))
import Data.Version (showVersion)
import Database.PostgreSQL.Simple (Connection, Query, close, connectPostgreSQL, execute_)
import Database.PostgreSQL.Simple (Connection, Query, close, connectPostgreSQL, executeMany, execute_)
import Database.PostgreSQL.Simple.Copy (copy_, putCopyData, putCopyEnd, putCopyError)
import Database.PostgreSQL.Simple.SqlQQ (sql)
import Language.Marlowe.Runtime.ChainSync.Database.PostgreSQL.Cardano (blockToRows)
Expand Down Expand Up @@ -67,6 +67,7 @@ import UnliftIO (
atomically,
bracket,
finally,
flushTBQueue,
mask,
newTBQueueIO,
onException,
Expand Down Expand Up @@ -104,6 +105,7 @@ main = do
txInRowQueue <- newTBQueueIO maxTxInsInQueue
assetOutRowQueue <- newTBQueueIO maxAssetOutsInQueue
assetMintRowQueue <- newTBQueueIO maxAssetMintsInQueue
scriptQueue <- newTBQueueIO maxBlocksInQueue
bracket (truncateTablesAndDisableIndexes databaseUri) enableIndexes \_ -> runConcurrently do
Concurrently $
runBlockProcessor
Expand All @@ -114,6 +116,7 @@ main = do
txInRowQueue
assetOutRowQueue
assetMintRowQueue
scriptQueue
Concurrently $ runCopy databaseUri "block (id, slotNo, blockNo)" blockRowQueue
Concurrently $
runCopy databaseUri "tx (blockId, id, slotNo, validityLowerBound, validityUpperBound, metadata, isValid)" txRowQueue
Expand All @@ -126,6 +129,7 @@ main = do
runCopy databaseUri "txIn (txOutId, txOutIx, txInId, slotNo, redeemerDatumBytes, isCollateral)" txInRowQueue
Concurrently $ runCopy databaseUri "assetOut (txOutId, txOutIx, slotNo, policyId, name, quantity)" assetOutRowQueue
Concurrently $ runCopy databaseUri "assetMint (txId, slotNo, policyId, name, quantity)" assetMintRowQueue
Concurrently $ runInsertScripts databaseUri scriptQueue
Concurrently $
runChainSync
blockQueue
Expand All @@ -144,8 +148,9 @@ runBlockProcessor
-> TBQueueMaybe TxInRow
-> TBQueueMaybe AssetOutRow
-> TBQueueMaybe AssetMintRow
-> TBQueueMaybe ScriptRow
-> IO ()
runBlockProcessor blockQueue blockRowQueue txRowQueue txOutRowQueue txInRowQueue assetOutRowQueue assetMintRowQueue = go
runBlockProcessor blockQueue blockRowQueue txRowQueue txOutRowQueue txInRowQueue assetOutRowQueue assetMintRowQueue scriptQueue = go
where
go = join $ atomically do
mBlock <- readTBQueue blockQueue
Expand All @@ -161,13 +166,14 @@ runBlockProcessor blockQueue blockRowQueue txRowQueue txOutRowQueue txInRowQueue
Just block -> do
let (blockRow, txRows) = blockToRows block
writeTBQueue blockRowQueue $ Just blockRow
for_ txRows \(txRow, txInRows, txOutRows, txMintRows) -> do
for_ txRows \(txRow, txInRows, txOutRows, txMintRows, scriptRows) -> do
writeTBQueue txRowQueue $ Just txRow
traverse_ (writeTBQueue txInRowQueue . Just) txInRows
for_ txOutRows \(txOutRow, assetOutRows) -> do
writeTBQueue txOutRowQueue $ Just txOutRow
traverse_ (writeTBQueue assetOutRowQueue . Just) assetOutRows
traverse_ (writeTBQueue assetMintRowQueue . Just) txMintRows
traverse_ (writeTBQueue scriptQueue . Just) scriptRows
pure go

type TBQueueMaybe a = TBQueue (Maybe a)
Expand Down Expand Up @@ -277,5 +283,17 @@ runCopy dbUri table rowQueue = withConnection dbUri \conn -> mask \restore -> do
Right _ -> do
putCopyEnd conn

runInsertScripts :: String -> TBQueueMaybe ScriptRow -> IO ()
runInsertScripts dbUri rowQueue = withConnection dbUri \conn -> do
let go = do
rows <- atomically do
rows <- flushTBQueue rowQueue
guard $ not $ null rows
pure rows
let (rows', reachedEnd) = foldr (\r (acc, end) -> maybe (acc, True) ((,end) . (: acc)) r) ([], False) rows
_ <- executeMany conn [sql| INSERT INTO chain.script VALUES (?,?,?) ON CONFLICT (id) DO NOTHING |] rows'
unless reachedEnd go
go

withConnection :: (MonadUnliftIO m) => String -> (Connection -> m a) -> m a
withConnection uri = bracket (liftIO $ connectPostgreSQL $ fromString uri) (liftIO . close)
1 change: 1 addition & 0 deletions marlowe-chain-sync/marlowe-chain-sync.cabal
Expand Up @@ -100,6 +100,7 @@ library
, ouroboros-consensus-cardano
, plutus-core ^>=1.21
, plutus-ledger-api ^>=1.21
, postgresql-simple
, scientific
, serialise ^>=0.2.6
, sop-core
Expand Down
Expand Up @@ -6,19 +6,24 @@
module Language.Marlowe.Runtime.ChainSync.Database.PostgreSQL.Allegra where

import Cardano.Ledger.Allegra
import Cardano.Ledger.Allegra.Scripts (Timelock)
import Cardano.Ledger.Allegra.TxAuxData (AllegraTxAuxData (..))
import Cardano.Ledger.Allegra.TxBody (AllegraTxBody (..), ValidityInterval (..))
import Cardano.Ledger.BaseTypes (shelleyProtVer)
import Cardano.Ledger.Binary (serialize')
import Cardano.Ledger.Core (TxAuxData)
import Cardano.Ledger.Crypto
import Cardano.Ledger.Shelley.API
import Cardano.Ledger.Shelley.TxWits (ShelleyTxWits (..))
import Data.ByteString (ByteString)
import Data.Foldable (Foldable (..))
import Data.Int
import qualified Data.Map as Map
import qualified Data.Set as Set
import Language.Marlowe.Runtime.ChainSync.Database.PostgreSQL.Shelley (
hashToBytea,
mapStrictMaybe,
originalBytea,
shelleyTxInRow,
shelleyTxOutRow,
)
Expand All @@ -30,11 +35,23 @@ allegraTxToRows slotNo blockHash txId ShelleyTx{..} =
, shelleyTxInRow slotNo txId <$> Set.toAscList (atbInputs body)
, zipWith (allegraTxOutRow slotNo txId) [0 ..] $ toList $ atbOutputs body
, []
, allegraTxScripts wits
)

encodeAllegraMetadata :: AllegraTxAuxData (AllegraEra StandardCrypto) -> ByteString
encodeAllegraMetadata (AllegraTxAuxData md _) = serialize' shelleyProtVer md

allegraTxScripts :: ShelleyTxWits (AllegraEra StandardCrypto) -> [ScriptRow]
allegraTxScripts ShelleyTxWits{..} = uncurry allegraScriptRow <$> Map.toList scriptWits

allegraScriptRow :: ScriptHash StandardCrypto -> Timelock (AllegraEra StandardCrypto) -> ScriptRow
allegraScriptRow (ScriptHash hash) script =
ScriptRow
{ scriptHash = hashToBytea hash
, scriptBytes = originalBytea script
, scriptLanguage = Timelock
}

allegraTxRow
:: (TxAuxData era -> ByteString)
-> Int64
Expand Down
Expand Up @@ -23,6 +23,9 @@ import Cardano.Ledger.Alonzo.Scripts (
AsItem (..),
)
import Cardano.Ledger.Alonzo.Tx (AlonzoTx (..), IsValid (..), indexRedeemers, txdats')
import Cardano.Ledger.Alonzo
import Cardano.Ledger.Alonzo.Scripts (AlonzoScript (..))
import Cardano.Ledger.Alonzo.Tx (AlonzoTx (..), IsValid (..), ScriptPurpose (Spending), indexedRdmrs, txdats')
import Cardano.Ledger.Alonzo.TxAuxData (AlonzoTxAuxData (..))
import Cardano.Ledger.Alonzo.TxBody (AlonzoEraTxBody, AlonzoTxBody (..), AlonzoTxOut (..))
import Cardano.Ledger.Alonzo.TxWits (AlonzoEraTxWits, TxDats)
Expand All @@ -43,6 +46,7 @@ import qualified Data.Map as Map
import qualified Data.Set as Set
import Language.Marlowe.Runtime.ChainSync.Database.PostgreSQL.Mary (maryAssetMintRows, maryTxOutRow, maryTxRow)
import Language.Marlowe.Runtime.ChainSync.Database.PostgreSQL.Shelley (
hashToBytea,
mapStrictMaybe,
originalBytea,
shelleyTxInRow,
Expand All @@ -64,11 +68,25 @@ alonzoTxToRows slotNo blockHash txId tx@AlonzoTx{..} =
, alonzoTxInRows slotNo txId isValid tx (atbInputs body) (atbCollateral body)
, zipWith (alonzoTxOutRow slotNo txId $ txdats' wits) [0 ..] $ toList $ atbOutputs body
, maryAssetMintRows slotNo txId $ atbMint body
, alonzoTxScripts wits
)

encodeAlonzoMetadata :: AlonzoTxAuxData (AlonzoEra StandardCrypto) -> ByteString
encodeAlonzoMetadata (AlonzoTxAuxData md _ _) = L.serialize' shelleyProtVer md

alonzoTxScripts :: Alonzo.AlonzoTxWits (AlonzoEra StandardCrypto) -> [ScriptRow]
alonzoTxScripts Alonzo.AlonzoTxWits{..} = uncurry alonzoScriptRow <$> Map.toList txscripts

alonzoScriptRow :: ScriptHash StandardCrypto -> AlonzoScript (AlonzoEra StandardCrypto) -> ScriptRow
alonzoScriptRow (ScriptHash hash) script =
ScriptRow
{ scriptHash = hashToBytea hash
, scriptBytes = originalBytea script
, scriptLanguage = case script of
TimelockScript _ -> Timelock
PlutusScript _ -> PlutusV1
}

alonzoTxRow
:: (TxAuxData era -> ByteString)
-> Int64
Expand Down
Expand Up @@ -6,24 +6,31 @@
module Language.Marlowe.Runtime.ChainSync.Database.PostgreSQL.Babbage where

import Cardano.Binary (serialize')
import Cardano.Ledger.Alonzo.Tx (AlonzoTx (..), txdats')
import Cardano.Ledger.Alonzo.Tx (AlonzoTx (..))
import Cardano.Ledger.Alonzo.TxAuxData (AlonzoTxAuxData (..))
import Cardano.Ledger.Alonzo.TxWits (TxDats, unTxDats)
import Cardano.Ledger.Babbage (BabbageEra, BabbageTxOut)
import Cardano.Ledger.Alonzo.TxWits (AlonzoTxWits (..), TxDats, unTxDats)
import Cardano.Ledger.Babbage (AlonzoScript, BabbageEra, BabbageTxOut)
import Cardano.Ledger.Babbage.Core (EraScript (..))
import Cardano.Ledger.Babbage.Scripts (AlonzoScript (..))
import Cardano.Ledger.Babbage.Tx (IsValid (..))
import Cardano.Ledger.Babbage.TxBody (BabbageTxBody (..), BabbageTxOut (..))
import Cardano.Ledger.Binary (Sized (..), shelleyProtVer)
import qualified Cardano.Ledger.Binary as L
import Cardano.Ledger.Crypto
import Cardano.Ledger.Plutus.Data (Datum (..), binaryDataToData, hashBinaryData)
import Cardano.Ledger.Shelley.API (ShelleyTxOut (..), StrictMaybe (..))
import Cardano.Ledger.Plutus.Data (binaryDataToData, hashBinaryData)
import Cardano.Ledger.Plutus.Language (Plutus (..))
import qualified Cardano.Ledger.Plutus.Language as P
import Cardano.Ledger.Shelley.API (ScriptHash (..), ShelleyTxOut (..), StrictMaybe (..))
import Control.Arrow (Arrow (..))
import Data.ByteString (ByteString)
import Data.Foldable (Foldable (..))
import Data.Int
import qualified Data.Map as Map
import Language.Marlowe.Runtime.ChainSync.Database.PostgreSQL.Alonzo (alonzoTxInRows, alonzoTxRow)
import Language.Marlowe.Runtime.ChainSync.Database.PostgreSQL.Mary (maryAssetMintRows, maryTxOutRow)
import Language.Marlowe.Runtime.ChainSync.Database.PostgreSQL.Shelley (originalBytea)
import Language.Marlowe.Runtime.ChainSync.Database.PostgreSQL.Shelley (hashToBytea, originalBytea)
import Language.Marlowe.Runtime.ChainSync.Database.PostgreSQL.Types

babbageTxToRows :: Int64 -> Bytea -> Bytea -> AlonzoTx (BabbageEra StandardCrypto) -> TxRowGroup
Expand All @@ -32,11 +39,35 @@ babbageTxToRows slotNo blockHash txId tx@AlonzoTx{..} =
, alonzoTxInRows slotNo txId isValid tx (btbInputs body) (btbCollateral body)
, babbageTxOutRows slotNo txId isValid (txdats' wits) (btbCollateralReturn body) $ toList $ btbOutputs body
, maryAssetMintRows slotNo txId $ btbMint body
, babbageTxScripts wits $ toList $ btbOutputs body
)

encodeBabbageMetadata :: AlonzoTxAuxData (BabbageEra StandardCrypto) -> ByteString
encodeBabbageMetadata (AlonzoTxAuxData md _ _) = L.serialize' shelleyProtVer md

babbageTxScripts
:: AlonzoTxWits (BabbageEra StandardCrypto)
-> [Sized (BabbageTxOut (BabbageEra StandardCrypto))]
-> [ScriptRow]
babbageTxScripts AlonzoTxWits{..} outputs =
uncurry babbageScriptRow <$> (Map.toList txscripts <> foldMap babbageReferenceScript outputs)

babbageReferenceScript
:: Sized (BabbageTxOut (BabbageEra StandardCrypto))
-> [(ScriptHash StandardCrypto, AlonzoScript (BabbageEra StandardCrypto))]
babbageReferenceScript (Sized (BabbageTxOut _ _ _ ref) _) = foldMap (pure . (hashScript &&& id)) ref

babbageScriptRow :: ScriptHash StandardCrypto -> AlonzoScript (BabbageEra StandardCrypto) -> ScriptRow
babbageScriptRow (ScriptHash hash) script =
ScriptRow
{ scriptHash = hashToBytea hash
, scriptBytes = originalBytea script
, scriptLanguage = case script of
TimelockScript _ -> Timelock
PlutusScript (Plutus P.PlutusV1 _) -> PlutusV1
PlutusScript _ -> PlutusV2
}

babbageTxOutRows
:: Int64
-> Bytea
Expand Down

0 comments on commit 122eb9c

Please sign in to comment.