Skip to content

Commit

Permalink
Merge #3717
Browse files Browse the repository at this point in the history
3717: [ADP-2367] swap DbPendingTxs layer to new implementation r=paolino a=paolino

This is the last step of the new submissions store saga.

- swap the dbpendingtx implementation to use the new submissions store
- create migration code
- test migration code 

ADP-2367

Co-authored-by: paolo veronelli <paolo.veronelli@gmail.com>
Co-authored-by: Heinrich Apfelmus <heinrich.apfelmus@iohk.io>
  • Loading branch information
3 people committed Feb 10, 2023
2 parents 41deda8 + 7c9b05c commit 0537715
Show file tree
Hide file tree
Showing 24 changed files with 296 additions and 908 deletions.
13 changes: 6 additions & 7 deletions lib/wallet/cardano-wallet.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ library
, lattices
, lens
, lifted-async
, list-transformer
, math-functions
, memory
, monad-control
Expand Down Expand Up @@ -226,18 +227,15 @@ library
Cardano.Wallet.DB.Pure.Implementation
Cardano.Wallet.DB.Pure.Layer
Cardano.Wallet.DB.Sqlite.Migration
Cardano.Wallet.DB.Sqlite.Migration.NewSubmissionStore
Cardano.Wallet.DB.Sqlite.Schema
Cardano.Wallet.DB.Sqlite.Stores
Cardano.Wallet.DB.Sqlite.Types
Cardano.Wallet.DB.Store.Checkpoints
Cardano.Wallet.DB.Store.QueryStore
Cardano.Wallet.DB.Store.Meta.Model
Cardano.Wallet.DB.Store.Meta.Store
Cardano.Wallet.DB.Store.Submissions.Model
Cardano.Wallet.DB.Store.Submissions.New.Layer
Cardano.Wallet.DB.Store.Submissions.New.Operations
Cardano.Wallet.DB.Store.Submissions.Store
Cardano.Wallet.DB.Store.Submissions.Layer
Cardano.Wallet.DB.Store.Submissions.Operations
Cardano.Wallet.DB.Store.Transactions.Decoration
Cardano.Wallet.DB.Store.Transactions.Layer
Cardano.Wallet.DB.Store.Transactions.Model
Expand Down Expand Up @@ -795,6 +793,7 @@ test-suite unit
, splitmix
, strict-containers
, strict-non-empty-containers
, string-interpolate
, string-qq
, temporary
, text
Expand Down Expand Up @@ -847,8 +846,6 @@ test-suite unit
Cardano.Wallet.DB.StateMachine
Cardano.Wallet.DB.Store.Meta.ModelSpec
Cardano.Wallet.DB.Store.Meta.StoreSpec
Cardano.Wallet.DB.Store.Submissions.ModelSpec
Cardano.Wallet.DB.Store.Submissions.New.StoreSpec
Cardano.Wallet.DB.Store.Submissions.StoreSpec
Cardano.Wallet.DB.Store.Transactions.StoreSpec
Cardano.Wallet.DB.Store.Wallets.StoreSpec
Expand Down Expand Up @@ -928,6 +925,7 @@ test-suite integration
, lobemo-backend-ekg
, mock-token-metadata
, network-uri
, string-interpolate
, text
, text-class
, unliftio
Expand Down Expand Up @@ -957,6 +955,7 @@ benchmark restore
, generic-lens
, iohk-monitoring
, say
, string-interpolate
, text
, text-class
, time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ import Cardano.Wallet.Shelley.Network.Discriminant
import Cardano.Wallet.Unsafe
( unsafeFromHex, unsafeMkPercentage )
import Control.Monad
( forM_ )
( forM_, when )
import Control.Monad.IO.Class
( liftIO )
import Control.Monad.Trans.Resource
Expand Down Expand Up @@ -651,8 +651,8 @@ spec = describe "SHELLEY_STAKE_POOLS" $ do
, expectField (#withdrawals)
(\[ApiWithdrawal _ c] -> c .> Quantity 0)
]

