Skip to content

Commit

Permalink
Populate the tx_out.consumed_by_tx_in_id field
Browse files Browse the repository at this point in the history
  • Loading branch information
kderme committed May 30, 2023
1 parent dc40f63 commit d5b762a
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 50 deletions.
35 changes: 14 additions & 21 deletions cardano-db-sync/src/Cardano/DbSync/Era/Byron/Insert.hs
Expand Up @@ -216,10 +216,10 @@ insertTx tracer blkId tx blockIndex = do
, DB.txScriptSize = 0
}

-- Insert outputs for a transaction before inputs in case the inputs for this transaction
-- references the output (not sure this can even happen).
lift $ zipWithM_ (insertTxOut tracer txId) [0 ..] (toList . Byron.txOutputs $ Byron.taTx tx)
mapMVExceptT (insertTxIn tracer txId) resolvedInputs
txInIds <- mapM (insertTxIn tracer txId) resolvedInputs
when False $
lift $ DB.updateListTxOutConsumedByTxInId $ zip (thrd3 <$> resolvedInputs) txInIds
where
annotateTx :: SyncNodeError -> SyncNodeError
annotateTx ee =
Expand Down Expand Up @@ -254,34 +254,34 @@ insertTxIn ::
(MonadBaseControl IO m, MonadIO m) =>
Trace IO Text ->
DB.TxId ->
(Byron.TxIn, DB.TxId, DbLovelace) ->
ExceptT SyncNodeError (ReaderT SqlBackend m) ()
insertTxIn _tracer txInId (Byron.TxInUtxo _txHash inIndex, txOutId, _lovelace) = do
void . lift . DB.insertTxIn $
(Byron.TxIn, DB.TxId, DB.TxOutId, DbLovelace) ->
ExceptT SyncNodeError (ReaderT SqlBackend m) DB.TxInId
insertTxIn _tracer txInId (Byron.TxInUtxo _txHash inIndex, txOutTxId, _, _) = do
lift . DB.insertTxIn $
DB.TxIn
{ DB.txInTxInId = txInId
, DB.txInTxOutId = txOutId
, DB.txInTxOutId = txOutTxId
, DB.txInTxOutIndex = fromIntegral inIndex
, DB.txInRedeemerId = Nothing
}

-- -----------------------------------------------------------------------------

resolveTxInputs :: MonadIO m => Byron.TxIn -> ExceptT SyncNodeError (ReaderT SqlBackend m) (Byron.TxIn, DB.TxId, DbLovelace)
resolveTxInputs :: MonadIO m => Byron.TxIn -> ExceptT SyncNodeError (ReaderT SqlBackend m) (Byron.TxIn, DB.TxId, DB.TxOutId, DbLovelace)
resolveTxInputs txIn@(Byron.TxInUtxo txHash index) = do
res <- liftLookupFail "resolveInput" $ DB.queryTxOutValue (Byron.unTxHash txHash, fromIntegral index)
res <- liftLookupFail "resolveInput" $ DB.queryTxOutValue2 (Byron.unTxHash txHash, fromIntegral index)
pure $ convert res
where
convert :: (DB.TxId, DbLovelace) -> (Byron.TxIn, DB.TxId, DbLovelace)
convert (txId, lovelace) = (txIn, txId, lovelace)
convert :: (DB.TxId, DB.TxOutId, DbLovelace) -> (Byron.TxIn, DB.TxId, DB.TxOutId, DbLovelace)
convert (txId, txOutId, lovelace) = (txIn, txId, txOutId, lovelace)

calculateTxFee :: Byron.Tx -> [(Byron.TxIn, DB.TxId, DbLovelace)] -> Either SyncNodeError ValueFee
calculateTxFee :: Byron.Tx -> [(Byron.TxIn, DB.TxId, DB.TxOutId, DbLovelace)] -> Either SyncNodeError ValueFee
calculateTxFee tx resolvedInputs = do
outval <- first (\e -> NEError $ "calculateTxFee: " <> textShow e) output
when (null resolvedInputs) $
Left $
NEError "calculateTxFee: List of transaction inputs is zero."
let inval = sum $ map (unDbLovelace . thrd3) resolvedInputs
let inval = sum $ map (unDbLovelace . forth4) resolvedInputs
if inval < outval
then Left $ NEInvariant "calculateTxFee" $ EInvInOut inval outval
else Right $ ValueFee (DbLovelace outval) (DbLovelace $ inval - outval)
Expand All @@ -290,10 +290,3 @@ calculateTxFee tx resolvedInputs = do
output =
Byron.unsafeGetLovelace
<$> Byron.sumLovelace (map Byron.txOutValue $ Byron.txOutputs tx)

