Skip to content

Commit

Permalink
Add runLocalTxSubmissionPool
Browse files Browse the repository at this point in the history
  • Loading branch information
rvl committed Apr 8, 2021
1 parent ec573f6 commit 9f8c6ef
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 6 deletions.
2 changes: 1 addition & 1 deletion lib/core/cardano-wallet-core.cabal
Expand Up @@ -189,8 +189,8 @@ library
Cardano.Wallet.Primitive.Types.UTxO
Cardano.Wallet.Primitive.Types.UTxOIndex
Cardano.Wallet.Primitive.Types.UTxOIndex.Internal
Cardano.Wallet.TokenMetadata.MockServer
Cardano.Wallet.Registry
Cardano.Wallet.TokenMetadata.MockServer
Cardano.Wallet.Transaction
Cardano.Wallet.Unsafe
Cardano.Wallet.Version
Expand Down
86 changes: 82 additions & 4 deletions lib/core/src/Cardano/Wallet.hs
Expand Up @@ -137,6 +137,7 @@ module Cardano.Wallet
, getTransaction
, submitExternalTx
, submitTx
, runLocalTxSubmissionPool
, ErrMkTx (..)
, ErrSubmitTx (..)
, ErrSubmitExternalTx (..)
Expand Down Expand Up @@ -189,7 +190,7 @@ import Cardano.Wallet.DB
, sparseCheckpoints
)
import Cardano.Wallet.Logging
( traceWithExceptT )
( BracketLog, bracketTracer, traceWithExceptT )
import Cardano.Wallet.Network
( ErrGetAccountBalance (..)
, ErrPostTx (..)
Expand Down Expand Up @@ -312,6 +313,7 @@ import Cardano.Wallet.Primitive.Types.TokenBundle
( TokenBundle )
import Cardano.Wallet.Primitive.Types.Tx
( Direction (..)
, LocalTxSubmissionStatus
, SealedTx (..)
, TransactionInfo (..)
, Tx
Expand Down Expand Up @@ -344,11 +346,11 @@ import Cardano.Wallet.Transaction
import Control.DeepSeq
( NFData )
import Control.Monad
( forM, forM_, replicateM, unless, when )
( forM, forM_, forever, replicateM, unless, when )
import Control.Monad.Class.MonadTime
( getCurrentTime )
( DiffTime, MonadMonotonicTime (..), diffTime, getCurrentTime )
import Control.Monad.IO.Unlift
( liftIO )
( MonadUnliftIO, liftIO )
import Control.Monad.Trans.Class
( lift )
import Control.Monad.Trans.Except
Expand Down Expand Up @@ -424,6 +426,8 @@ import Type.Reflection
( Typeable, typeRep )
import UnliftIO.Exception
( Exception )
import UnliftIO.MVar
( modifyMVar_, newMVar )

import qualified Cardano.Crypto.Wallet as CC
import qualified Cardano.Wallet.Primitive.AddressDiscovery.Random as Rnd
Expand Down Expand Up @@ -1574,6 +1578,8 @@ submitTx ctx wid (tx, meta, binary) = db & \DBLayer{..} -> do
withExceptT ErrSubmitTxNetwork $ traceWithExceptT tr $
postTx nw binary
mapExceptT atomically $ do
lift $ putLocalTxSubmission (PrimaryKey wid)
(tx ^. #txId) binary (meta ^. #slotNo)
withExceptT ErrSubmitTxNoSuchWallet $
putTxHistory (PrimaryKey wid) [(tx, meta)]
where
Expand All @@ -1582,6 +1588,9 @@ submitTx ctx wid (tx, meta, binary) = db & \DBLayer{..} -> do
tr = contramap (MsgTxSubmit . MsgPostTxResult (tx ^. #txId)) (ctx ^. logger)

-- | Broadcast an externally-signed transaction to the network.
--
-- NOTE: external transactions will not be added to the LocalTxSubmission pool,
-- so the user must retry submission themselves.
submitExternalTx
:: forall ctx k.
( HasNetworkLayer ctx
Expand Down Expand Up @@ -1623,6 +1632,63 @@ forgetTx ctx wid tid = db & \DBLayer{..} -> do
where
db = ctx ^. dbLayer @s @k

-- | Given a LocalTxSubmission record, calculate the slot when it should be
-- retried next.
--
-- The current implementation is really basic. Retry every 10 slots.
scheduleLocalTxSubmission :: LocalTxSubmissionStatus tx -> SlotNo
scheduleLocalTxSubmission st = (st ^. #latestSubmission) + 10

-- | Retry submission of pending transactions.
runLocalTxSubmissionPool
:: forall ctx s k a.
( HasLogger WalletLog ctx
, HasNetworkLayer ctx
, HasDBLayer s k ctx
)
=> ctx
-> WalletId
-> IO a
runLocalTxSubmissionPool ctx wid = db & \DBLayer{..} -> forever $ do
submitPending <- rateLimited $ \bh -> bracketTracer trBracket $ do
pending <- atomically $ getLocalTxSubmissionPending (PrimaryKey wid)
let sl = bh ^. #slotNo
-- Re-submit transactions due, ignore errors
forM_ (filter (isScheduled sl) pending) $ \st -> do
res <- runExceptT $ postTx nw (st ^. #submittedTx)
traceWith tr (MsgRetryPostTxResult (st ^. #txId) res)
atomically $ putLocalTxSubmission
(PrimaryKey wid)
(st ^. #txId)
(st ^. #submittedTx)
sl
watchNodeTip submitPending
where
nw = ctx ^. networkLayer
db = ctx ^. dbLayer @s @k
NetworkLayer{watchNodeTip} = ctx ^. networkLayer
tr = contramap MsgTxSubmit (ctx ^. logger @WalletLog)
trBracket = contramap MsgProcessPendingPool tr

isScheduled now = (<= now) . scheduleLocalTxSubmission

-- Limit pool check frequency to every 1000ms at most.
rateLimited = throttle 1

-- | Return a function to run an action at most once every _interval_.
throttle
:: (MonadUnliftIO m, MonadMonotonicTime m)
=> DiffTime
-> (a -> m ())
-> m (a -> m ())
throttle interval action = do
var <- newMVar Nothing
pure $ \arg -> modifyMVar_ var $ \prev -> do
now <- getMonotonicTime
if (maybe interval (diffTime now) prev >= interval)
then action arg $> Just now
else pure prev

-- | List all transactions and metadata from history for a given wallet.
listTransactions
:: forall ctx s k.
Expand Down Expand Up @@ -2438,6 +2504,8 @@ instance HasSeverityAnnotation WalletLog where

data TxSubmitLog
= MsgPostTxResult (Hash "Tx") (Either ErrPostTx ())
| MsgRetryPostTxResult (Hash "Tx") (Either ErrPostTx ())
| MsgProcessPendingPool BracketLog
deriving (Show, Eq)

instance ToText TxSubmitLog where
Expand All @@ -2446,9 +2514,19 @@ instance ToText TxSubmitLog where
"Transaction " <> pretty txid <> " " <> case res of
Right _ -> "accepted by local node"
Left err -> "failed: " <> toText err
MsgRetryPostTxResult txid res ->
"Transaction " <> pretty txid <>
" resubmitted to local node and " <> case res of
Right _ -> "accepted again"
Left _ -> "not accepted (this is expected)"
MsgProcessPendingPool msg ->
"Processing the pending local tx submission pool: " <> toText msg

instance HasPrivacyAnnotation TxSubmitLog
instance HasSeverityAnnotation TxSubmitLog where
getSeverityAnnotation = \case
MsgPostTxResult _ (Right _) -> Info
MsgPostTxResult _ (Left _) -> Error
MsgRetryPostTxResult _ (Right _) -> Info
MsgRetryPostTxResult _ (Left _) -> Debug
MsgProcessPendingPool msg -> getSeverityAnnotation msg
4 changes: 3 additions & 1 deletion lib/core/src/Cardano/Wallet/Api/Server.hs
Expand Up @@ -2391,7 +2391,9 @@ registerWorker ctx before coworker wid =
-- fixme: ADP-641 Review error handling here
, workerMain = \ctx' _ -> race_
(unsafeRunExceptT $ W.restoreWallet ctx' wid)
(coworker ctx' wid)
(race_
(W.runLocalTxSubmissionPool ctx' wid)
(coworker ctx' wid))
}

-- | Something to pass as the coworker action to 'newApiLayer', which does
Expand Down

0 comments on commit 9f8c6ef

Please sign in to comment.