it "STAKE_POOLS_JOIN_05 - \
-- TODO: ADP-2662
when False $ it "STAKE_POOLS_JOIN_05 - \
\Can join when stake key already exists" $ \ctx -> runResourceT $ do
let walletWithPreRegKey =
[ "over", "decorate", "flock", "badge", "beauty"
Expand Down
9 changes: 7 additions & 2 deletions lib/wallet/src/Cardano/Wallet/DB.hs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import Prelude

import Cardano.Address.Derivation
( XPrv )
import Cardano.Wallet.DB.Store.Submissions.New.Operations
import Cardano.Wallet.DB.Store.Submissions.Operations
( SubmissionMeta (..) )
import Cardano.Wallet.DB.Store.Transactions.Decoration
( TxInDecorator )
Expand Down Expand Up @@ -752,7 +752,12 @@ getInSubmissionTransaction_ DBPendingTxs{getInSubmissionTransactions_} wid txid

-- | A database layer for storing in-submission transactions.
data DBPendingTxs stm = DBPendingTxs
{ putLocalTxSubmission_
{ emptyTxSubmissions_
:: WalletId
-> stm ()
-- ^ Add overwrite an empty submisison pool to the given wallet.

, putLocalTxSubmission_
:: WalletId
-> Hash "Tx"
-> SealedTx
Expand Down
162 changes: 14 additions & 148 deletions lib/wallet/src/Cardano/Wallet/DB/Layer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedLabels #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE RecursiveDo #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

Expand Down Expand Up @@ -79,9 +80,6 @@ import Cardano.Wallet.DB
, DBTxHistory (..)
, DBWalletMeta (..)
, DBWallets (..)
, ErrNoSuchTransaction (..)
, ErrPutLocalTxSubmission (..)
, ErrRemoveTx (..)
, ErrWalletAlreadyExists (..)
, mkDBLayerFromParts
)
Expand All @@ -92,7 +90,6 @@ import Cardano.Wallet.DB.Sqlite.Schema
, DelegationReward (..)
, EntityField (..)
, Key (..)
, LocalTxSubmission (..)
, PrivateKey (..)
, StakeKeyCertificate (..)
, TxMeta (..)
Expand All @@ -109,16 +106,18 @@ import Cardano.Wallet.DB.Store.Meta.Model
, ManipulateTxMetaHistory (..)
, TxMetaHistory (..)
)
import Cardano.Wallet.DB.Store.Submissions.Model
( TxLocalSubmissionHistory (..) )
import Cardano.Wallet.DB.Store.Submissions.Layer
( mkDbPendingTxs )
import Cardano.Wallet.DB.Store.Submissions.Operations
( mkStoreWalletsSubmissions )
import Cardano.Wallet.DB.Store.Transactions.Decoration
( TxInDecorator, decorateTxInsForReadTx, decorateTxInsForRelation )
import Cardano.Wallet.DB.Store.Transactions.Model
( TxSet (..) )
import Cardano.Wallet.DB.Store.Transactions.TransactionInfo
( mkTransactionInfoFromRelation )
import Cardano.Wallet.DB.Store.Wallets.Model
( DeltaWalletsMetaWithSubmissions (..), TxWalletsHistory )
( TxWalletsHistory )
import Cardano.Wallet.DB.Store.Wallets.Store
( DeltaTxWalletsHistory (..), mkStoreTxWalletsHistory )
import Cardano.Wallet.DB.WalletState
Expand Down Expand Up @@ -149,7 +148,7 @@ import Control.Monad.IO.Class
import Control.Monad.Trans
( lift )
import Control.Monad.Trans.Except
( ExceptT (..), runExceptT )
( ExceptT (..) )
import Control.Tracer
( Tracer, contramap, traceWith )
import Data.Coerce
Expand All @@ -174,18 +173,13 @@ import Data.Text.Class
( ToText (..), fromText )
import Data.Word
( Word32 )
import Database.Persist.Class
( toPersistValue )
import Database.Persist.Sql
( Entity (..)
, Filter
, SelectOpt (..)
, Single (..)
, Update (..)
, deleteWhere
, insert_
, rawExecute
, rawSql
, repsert
, selectFirst
, selectKeysList
Expand Down Expand Up @@ -213,7 +207,6 @@ import UnliftIO.MVar
( modifyMVar, modifyMVar_, newMVar, readMVar, withMVar )

import qualified Cardano.Wallet.DB.Sqlite.Schema as DB
import qualified Cardano.Wallet.DB.Store.Submissions.Model as TxSubmissions
import qualified Cardano.Wallet.Primitive.Model as W
import qualified Cardano.Wallet.Primitive.Passphrase as W
import qualified Cardano.Wallet.Primitive.Types as W
Expand Down Expand Up @@ -506,7 +499,7 @@ newDBLayerWith
-> SqliteContext
-- ^ A (thread-)safe wrapper for query execution.
-> IO (DBLayer IO s k)
newDBLayerWith _cacheBehavior _tr ti SqliteContext{runQuery} = do
newDBLayerWith _cacheBehavior _tr ti SqliteContext{runQuery} = mdo
-- FIXME LATER during ADP-1043:
-- Remove the 'NoCache' behavior, we cannot get it back.
-- This will affect read benchmarks, they will need to benchmark
Expand Down Expand Up @@ -575,6 +568,7 @@ newDBLayerWith _cacheBehavior _tr ti SqliteContext{runQuery} = do
void $ modifyDBMaybe transactionsDBVar $ \(_txsOld, _ws) ->
let delta = Just $ ExpandTxWalletsHistory wid txs
in (delta, Right ())
emptyTxSubmissions_ dbPendingTxs wid
pure res

, readGenesisParameters_ = selectGenesisParameters
Expand Down Expand Up @@ -657,94 +651,9 @@ newDBLayerWith _cacheBehavior _tr ti SqliteContext{runQuery} = do
{-----------------------------------------------------------------------
Pending Txs
-----------------------------------------------------------------------}
let
dbPendingTxs = DBPendingTxs
{ putLocalTxSubmission_ = \wid txid tx sl -> do
let errNoSuchWallet = ErrPutLocalTxSubmissionNoSuchWallet $
ErrNoSuchWallet wid
let errNoSuchTx = ErrPutLocalTxSubmissionNoSuchTransaction $
ErrNoSuchTransaction wid txid
ExceptT $ modifyDBMaybe transactionsDBVar
$ \(_txsOld, ws) -> do
case Map.lookup wid ws of
Nothing -> (Nothing, Left errNoSuchWallet)
Just (TxMetaHistory metas, _) -> case
Map.lookup (TxId txid) metas of
Nothing -> (Nothing, Left errNoSuchTx)
Just _ ->
let
delta = Just
$ ChangeTxMetaWalletsHistory wid
$ ChangeSubmissions
$ TxSubmissions.Expand
$ TxLocalSubmissionHistory
$ Map.fromList [
( TxId txid
, LocalTxSubmission (TxId txid)
wid sl tx
)
]
in (delta, Right ())

, addTxSubmission_ = \wid (tx, meta, binary) sl -> do
putTxHistory_ dbTxHistory
wid [(tx, meta)]
void $ runExceptT $ putLocalTxSubmission_ dbPendingTxs
wid (tx ^. #txId) binary sl

, resubmitTx_ = \wid txId sealed tip ->
void $ runExceptT $ putLocalTxSubmission_ dbPendingTxs
wid txId sealed tip

, getInSubmissionTransactions_ = \_ -> pure []

, readLocalTxSubmissionPending_ =
fmap (map localTxSubmissionFromEntity)
. listPendingLocalTxSubmissionQuery

, rollForwardTxSubmissions_ = \wid tip _txs ->
selectWallet wid >>= \case
Nothing -> pure () -- non-existent wallet caught outside
Just _ -> modifyDBMaybe transactionsDBVar $ \_ ->
let
delta = Just
$ ChangeTxMetaWalletsHistory wid
$ ChangeMeta
$ Manipulate
$ AgeTxMetaHistory tip
in (delta, ())

, removePendingOrExpiredTx_ = \wid txId ->
let noTx =
( Nothing
, Left
$ ErrRemoveTxNoSuchTransaction
$ ErrNoSuchTransaction wid txId
)
in ExceptT $ selectWallet wid >>= \case
Nothing -> pure $ Left
$ ErrRemoveTxNoSuchWallet
$ ErrNoSuchWallet wid
Just _ -> modifyDBMaybe transactionsDBVar
$ \(_ , ws) -> fromMaybe noTx $ do
(TxMetaHistory metas, _) <- Map.lookup wid ws
DB.TxMeta{..} <- Map.lookup (TxId txId) metas
pure $
if txMetaStatus == W.InLedger
then (Nothing
, Left $ ErrRemoveTxAlreadyInLedger txId)
else
let delta = Just
$ ChangeTxMetaWalletsHistory wid
$ ChangeMeta
$ Manipulate
$ PruneTxMetaHistory $ TxId txId
in (delta, Right ())

, rollBackSubmissions_ = \_ _ -> pure ()

, pruneByFinality_ = \ _ _ -> pure ()
}
submissionDBVar <- runQuery $ loadDBVar mkStoreWalletsSubmissions

let dbPendingTxs = mkDbPendingTxs submissionDBVar

let rollbackTo_ wid requestedPoint = do
mNearestCheckpoint <-
Expand Down Expand Up @@ -783,7 +692,6 @@ newDBLayerWith _cacheBehavior _tr ti SqliteContext{runQuery} = do
let
delta = Just
$ ChangeTxMetaWalletsHistory wid
$ ChangeMeta
$ Manipulate
$ RollBackTxMetaHistory nearestPoint
in (delta, Right ())
Expand All @@ -799,7 +707,6 @@ newDBLayerWith _cacheBehavior _tr ti SqliteContext{runQuery} = do
Just cp -> Right <$> do
let tip = cp ^. #currentTip
pruneCheckpoints wid epochStability tip
pruneLocalTxSubmission wid epochStability tip
lift $ modifyDBMaybe transactionsDBVar $ \_ ->
(Just GarbageCollectTxWalletsHistory, ())
lift $ pruneByFinality_ dbPendingTxs wid finalitySlot
Expand Down Expand Up @@ -1045,7 +952,7 @@ getTxMetas
-> TxWalletsHistory
-> [DB.TxMeta]
getTxMetas wid (_,wmetas) = do
(TxMetaHistory metas, _) <- maybeToList $ Map.lookup wid wmetas
TxMetaHistory metas <- maybeToList $ Map.lookup wid wmetas
toList metas

-- | Lookup 'TxMeta' for a given wallet and 'TxId'.
Expand All @@ -1056,7 +963,7 @@ lookupTxMeta
-> TxWalletsHistory
-> Maybe DB.TxMeta
lookupTxMeta wid txid (_,wmetas) = do
(TxMetaHistory metas, _) <- Map.lookup wid wmetas
TxMetaHistory metas <- Map.lookup wid wmetas
Map.lookup txid metas

-- | For a given 'TxMeta', read all necessary data to construct
Expand Down Expand Up @@ -1086,47 +993,6 @@ selectTransactionInfo ti tip txSet meta =
decoration = decorateTxInsForRelation txSet transaction
in mkTransactionInfoFromRelation ti tip transaction decoration meta

-- | Returns the initial submission slot and submission record for all pending
-- transactions in the wallet.
listPendingLocalTxSubmissionQuery
:: W.WalletId
-> SqlPersistT IO [(W.SlotNo, LocalTxSubmission)]
listPendingLocalTxSubmissionQuery wid = fmap unRaw <$> rawSql query params
where
-- fixme: sort results
query =
"SELECT tx_meta.slot,?? " <>
"FROM tx_meta INNER JOIN local_tx_submission " <>
"ON tx_meta.wallet_id=local_tx_submission.wallet_id " <>
" AND tx_meta.tx_id=local_tx_submission.tx_id " <>
"WHERE tx_meta.wallet_id=? AND tx_meta.status=? " <>
"ORDER BY local_tx_submission.wallet_id, local_tx_submission.tx_id"
params = [toPersistValue wid, toPersistValue W.Pending]
unRaw (Single sl, Entity _ tx) = (sl, tx)

localTxSubmissionFromEntity
:: (W.SlotNo, LocalTxSubmission)
-> W.LocalTxSubmissionStatus W.SealedTx
localTxSubmissionFromEntity (_sl0, LocalTxSubmission (TxId txid) _ sl tx) =
W.LocalTxSubmissionStatus txid tx sl

-- | Remove transactions from the local submission pool once they can no longer
-- be rolled back.
pruneLocalTxSubmission
:: W.WalletId
-> Quantity "block" Word32
-> W.BlockHeader
-> SqlPersistT IO ()
pruneLocalTxSubmission wid (Quantity epochStability) tip =
rawExecute query params
where
query =
"DELETE FROM local_tx_submission " <>
"WHERE wallet_id=? AND tx_id IN " <>
"( SELECT tx_id FROM tx_meta WHERE tx_meta.block_height < ? )"
params = [toPersistValue wid, toPersistValue stableHeight]
stableHeight = getQuantity (tip ^. #blockHeight) - epochStability

selectPrivateKey
:: (MonadIO m, PersistPrivateKey (k 'RootK))
=> W.WalletId
Expand Down

0 comments on commit 0537715

Please sign in to comment.