-- | An 'ExceptT' version of 'mapM_' which will 'left' the first 'Left' it finds.
mapMVExceptT :: Monad m => (a -> ExceptT e m ()) -> [a] -> ExceptT e m ()
mapMVExceptT action xs =
case xs of
[] -> pure ()
(y : ys) -> action y >> mapMVExceptT action ys
23 changes: 14 additions & 9 deletions cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Insert.hs
Expand Up @@ -227,7 +227,7 @@ insertTx tracer cache iopts network isMember blkId epochNo slotNo blockIndex tx
let !outSum = fromIntegral $ unCoin $ Generic.txOutSum tx
!withdrawalSum = fromIntegral $ unCoin $ Generic.txWithdrawalSum tx
!resolvedInputs <- mapM (resolveTxInputs (fst <$> groupedTxOut grouped)) (Generic.txInputs tx)
let !inSum = sum $ map (unDbLovelace . thrd3) resolvedInputs
let !inSum = sum $ map (unDbLovelace . forth4) resolvedInputs
let diffSum = if inSum >= outSum then inSum - outSum else 0
let !fees = maybe diffSum (fromIntegral . unCoin) (Generic.txFees tx)
let !txHash = Generic.txHash tx
Expand Down Expand Up @@ -379,15 +379,20 @@ insertCollateralTxOut tracer cache iopts (txId, _txHash) (Generic.TxOut index ad
prepareTxIn ::
DB.TxId ->
Map Word64 DB.RedeemerId ->
(Generic.TxIn, DB.TxId, DbLovelace) ->
DB.TxIn
prepareTxIn txInId redeemers (txIn, txOutId, _lovelace) =
DB.TxIn
{ DB.txInTxInId = txInId
, DB.txInTxOutId = txOutId
, DB.txInTxOutIndex = fromIntegral $ Generic.txInIndex txIn
, DB.txInRedeemerId = mlookup (Generic.txInRedeemerIndex txIn) redeemers
(Generic.TxIn, DB.TxId, Either Generic.TxIn DB.TxOutId, DbLovelace) ->
ExtendedTxIn
prepareTxIn txInId redeemers (txIn, txOutId, mTxOutId, _lovelace) =
ExtendedTxIn
{ etiTxIn = txInDB
, etiTxOutId = mTxOutId
}
where
txInDB = DB.TxIn
{ DB.txInTxInId = txInId
, DB.txInTxOutId = txOutId
, DB.txInTxOutIndex = fromIntegral $ Generic.txInIndex txIn
, DB.txInRedeemerId = mlookup (Generic.txInRedeemerIndex txIn) redeemers
}

insertCollateralTxIn ::
(MonadBaseControl IO m, MonadIO m) =>
Expand Down
78 changes: 61 additions & 17 deletions cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Insert/Grouped.hs
Expand Up @@ -5,14 +5,15 @@
module Cardano.DbSync.Era.Shelley.Insert.Grouped (
BlockGroupedData (..),
MissingMaTxOut (..),
ExtendedTxIn (..),
ExtendedTxOut (..),
insertBlockGroupedData,
insertReverseIndex,
resolveTxInputs,
resolveScriptHash,
) where

import Cardano.BM.Trace (Trace)
import Cardano.BM.Trace (Trace, logWarning)
import Cardano.Db (DbLovelace (..), minIdsToText, textShow)
import qualified Cardano.Db as DB
import qualified Cardano.DbSync.Era.Shelley.Generic as Generic
Expand All @@ -23,6 +24,7 @@ import Cardano.Prelude
import Control.Monad.Trans.Control (MonadBaseControl)
import qualified Data.List as List
import Database.Persist.Sql (SqlBackend)
import qualified Data.Text as Text

-- | Group data within the same block, to insert them together in batches
--
Expand All @@ -36,7 +38,7 @@ import Database.Persist.Sql (SqlBackend)
-- other table references it in the future it has to be added here and delay its
-- insertion.
data BlockGroupedData = BlockGroupedData
{ groupedTxIn :: ![DB.TxIn]
{ groupedTxIn :: ![ExtendedTxIn]
, groupedTxOut :: ![(ExtendedTxOut, [MissingMaTxOut])]
, groupedTxMetadata :: ![DB.TxMetadata]
, groupedTxMint :: ![DB.MaTxMint]
Expand All @@ -56,6 +58,11 @@ data ExtendedTxOut = ExtendedTxOut
, etoTxOut :: !DB.TxOut
}

data ExtendedTxIn = ExtendedTxIn
{ etiTxIn :: !DB.TxIn
, etiTxOutId :: !(Either Generic.TxIn DB.TxOutId)
} deriving Show

instance Monoid BlockGroupedData where
mempty = BlockGroupedData [] [] [] []

Expand All @@ -72,14 +79,18 @@ insertBlockGroupedData ::
Trace IO Text ->
BlockGroupedData ->
ExceptT SyncNodeError (ReaderT SqlBackend m) DB.MinIds
insertBlockGroupedData _tracer grouped = do
insertBlockGroupedData tracer grouped = do
txOutIds <- lift . DB.insertManyTxOutPlex False $ etoTxOut . fst <$> groupedTxOut grouped
let maTxOuts = concatMap mkmaTxOuts $ zip txOutIds (snd <$> groupedTxOut grouped)
maTxOutIds <- lift $ DB.insertManyMaTxOut maTxOuts
txInId <- lift . DB.insertManyTxIn $ groupedTxIn grouped
txInIds <- lift . DB.insertManyTxIn $ etiTxIn <$> groupedTxIn grouped
when False $ do
etis <- resolveRemainingInputs (groupedTxIn grouped) $ zip txOutIds (fst <$> groupedTxOut grouped)
updateTuples <- lift $ mapM (prepareUpdates tracer) (zip txInIds etis)
lift $ DB.updateListTxOutConsumedByTxInId $ catMaybes updateTuples
void . lift . DB.insertManyTxMetadata $ groupedTxMetadata grouped
void . lift . DB.insertManyTxMint $ groupedTxMint grouped
pure $ DB.MinIds (minimumMaybe txInId) (minimumMaybe txOutIds) (minimumMaybe maTxOutIds)
pure $ DB.MinIds (minimumMaybe txInIds) (minimumMaybe txOutIds) (minimumMaybe maTxOutIds)
where
mkmaTxOuts :: (DB.TxOutId, [MissingMaTxOut]) -> [DB.MaTxOut]
mkmaTxOuts (txOutId, mmtos) = mkmaTxOut txOutId <$> mmtos
Expand All @@ -92,6 +103,17 @@ insertBlockGroupedData _tracer grouped = do
, DB.maTxOutTxOutId = txOutId
}

prepareUpdates ::
(MonadBaseControl IO m, MonadIO m) =>
Trace IO Text ->
(DB.TxInId, ExtendedTxIn) ->
m (Maybe (DB.TxOutId, DB.TxInId))
prepareUpdates trce (txInId, eti) = case etiTxOutId eti of
Right txOutId -> pure $ Just (txOutId, txInId)
Left _ -> do
liftIO $ logWarning trce $ "Failed to find output for " <> Text.pack (show eti)
pure Nothing

insertReverseIndex ::
(MonadBaseControl IO m, MonadIO m) =>
DB.BlockId ->
Expand All @@ -106,23 +128,45 @@ insertReverseIndex blockId minIds =

-- | If we can't resolve from the db, we fall back to the provided outputs
-- This happens the input consumes an output introduced in the same block.
-- In this case we also cannot find yet the 'TxOutId', so we return 'Nothing' for now
resolveTxInputs ::
MonadIO m =>
[ExtendedTxOut] ->
Generic.TxIn ->
ExceptT SyncNodeError (ReaderT SqlBackend m) (Generic.TxIn, DB.TxId, DbLovelace)
ExceptT SyncNodeError (ReaderT SqlBackend m) (Generic.TxIn, DB.TxId, Either Generic.TxIn DB.TxOutId, DbLovelace)
resolveTxInputs groupedOutputs txIn =
fmap convert $ liftLookupFail ("resolveTxInputs " <> textShow txIn <> " ") $ do
qres <- queryResolveInput txIn
liftLookupFail ("resolveTxInputs " <> textShow txIn <> " ") $ do
qres <-
if True then
fmap convertnotFound <$> queryResolveInput txIn
else
fmap convertFound <$> queryResolveInput2 txIn
case qres of
Right ret -> pure $ Right ret
Left err ->
case resolveInMemory txIn groupedOutputs of
Nothing -> pure $ Left err
Just eutxo -> pure $ Right (DB.txOutTxId (etoTxOut eutxo), DB.txOutValue (etoTxOut eutxo))
Just eutxo -> pure $ Right $ convertnotFound (DB.txOutTxId (etoTxOut eutxo), DB.txOutValue (etoTxOut eutxo))
where
convertFound :: (DB.TxId, DB.TxOutId, DbLovelace) -> (Generic.TxIn, DB.TxId, Either Generic.TxIn DB.TxOutId, DbLovelace)
convertFound (txId, txOutId, lovelace) = (txIn, txId, Right txOutId, lovelace)

convertnotFound :: (DB.TxId, DbLovelace) -> (Generic.TxIn, DB.TxId, Either Generic.TxIn DB.TxOutId, DbLovelace)
convertnotFound (txId, lovelace) = (txIn, txId, Left txIn, lovelace)

resolveRemainingInputs ::
MonadIO m =>
[ExtendedTxIn] ->
[(DB.TxOutId, ExtendedTxOut)] ->
ExceptT SyncNodeError (ReaderT SqlBackend m) [ExtendedTxIn]
resolveRemainingInputs etis mp =
mapM f etis
where
convert :: (DB.TxId, DbLovelace) -> (Generic.TxIn, DB.TxId, DbLovelace)
convert (txId, lovelace) = (txIn, txId, lovelace)
f eti = case etiTxOutId eti of
Right _ -> pure eti
Left txIn | Just txOutId <- fst <$> find (matches txIn . snd) mp ->
pure eti {etiTxOutId = Right txOutId}
_ -> pure eti

resolveScriptHash ::
(MonadBaseControl IO m, MonadIO m) =>
Expand All @@ -141,12 +185,12 @@ resolveScriptHash groupedOutputs txIn =

resolveInMemory :: Generic.TxIn -> [ExtendedTxOut] -> Maybe ExtendedTxOut
resolveInMemory txIn =
List.find matches
where
matches :: ExtendedTxOut -> Bool
matches eutxo =
Generic.txInHash txIn == etoTxHash eutxo
&& Generic.txInIndex txIn == DB.txOutIndex (etoTxOut eutxo)
List.find (matches txIn)

matches :: Generic.TxIn -> ExtendedTxOut -> Bool
matches txIn eutxo =
Generic.txInHash txIn == etoTxHash eutxo
&& Generic.txInIndex txIn == DB.txOutIndex (etoTxOut eutxo)

minimumMaybe :: (Ord a, Foldable f) => f a -> Maybe a
minimumMaybe xs
Expand Down
5 changes: 5 additions & 0 deletions cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Query.hs
Expand Up @@ -10,6 +10,7 @@ module Cardano.DbSync.Era.Shelley.Query (
queryStakeAddress,
queryStakeRefPtr,
queryResolveInput,
queryResolveInput2,
queryResolveInputCredentials,
queryPoolUpdateByBlock,
) where
Expand Down Expand Up @@ -65,6 +66,10 @@ queryResolveInput :: MonadIO m => Generic.TxIn -> ReaderT SqlBackend m (Either L
queryResolveInput txIn =
queryTxOutValue (Generic.txInHash txIn, fromIntegral (Generic.txInIndex txIn))

queryResolveInput2 :: MonadIO m => Generic.TxIn -> ReaderT SqlBackend m (Either LookupFail (TxId, TxOutId, DbLovelace))
queryResolveInput2 txIn =
queryTxOutValue2 (Generic.txInHash txIn, fromIntegral (Generic.txInIndex txIn))

queryResolveInputCredentials :: MonadIO m => Generic.TxIn -> ReaderT SqlBackend m (Either LookupFail (Maybe ByteString, Bool))
queryResolveInputCredentials txIn = do
queryTxOutCredentials (Generic.txInHash txIn, fromIntegral (Generic.txInIndex txIn))
Expand Down
8 changes: 6 additions & 2 deletions cardano-db-sync/src/Cardano/DbSync/Util.hs
Expand Up @@ -26,6 +26,7 @@ module Cardano.DbSync.Util (
textPrettyShow,
textShow,
thrd3,
forth4,
traverseMEither,
whenStrictJust,
whenMaybe,
Expand Down Expand Up @@ -208,8 +209,11 @@ whenMaybe :: Monad m => Maybe a -> (a -> m b) -> m (Maybe b)
whenMaybe (Just a) f = Just <$> f a
whenMaybe Nothing _f = pure Nothing

thrd3 :: (a, b, c) -> c
thrd3 (_, _, c) = c
thrd3 :: (a, b, c, d) -> c
thrd3 (_, _, c, _) = c

forth4 :: (a, b, c, d) -> d
forth4 (_, _, _, d) = d

mlookup :: Ord k => Maybe k -> Map k a -> Maybe a
mlookup mKey mp = (`Map.lookup` mp) =<< mKey
Expand Down
Expand Up @@ -7,10 +7,19 @@ import Cardano.Db.Migration.Extra.CosnumedTxOut.Schema
import Control.Monad.IO.Class (MonadIO)
import Control.Monad.Trans.Control (MonadBaseControl)
import Control.Monad.Trans.Reader (ReaderT)
import Database.Persist ((=.))
import Database.Persist.Sql (SqlBackend)
import Database.Persist.Class (update)

insertTxOutExtra :: (MonadBaseControl IO m, MonadIO m) => TxOut -> ReaderT SqlBackend m TxOutId
insertTxOutExtra = insertUnchecked "TxOutExtra"

insertManyTxOutExtra :: (MonadBaseControl IO m, MonadIO m) => [TxOut] -> ReaderT SqlBackend m [TxOutId]
insertManyTxOutExtra = insertMany' "TxOut"

updateListTxOutConsumedByTxInId :: MonadIO m => [(TxOutId, TxInId)] -> ReaderT SqlBackend m ()
updateListTxOutConsumedByTxInId = mapM_ (uncurry updateTxOutConsumedByTxInId)

updateTxOutConsumedByTxInId :: MonadIO m => TxOutId -> TxInId -> ReaderT SqlBackend m ()
updateTxOutConsumedByTxInId txOutId txInId =
update txOutId [TxOutConsumedByTxInId =. Just txInId]
9 changes: 8 additions & 1 deletion cardano-db/src/Cardano/Db/Multiplex.hs
Expand Up @@ -6,6 +6,7 @@
module Cardano.Db.Multiplex (
insertTxOutPlex,
insertManyTxOutPlex,
updateListTxOutConsumedByTxInId,
) where

import Cardano.Db.Insert
Expand Down Expand Up @@ -58,4 +59,10 @@ toExtraTxOut txOut =
, ExtraCons.txOutInlineDatumId = changeKey <$> txOutInlineDatumId txOut
, ExtraCons.txOutReferenceScriptId = changeKey <$> txOutReferenceScriptId txOut
, ExtraCons.txOutConsumedByTxInId = Nothing
}
}

updateListTxOutConsumedByTxInId :: MonadIO m => [(TxOutId, TxInId)] -> ReaderT SqlBackend m ()
updateListTxOutConsumedByTxInId ls =
updateListTxOutConsumedByTxInId (f <$> ls)
where
f (txOutId, txInId) = (changeKey txOutId, changeKey txInId)
14 changes: 14 additions & 0 deletions cardano-db/src/Cardano/Db/Query.hs
Expand Up @@ -42,6 +42,7 @@ module Cardano.Db.Query (
queryTxCount,
queryTxId,
queryTxOutValue,
queryTxOutValue2,
queryTxOutCredentials,
queryEpochStakeCount,
queryMinRefId,
Expand Down Expand Up @@ -583,6 +584,19 @@ queryTxOutValue (hash, index) = do
pure (txOut ^. TxOutTxId, txOut ^. TxOutValue)
pure $ maybeToEither (DbLookupTxHash hash) unValue2 (listToMaybe res)

-- | Like 'queryTxOutValue' but also return the 'TxOutId'
queryTxOutValue2 :: MonadIO m => (ByteString, Word64) -> ReaderT SqlBackend m (Either LookupFail (TxId, TxOutId, DbLovelace))
queryTxOutValue2 (hash, index) = do
res <- select $ do
(tx :& txOut) <-
from
$ table @Tx
`innerJoin` table @TxOut
`on` (\(tx :& txOut) -> tx ^. TxId ==. txOut ^. TxOutTxId)
where_ (txOut ^. TxOutIndex ==. val index &&. tx ^. TxHash ==. val hash)
pure (txOut ^. TxOutTxId, txOut ^. TxOutId, txOut ^. TxOutValue)
pure $ maybeToEither (DbLookupTxHash hash) unValue3 (listToMaybe res)

-- | Give a (tx hash, index) pair, return the TxOut Credentials.
queryTxOutCredentials :: MonadIO m => (ByteString, Word64) -> ReaderT SqlBackend m (Either LookupFail (Maybe ByteString, Bool))
queryTxOutCredentials (hash, index) = do
Expand Down
1 change: 1 addition & 0 deletions cardano-db/src/Cardano/Db/Schema.hs
Expand Up @@ -168,6 +168,7 @@ share
txOutId TxId noreference -- The transaction where this was created as an output.
txOutIndex Word64 sqltype=txindex
redeemerId RedeemerId Maybe noreference
deriving Show

CollateralTxIn
txInId TxId noreference -- The transaction where this is used as an input.
Expand Down

0 comments on commit d5b762a

Please sign in to comment.