From 002893f280cbd578f0693d79e7b2304e3d9cd1d8 Mon Sep 17 00:00:00 2001 From: Mark Travis Date: Mon, 11 Sep 2023 15:47:40 -0700 Subject: [PATCH] Apply transaction batches in periodic intervals (#4504) Add new transaction submission API field, "sync", which determines behavior of the server while submitting transactions: 1) sync (default): Process transactions in a batch immediately, and return only once the transaction has been processed. 2) async: Put transaction into the batch for the next processing interval and return immediately. 3) wait: Put transaction into the batch for the next processing interval and return only after it is processed. --- Builds/CMake/RippledCore.cmake | 1 + cfg/rippled-example.cfg | 2 +- cfg/rippled-reporting.cfg | 2 +- src/ripple/app/ledger/impl/LedgerMaster.cpp | 33 +-- src/ripple/app/main/Application.cpp | 1 + src/ripple/app/misc/NetworkOPs.cpp | 219 +++++++++--------- src/ripple/app/misc/NetworkOPs.h | 43 +++- src/ripple/app/tx/impl/apply.cpp | 10 +- src/ripple/basics/SubmitSync.h | 41 ++++ src/ripple/core/Config.h | 11 +- src/ripple/overlay/impl/PeerImp.cpp | 10 +- src/ripple/protocol/TER.h | 1 + src/ripple/protocol/impl/TER.cpp | 1 + src/ripple/protocol/jss.h | 1 + src/ripple/rpc/handlers/Submit.cpp | 10 +- src/ripple/rpc/handlers/SubmitMultiSigned.cpp | 9 +- src/ripple/rpc/impl/RPCHelpers.cpp | 21 ++ src/ripple/rpc/impl/RPCHelpers.h | 10 + src/ripple/rpc/impl/TransactionSign.cpp | 11 +- src/ripple/rpc/impl/TransactionSign.h | 14 +- src/test/app/Transaction_ordering_test.cpp | 4 + src/test/jtx/Env_test.cpp | 91 ++++++++ src/test/rpc/JSONRPC_test.cpp | 9 +- src/test/rpc/RobustTransaction_test.cpp | 6 +- 24 files changed, 398 insertions(+), 163 deletions(-) create mode 100644 src/ripple/basics/SubmitSync.h diff --git a/Builds/CMake/RippledCore.cmake b/Builds/CMake/RippledCore.cmake index 2b05ebc4df5..5f5dbf5eb0a 100644 --- a/Builds/CMake/RippledCore.cmake +++ b/Builds/CMake/RippledCore.cmake @@ -199,6 +199,7 @@ install ( src/ripple/basics/StringUtilities.h src/ripple/basics/TaggedCache.h src/ripple/basics/tagged_integer.h + src/ripple/basics/SubmitSync.h src/ripple/basics/ThreadSafetyAnalysis.h src/ripple/basics/ThreadUtilities.h src/ripple/basics/ToString.h diff --git a/cfg/rippled-example.cfg b/cfg/rippled-example.cfg index 9891e44c885..6e1d553a970 100644 --- a/cfg/rippled-example.cfg +++ b/cfg/rippled-example.cfg @@ -480,7 +480,7 @@ # # Configure the maximum number of transactions to have in the job queue # -# Must be a number between 100 and 1000, defaults to 250 +# Must be a number between 1000 and 100000, defaults to 10000 # # # [overlay] diff --git a/cfg/rippled-reporting.cfg b/cfg/rippled-reporting.cfg index dbafdd497fa..f09c17ae637 100644 --- a/cfg/rippled-reporting.cfg +++ b/cfg/rippled-reporting.cfg @@ -467,7 +467,7 @@ # # Configure the maximum number of transactions to have in the job queue # -# Must be a number between 100 and 1000, defaults to 250 +# Must be a number between 1000 and 100000, defaults to 10000 # # # [overlay] diff --git a/src/ripple/app/ledger/impl/LedgerMaster.cpp b/src/ripple/app/ledger/impl/LedgerMaster.cpp index ff42a88a84f..5c084e25874 100644 --- a/src/ripple/app/ledger/impl/LedgerMaster.cpp +++ b/src/ripple/app/ledger/impl/LedgerMaster.cpp @@ -549,22 +549,25 @@ void LedgerMaster::applyHeldTransactions() { std::lock_guard sl(m_mutex); + // It can be expensive to modify the open ledger even with no transactions + // to process. Regardless, make sure to reset held transactions with + // the parent. + if (mHeldTransactions.size()) + { + app_.openLedger().modify([&](OpenView& view, beast::Journal j) { + bool any = false; + for (auto const& it : mHeldTransactions) + { + ApplyFlags flags = tapNONE; + auto const result = + app_.getTxQ().apply(app_, view, it.second, flags, j); + if (result.second) + any = true; + } + return any; + }); + } - app_.openLedger().modify([&](OpenView& view, beast::Journal j) { - bool any = false; - for (auto const& it : mHeldTransactions) - { - ApplyFlags flags = tapNONE; - auto const result = - app_.getTxQ().apply(app_, view, it.second, flags, j); - if (result.second) - any = true; - } - return any; - }); - - // VFALCO TODO recreate the CanonicalTxSet object instead of resetting - // it. // VFALCO NOTE The hash for an open ledger is undefined so we use // something that is a reasonable substitute. mHeldTransactions.reset(app_.openLedger().current()->info().parentHash); diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index 42bf6d66c9d..83cf762cfcb 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -1527,6 +1527,7 @@ ApplicationImp::start(bool withTimers) { setSweepTimer(); setEntropyTimer(); + m_networkOPs->setBatchApplyTimer(); } m_io_latency_sampler.start(); diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index 4d90e0622f8..e59dd1128ff 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -42,6 +42,7 @@ #include #include #include +#include #include #include #include @@ -233,6 +234,7 @@ class NetworkOPsImp final : public NetworkOPs , heartbeatTimer_(io_svc) , clusterTimer_(io_svc) , accountHistoryTxTimer_(io_svc) + , batchApplyTimer_(io_svc) , mConsensus( app, make_FeeVote( @@ -282,43 +284,12 @@ class NetworkOPsImp final : public NetworkOPs processTransaction( std::shared_ptr& transaction, bool bUnlimited, + RPC::SubmitSync sync, bool bLocal, FailHard failType) override; - /** - * For transactions submitted directly by a client, apply batch of - * transactions and wait for this transaction to complete. - * - * @param transaction Transaction object. - * @param bUnliimited Whether a privileged client connection submitted it. - * @param failType fail_hard setting from transaction submission. - */ - void - doTransactionSync( - std::shared_ptr transaction, - bool bUnlimited, - FailHard failType); - - /** - * For transactions not submitted by a locally connected client, fire and - * forget. Add to batch and trigger it to be processed if there's no batch - * currently being applied. - * - * @param transaction Transaction object - * @param bUnlimited Whether a privileged client connection submitted it. - * @param failType fail_hard setting from transaction submission. - */ - void - doTransactionAsync( - std::shared_ptr transaction, - bool bUnlimited, - FailHard failtype); - - /** - * Apply transactions in batches. Continue until none are queued. - */ - void - transactionBatch(); + bool + transactionBatch(bool drain) override; /** * Attempt to apply transactions and post-process based on the results. @@ -592,6 +563,15 @@ class NetworkOPsImp final : public NetworkOPs << "NetworkOPs: accountHistoryTxTimer cancel error: " << ec.message(); } + + ec.clear(); + batchApplyTimer_.cancel(ec); + if (ec) + { + JLOG(m_journal.error()) + << "NetworkOPs: batchApplyTimer cancel error: " + << ec.message(); + } } // Make sure that any waitHandlers pending in our timers are done. using namespace std::chrono_literals; @@ -710,6 +690,9 @@ class NetworkOPsImp final : public NetworkOPs void setAccountHistoryJobTimer(SubAccountHistoryInfoWeak subInfo); + void + setBatchApplyTimer() override; + Application& app_; beast::Journal m_journal; @@ -728,6 +711,8 @@ class NetworkOPsImp final : public NetworkOPs boost::asio::steady_timer heartbeatTimer_; boost::asio::steady_timer clusterTimer_; boost::asio::steady_timer accountHistoryTxTimer_; + //! This timer is for applying transaction batches. + boost::asio::steady_timer batchApplyTimer_; RCLConsensus mConsensus; @@ -1002,6 +987,42 @@ NetworkOPsImp::setAccountHistoryJobTimer(SubAccountHistoryInfoWeak subInfo) [this, subInfo]() { setAccountHistoryJobTimer(subInfo); }); } +void +NetworkOPsImp::setBatchApplyTimer() +{ + using namespace std::chrono_literals; + // 100ms lag between batch intervals provides significant throughput gains + // with little increased latency. Tuning this figure further will + // require further testing. In general, increasing this figure will + // also increase theoretical throughput, but with diminishing returns. + auto constexpr batchInterval = 100ms; + + setTimer( + batchApplyTimer_, + batchInterval, + [this]() { + { + std::lock_guard lock(mMutex); + // Only do the job if there's work to do and it's not currently + // being done. + if (mTransactions.size() && + mDispatchState == DispatchState::none) + { + if (m_job_queue.addJob( + jtBATCH, "transactionBatch", [this]() { + transactionBatch(false); + })) + { + mDispatchState = DispatchState::scheduled; + } + return; + } + } + setBatchApplyTimer(); + }, + [this]() { setBatchApplyTimer(); }); +} + void NetworkOPsImp::processHeartbeatTimer() { @@ -1178,7 +1199,8 @@ NetworkOPsImp::submitTransaction(std::shared_ptr const& iTrans) m_job_queue.addJob(jtTRANSACTION, "submitTxn", [this, tx]() { auto t = tx; - processTransaction(t, false, false, FailHard::no); + processTransaction( + t, false, RPC::SubmitSync::async, false, FailHard::no); }); } @@ -1186,6 +1208,7 @@ void NetworkOPsImp::processTransaction( std::shared_ptr& transaction, bool bUnlimited, + RPC::SubmitSync sync, bool bLocal, FailHard failType) { @@ -1215,7 +1238,7 @@ NetworkOPsImp::processTransaction( // Not concerned with local checks at this point. if (validity == Validity::SigBad) { - JLOG(m_journal.info()) << "Transaction has bad signature: " << reason; + JLOG(m_journal.trace()) << "Transaction has bad signature: " << reason; transaction->setStatus(INVALID); transaction->setResult(temBAD_SIGNATURE); app_.getHashRouter().setFlags(transaction->getID(), SF_BAD); @@ -1225,100 +1248,72 @@ NetworkOPsImp::processTransaction( // canonicalize can change our pointer app_.getMasterTransaction().canonicalize(&transaction); - if (bLocal) - doTransactionSync(transaction, bUnlimited, failType); - else - doTransactionAsync(transaction, bUnlimited, failType); -} - -void -NetworkOPsImp::doTransactionAsync( - std::shared_ptr transaction, - bool bUnlimited, - FailHard failType) -{ - std::lock_guard lock(mMutex); - - if (transaction->getApplying()) - return; - - mTransactions.push_back( - TransactionStatus(transaction, bUnlimited, false, failType)); - transaction->setApplying(); - - if (mDispatchState == DispatchState::none) - { - if (m_job_queue.addJob( - jtBATCH, "transactionBatch", [this]() { transactionBatch(); })) - { - mDispatchState = DispatchState::scheduled; - } - } -} - -void -NetworkOPsImp::doTransactionSync( - std::shared_ptr transaction, - bool bUnlimited, - FailHard failType) -{ - std::unique_lock lock(mMutex); - + std::unique_lock lock(mMutex); if (!transaction->getApplying()) { - mTransactions.push_back( - TransactionStatus(transaction, bUnlimited, true, failType)); transaction->setApplying(); + mTransactions.push_back( + TransactionStatus(transaction, bUnlimited, bLocal, failType)); } - - do + switch (sync) { - if (mDispatchState == DispatchState::running) - { - // A batch processing job is already running, so wait. - mCond.wait(lock); - } - else - { - apply(lock); - - if (mTransactions.size()) + using enum RPC::SubmitSync; + case sync: + do { - // More transactions need to be applied, but by another job. - if (m_job_queue.addJob(jtBATCH, "transactionBatch", [this]() { - transactionBatch(); - })) - { - mDispatchState = DispatchState::scheduled; - } - } - } - } while (transaction->getApplying()); + // If a batch is being processed, then wait. Otherwise, + // process a batch. + if (mDispatchState == DispatchState::running) + mCond.wait(lock); + else + apply(lock); + } while (transaction->getApplying()); + break; + + case async: + // It's conceivable for the submitted transaction to be + // processed and its result to be modified before being returned + // to the client. Make a copy of the transaction and set its + // status to guarantee that the client gets the terSUBMITTED + // result in all cases. + transaction = std::make_shared(*transaction); + transaction->setResult(terSUBMITTED); + break; + + case wait: + mCond.wait( + lock, [&transaction] { return !transaction->getApplying(); }); + break; + + default: + assert(false); + } } -void -NetworkOPsImp::transactionBatch() +bool +NetworkOPsImp::transactionBatch(bool const drain) { - std::unique_lock lock(mMutex); - - if (mDispatchState == DispatchState::running) - return; - - while (mTransactions.size()) { - apply(lock); + std::unique_lock lock(mMutex); + if (mDispatchState == DispatchState::running || mTransactions.empty()) + return false; + + do + apply(lock); + while (drain && mTransactions.size()); } + setBatchApplyTimer(); + return true; } void NetworkOPsImp::apply(std::unique_lock& batchLock) { + assert(!mTransactions.empty()); + assert(mDispatchState != DispatchState::running); std::vector submit_held; std::vector transactions; mTransactions.swap(transactions); - assert(!transactions.empty()); - - assert(mDispatchState != DispatchState::running); mDispatchState = DispatchState::running; batchLock.unlock(); @@ -1702,7 +1697,9 @@ NetworkOPsImp::checkLastClosedLedger( switchLedgers = false; } else + { networkClosed = closedLedger; + } if (!switchLedgers) return false; diff --git a/src/ripple/app/misc/NetworkOPs.h b/src/ripple/app/misc/NetworkOPs.h index d53127ed3b6..59285311172 100644 --- a/src/ripple/app/misc/NetworkOPs.h +++ b/src/ripple/app/misc/NetworkOPs.h @@ -71,6 +71,10 @@ enum class OperatingMode { FULL = 4 //!< we have the ledger and can even validate }; +namespace RPC { +enum class SubmitSync; +} + /** Provides server functionality for clients. Clients include backend applications, local commands, and connected @@ -123,22 +127,47 @@ class NetworkOPs : public InfoSub::Source virtual void submitTransaction(std::shared_ptr const&) = 0; - /** - * Process transactions as they arrive from the network or which are - * submitted by clients. Process local transactions synchronously + /** Process a transaction. + * + * The transaction has been submitted either from the peer network or + * from a client. For client submissions, there are 3 distinct behaviors: + * 1) sync (default): process transactions in a batch immediately, + * and return only once the transaction has been processed. + * 2) async: Put transaction into the batch for the next processing + * interval and return immediately. + * 3) wait: Put transaction into the batch for the next processing + * interval and return only after it is processed. * - * @param transaction Transaction object + * @param transaction Transaction object. * @param bUnlimited Whether a privileged client connection submitted it. - * @param bLocal Client submission. - * @param failType fail_hard setting from transaction submission. + * @param sync Client submission synchronous behavior type requested. + * @param bLocal Whether submitted by client (local) or peer. + * @param failType Whether to fail hard or not. */ virtual void processTransaction( std::shared_ptr& transaction, bool bUnlimited, + RPC::SubmitSync sync, bool bLocal, FailHard failType) = 0; + /** Apply transactions in batches. + * + * Only a single batch unless drain is set. This is to optimize performance + * because there is significant overhead in applying each batch, whereas + * processing an individual transaction is fast. + * + * Setting the drain parameter is relevant for some transaction + * processing unit tests that expect all submitted transactions to + * be processed synchronously. + * + * @param drain Whether to process batches until none remain. + * @return Whether any transactions were processed. + */ + virtual bool + transactionBatch(bool drain) = 0; + //-------------------------------------------------------------------------- // // Owner functions @@ -187,6 +216,8 @@ class NetworkOPs : public InfoSub::Source setStandAlone() = 0; virtual void setStateTimer() = 0; + virtual void + setBatchApplyTimer() = 0; virtual void setNeedNetworkLedger() = 0; diff --git a/src/ripple/app/tx/impl/apply.cpp b/src/ripple/app/tx/impl/apply.cpp index c0704c5c3ae..4881f2a49b7 100644 --- a/src/ripple/app/tx/impl/apply.cpp +++ b/src/ripple/app/tx/impl/apply.cpp @@ -134,7 +134,7 @@ applyTransaction( if (retryAssured) flags = flags | tapRETRY; - JLOG(j.debug()) << "TXN " << txn.getTransactionID() + JLOG(j.trace()) << "TXN " << txn.getTransactionID() << (retryAssured ? "/retry" : "/final"); try @@ -142,7 +142,7 @@ applyTransaction( auto const result = apply(app, view, txn, flags, j); if (result.second) { - JLOG(j.debug()) + JLOG(j.trace()) << "Transaction applied: " << transHuman(result.first); return ApplyResult::Success; } @@ -151,17 +151,17 @@ applyTransaction( isTelLocal(result.first)) { // failure - JLOG(j.debug()) + JLOG(j.trace()) << "Transaction failure: " << transHuman(result.first); return ApplyResult::Fail; } - JLOG(j.debug()) << "Transaction retry: " << transHuman(result.first); + JLOG(j.trace()) << "Transaction retry: " << transHuman(result.first); return ApplyResult::Retry; } catch (std::exception const& ex) { - JLOG(j.warn()) << "Throws: " << ex.what(); + JLOG(j.trace()) << "Throws: " << ex.what(); return ApplyResult::Fail; } } diff --git a/src/ripple/basics/SubmitSync.h b/src/ripple/basics/SubmitSync.h new file mode 100644 index 00000000000..12311c676e8 --- /dev/null +++ b/src/ripple/basics/SubmitSync.h @@ -0,0 +1,41 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2023 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_BASICS_SUBMITSYNC_H_INCLUDED +#define RIPPLE_BASICS_SUBMITSYNC_H_INCLUDED + +namespace ripple { +namespace RPC { + +/** + * Possible values for defining synchronous behavior of the transaction + * submission API. + * 1) sync (default): Process transactions in a batch immediately, + * and return only once the transaction has been processed. + * 2) async: Put transaction into the batch for the next processing + * interval and return immediately. + * 3) wait: Put transaction into the batch for the next processing + * interval and return only after it is processed. + */ +enum class SubmitSync { sync, async, wait }; + +} // namespace RPC +} // namespace ripple + +#endif \ No newline at end of file diff --git a/src/ripple/core/Config.h b/src/ripple/core/Config.h index e86157762b3..6236f89fc52 100644 --- a/src/ripple/core/Config.h +++ b/src/ripple/core/Config.h @@ -210,7 +210,7 @@ class Config : public BasicConfig // Node storage configuration std::uint32_t LEDGER_HISTORY = 256; - std::uint32_t FETCH_DEPTH = 1000000000; + std::uint32_t FETCH_DEPTH = 1'000'000'000; // Tunable that adjusts various parameters, typically associated // with hardware parameters (RAM size and CPU cores). The default @@ -227,10 +227,11 @@ class Config : public BasicConfig // Enable the experimental Ledger Replay functionality bool LEDGER_REPLAY = false; - // Work queue limits - int MAX_TRANSACTIONS = 250; - static constexpr int MAX_JOB_QUEUE_TX = 1000; - static constexpr int MIN_JOB_QUEUE_TX = 100; + // Work queue limits. 10000 transactions is 2 full seconds of slowdown at + // 5000/s. + int MAX_TRANSACTIONS = 10'000; + static constexpr int MAX_JOB_QUEUE_TX = 100'000; + static constexpr int MIN_JOB_QUEUE_TX = 1'000; // Amendment majority time std::chrono::seconds AMENDMENT_MAJORITY_TIME = defaultAmendmentMajorityTime; diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 0d58a10abac..3afec605cfa 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include @@ -39,13 +40,14 @@ #include #include #include +#include #include -#include #include #include #include +#include #include #include #include @@ -3109,7 +3111,11 @@ PeerImp::checkTransaction( bool const trusted(flags & SF_TRUSTED); app_.getOPs().processTransaction( - tx, trusted, false, NetworkOPs::FailHard::no); + tx, + trusted, + RPC::SubmitSync::async, + false, + NetworkOPs::FailHard::no); } catch (std::exception const& ex) { diff --git a/src/ripple/protocol/TER.h b/src/ripple/protocol/TER.h index a2743bace8d..edae58d83c9 100644 --- a/src/ripple/protocol/TER.h +++ b/src/ripple/protocol/TER.h @@ -208,6 +208,7 @@ enum TERcodes : TERUnderlyingType { terQUEUED, // Transaction is being held in TxQ until fee drops terPRE_TICKET, // Ticket is not yet in ledger but might be on its way terNO_AMM, // AMM doesn't exist for the asset pair + terSUBMITTED // Has been submitted async. }; //------------------------------------------------------------------------------ diff --git a/src/ripple/protocol/impl/TER.cpp b/src/ripple/protocol/impl/TER.cpp index 29b87351204..9da1bc70757 100644 --- a/src/ripple/protocol/impl/TER.cpp +++ b/src/ripple/protocol/impl/TER.cpp @@ -189,6 +189,7 @@ transResults() MAKE_ERROR(terQUEUED, "Held until escalated fee drops."), MAKE_ERROR(terPRE_TICKET, "Ticket is not yet in ledger."), MAKE_ERROR(terNO_AMM, "AMM doesn't exist for the asset pair."), + MAKE_ERROR(terSUBMITTED, "Transaction has been submitted."), MAKE_ERROR(tesSUCCESS, "The transaction was applied. Only final in a validated ledger."), }; diff --git a/src/ripple/protocol/jss.h b/src/ripple/protocol/jss.h index 567e63699e2..eaf0ffa74c3 100644 --- a/src/ripple/protocol/jss.h +++ b/src/ripple/protocol/jss.h @@ -600,6 +600,7 @@ JSS(sub_index); // in: LedgerEntry JSS(subcommand); // in: PathFind JSS(success); // rpc JSS(supported); // out: AmendmentTableImpl +JSS(sync_mode); // in: Submit JSS(system_time_offset); // out: NetworkOPs JSS(tag); // out: Peers JSS(taker); // in: Subscribe, BookOffers diff --git a/src/ripple/rpc/handlers/Submit.cpp b/src/ripple/rpc/handlers/Submit.cpp index 2b5c8bba925..8a702c5bd3e 100644 --- a/src/ripple/rpc/handlers/Submit.cpp +++ b/src/ripple/rpc/handlers/Submit.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -48,6 +49,10 @@ doSubmit(RPC::JsonContext& context) { context.loadType = Resource::feeMediumBurdenRPC; + auto const sync = RPC::getSubmitSyncMode(context.params); + if (!sync) + return sync.error(); + if (!context.params.isMember(jss::tx_blob)) { auto const failType = getFailHard(context); @@ -62,7 +67,8 @@ doSubmit(RPC::JsonContext& context) context.role, context.ledgerMaster.getValidatedLedgerAge(), context.app, - RPC::getProcessTxnFn(context.netOps)); + RPC::getProcessTxnFn(context.netOps), + *sync); ret[jss::deprecated] = "Signing support in the 'submit' command has been " @@ -131,7 +137,7 @@ doSubmit(RPC::JsonContext& context) auto const failType = getFailHard(context); context.netOps.processTransaction( - tpTrans, isUnlimited(context.role), true, failType); + tpTrans, isUnlimited(context.role), *sync, true, failType); } catch (std::exception& e) { diff --git a/src/ripple/rpc/handlers/SubmitMultiSigned.cpp b/src/ripple/rpc/handlers/SubmitMultiSigned.cpp index dbae6e95f7a..9b455a1961f 100644 --- a/src/ripple/rpc/handlers/SubmitMultiSigned.cpp +++ b/src/ripple/rpc/handlers/SubmitMultiSigned.cpp @@ -18,10 +18,12 @@ //============================================================================== #include +#include #include #include #include #include +#include #include namespace ripple { @@ -37,13 +39,18 @@ doSubmitMultiSigned(RPC::JsonContext& context) auto const failHard = context.params[jss::fail_hard].asBool(); auto const failType = NetworkOPs::doFailHard(failHard); + auto const sync = RPC::getSubmitSyncMode(context.params); + if (!sync) + return sync.error(); + return RPC::transactionSubmitMultiSigned( context.params, failType, context.role, context.ledgerMaster.getValidatedLedgerAge(), context.app, - RPC::getProcessTxnFn(context.netOps)); + RPC::getProcessTxnFn(context.netOps), + *sync); } } // namespace ripple diff --git a/src/ripple/rpc/impl/RPCHelpers.cpp b/src/ripple/rpc/impl/RPCHelpers.cpp index 7ad7fda4940..4a517733637 100644 --- a/src/ripple/rpc/impl/RPCHelpers.cpp +++ b/src/ripple/rpc/impl/RPCHelpers.cpp @@ -1166,5 +1166,26 @@ getLedgerByContext(RPC::JsonContext& context) return RPC::make_error( rpcNOT_READY, "findCreate failed to return an inbound ledger"); } + +ripple::Expected +getSubmitSyncMode(Json::Value const& params) +{ + using enum RPC::SubmitSync; + if (params.isMember(jss::sync_mode)) + { + std::string const syncMode = params[jss::sync_mode].asString(); + if (syncMode == "async") + return async; + else if (syncMode == "wait") + return wait; + else if (syncMode != "sync") + return Unexpected(RPC::make_error( + rpcINVALID_PARAMS, + "sync_mode parameter must be one of \"sync\", \"async\", or " + "\"wait\".")); + } + return sync; +} + } // namespace RPC } // namespace ripple diff --git a/src/ripple/rpc/impl/RPCHelpers.h b/src/ripple/rpc/impl/RPCHelpers.h index 0293fb15a21..5fa7ae804a2 100644 --- a/src/ripple/rpc/impl/RPCHelpers.h +++ b/src/ripple/rpc/impl/RPCHelpers.h @@ -26,6 +26,8 @@ #include #include +#include +#include #include #include #include @@ -293,6 +295,14 @@ getAPIVersionNumber(const Json::Value& value, bool betaEnabled); std::variant, Json::Value> getLedgerByContext(RPC::JsonContext& context); +/** Helper to parse submit_mode parameter to RPC submit. + * + * @param params RPC parameters + * @return Either the mode or an error object. + */ +ripple::Expected +getSubmitSyncMode(Json::Value const& params); + } // namespace RPC } // namespace ripple diff --git a/src/ripple/rpc/impl/TransactionSign.cpp b/src/ripple/rpc/impl/TransactionSign.cpp index 7610682fd1a..5cd0c9edee3 100644 --- a/src/ripple/rpc/impl/TransactionSign.cpp +++ b/src/ripple/rpc/impl/TransactionSign.cpp @@ -802,7 +802,8 @@ transactionSubmit( Role role, std::chrono::seconds validatedLedgerAge, Application& app, - ProcessTransactionFn const& processTransaction) + ProcessTransactionFn const& processTransaction, + RPC::SubmitSync sync) { using namespace detail; @@ -828,8 +829,7 @@ transactionSubmit( // Finally, submit the transaction. try { - // FIXME: For performance, should use asynch interface - processTransaction(txn.second, isUnlimited(role), true, failType); + processTransaction(txn.second, isUnlimited(role), sync, failType); } catch (std::exception&) { @@ -1038,7 +1038,8 @@ transactionSubmitMultiSigned( Role role, std::chrono::seconds validatedLedgerAge, Application& app, - ProcessTransactionFn const& processTransaction) + ProcessTransactionFn const& processTransaction, + RPC::SubmitSync sync) { auto const& ledger = app.openLedger().current(); auto j = app.journal("RPCHandler"); @@ -1211,7 +1212,7 @@ transactionSubmitMultiSigned( try { // FIXME: For performance, should use asynch interface - processTransaction(txn.second, isUnlimited(role), true, failType); + processTransaction(txn.second, isUnlimited(role), sync, failType); } catch (std::exception&) { diff --git a/src/ripple/rpc/impl/TransactionSign.h b/src/ripple/rpc/impl/TransactionSign.h index d9c76e189f0..a396e65af52 100644 --- a/src/ripple/rpc/impl/TransactionSign.h +++ b/src/ripple/rpc/impl/TransactionSign.h @@ -21,6 +21,7 @@ #define RIPPLE_RPC_TRANSACTIONSIGN_H_INCLUDED #include +#include #include #include @@ -75,7 +76,7 @@ checkFee( using ProcessTransactionFn = std::function& transaction, bool bUnlimited, - bool bLocal, + RPC::SubmitSync sync, NetworkOPs::FailHard failType)>; inline ProcessTransactionFn @@ -84,9 +85,10 @@ getProcessTxnFn(NetworkOPs& netOPs) return [&netOPs]( std::shared_ptr& transaction, bool bUnlimited, - bool bLocal, + RPC::SubmitSync sync, NetworkOPs::FailHard failType) { - netOPs.processTransaction(transaction, bUnlimited, bLocal, failType); + netOPs.processTransaction( + transaction, bUnlimited, sync, true, failType); }; } @@ -107,7 +109,8 @@ transactionSubmit( Role role, std::chrono::seconds validatedLedgerAge, Application& app, - ProcessTransactionFn const& processTransaction); + ProcessTransactionFn const& processTransaction, + RPC::SubmitSync sync); /** Returns a Json::objectValue. */ Json::Value @@ -126,7 +129,8 @@ transactionSubmitMultiSigned( Role role, std::chrono::seconds validatedLedgerAge, Application& app, - ProcessTransactionFn const& processTransaction); + ProcessTransactionFn const& processTransaction, + RPC::SubmitSync sync); } // namespace RPC } // namespace ripple diff --git a/src/test/app/Transaction_ordering_test.cpp b/src/test/app/Transaction_ordering_test.cpp index 0353df90663..01f870d0668 100644 --- a/src/test/app/Transaction_ordering_test.cpp +++ b/src/test/app/Transaction_ordering_test.cpp @@ -15,6 +15,7 @@ */ //============================================================================== +#include #include #include #include @@ -91,6 +92,7 @@ struct Transaction_ordering_test : public beast::unit_test::suite env(tx2, ter(terPRE_SEQ)); BEAST_EXPECT(env.seq(alice) == aliceSequence); env(tx1); + BEAST_EXPECT(env.app().getOPs().transactionBatch(false)); env.app().getJobQueue().rendezvous(); BEAST_EXPECT(env.seq(alice) == aliceSequence + 2); @@ -143,6 +145,8 @@ struct Transaction_ordering_test : public beast::unit_test::suite } env(tx[0]); + // Apply until no more deferred/held transactions. + BEAST_EXPECT(env.app().getOPs().transactionBatch(true)); env.app().getJobQueue().rendezvous(); BEAST_EXPECT(env.seq(alice) == aliceSequence + 5); diff --git a/src/test/jtx/Env_test.cpp b/src/test/jtx/Env_test.cpp index 6f09f49ed5d..6e26a40e25a 100644 --- a/src/test/jtx/Env_test.cpp +++ b/src/test/jtx/Env_test.cpp @@ -29,6 +29,7 @@ #include #include +#include #include namespace ripple { @@ -900,6 +901,95 @@ class Env_test : public beast::unit_test::suite pass(); } + void + testSyncSubmit() + { + using namespace jtx; + Env env(*this); + + auto const alice = Account{"alice"}; + auto const n = XRP(10000); + env.fund(n, alice); + BEAST_EXPECT(env.balance(alice) == n); + + // submit only + auto applyBlobTxn = [&env](char const* syncMode, auto&&... txnArgs) { + auto jt = env.jt(txnArgs...); + Serializer s; + jt.stx->add(s); + + Json::Value args{Json::objectValue}; + + args[jss::tx_blob] = strHex(s.slice()); + args[jss::fail_hard] = true; + args[jss::sync_mode] = syncMode; + + return env.rpc("json", "submit", args.toStyledString()); + }; + + auto jr = applyBlobTxn("sync", noop(alice)); + BEAST_EXPECT(jr[jss::result][jss::engine_result] == "tesSUCCESS"); + + jr = applyBlobTxn("async", noop(alice)); + BEAST_EXPECT(jr[jss::result][jss::engine_result] == "terSUBMITTED"); + // Make sure it gets processed before submitting and waiting. + env.app().getOPs().transactionBatch(true); + + auto applier = [&env]() { + while (!env.app().getOPs().transactionBatch(false)) + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + }; + auto t = std::thread(applier); + + jr = applyBlobTxn("wait", noop(alice)); + BEAST_EXPECT(jr[jss::result][jss::engine_result] == "tesSUCCESS"); + t.join(); + + jr = applyBlobTxn("scott", noop(alice)); + BEAST_EXPECT(jr[jss::result][jss::error] == "invalidParams"); + + // sign and submit + auto applyJsonTxn = [&env]( + char const* syncMode, + std::string const secret, + Json::Value const& val) { + Json::Value args{Json::objectValue}; + args[jss::secret] = secret; + args[jss::tx_json] = val; + args[jss::fail_hard] = true; + args[jss::sync_mode] = syncMode; + + return env.rpc("json", "submit", args.toStyledString()); + }; + + Json::Value payment; + auto secret = toBase58(generateSeed("alice")); + payment = noop("alice"); + payment[sfSequence.fieldName] = env.seq("alice"); + payment[sfSetFlag.fieldName] = 0; + jr = applyJsonTxn("sync", secret, payment); + BEAST_EXPECT(jr[jss::result][jss::engine_result] == "tesSUCCESS"); + + payment[sfSequence.fieldName] = env.seq("alice"); + jr = applyJsonTxn("async", secret, payment); + BEAST_EXPECT(jr[jss::result][jss::engine_result] == "terSUBMITTED"); + + env.app().getOPs().transactionBatch(true); + payment[sfSequence.fieldName] = env.seq("alice"); + + auto aSeq = env.seq("alice"); + t = std::thread(applier); + jr = applyJsonTxn("wait", secret, payment); + BEAST_EXPECT(jr[jss::result][jss::engine_result] == "tesSUCCESS"); + t.join(); + // Ensure the last transaction was processed. + BEAST_EXPECT(env.seq("alice") == aSeq + 1); + + payment[sfSequence.fieldName] = env.seq("alice"); + jr = applyJsonTxn("scott", secret, payment); + BEAST_EXPECT(jr[jss::result][jss::error] == "invalidParams"); + } + void run() override { @@ -925,6 +1015,7 @@ class Env_test : public beast::unit_test::suite testSignAndSubmit(); testFeatures(); testExceptionalShutdown(); + testSyncSubmit(); } }; diff --git a/src/test/rpc/JSONRPC_test.cpp b/src/test/rpc/JSONRPC_test.cpp index a0970bbd746..b6e54967c40 100644 --- a/src/test/rpc/JSONRPC_test.cpp +++ b/src/test/rpc/JSONRPC_test.cpp @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -2384,7 +2385,7 @@ class JSONRPC_test : public beast::unit_test::suite fakeProcessTransaction( std::shared_ptr&, bool, - bool, + SubmitSync, NetworkOPs::FailHard) { ; @@ -2432,7 +2433,8 @@ class JSONRPC_test : public beast::unit_test::suite Role role, std::chrono::seconds validatedLedgerAge, Application & app, - ProcessTransactionFn const& processTransaction); + ProcessTransactionFn const& processTransaction, + RPC::SubmitSync sync); using TestStuff = std::tuple; @@ -2485,7 +2487,8 @@ class JSONRPC_test : public beast::unit_test::suite testRole, 1s, env.app(), - processTxn); + processTxn, + RPC::SubmitSync::sync); } std::string errStr; diff --git a/src/test/rpc/RobustTransaction_test.cpp b/src/test/rpc/RobustTransaction_test.cpp index 37b16c58d7f..01ac71e272a 100644 --- a/src/test/rpc/RobustTransaction_test.cpp +++ b/src/test/rpc/RobustTransaction_test.cpp @@ -17,6 +17,7 @@ */ //============================================================================== +#include #include #include #include @@ -88,7 +89,8 @@ class RobustTransaction_test : public beast::unit_test::suite } BEAST_EXPECT(jv[jss::result][jss::engine_result] == "tefPAST_SEQ"); - // Submit future sequence transaction + // Submit future sequence transaction -- this transaction should be + // held until the sequence gap is closed. payment[jss::tx_json][sfSequence.fieldName] = env.seq("alice") + 1; jv = wsc->invoke("submit", payment); if (wsc->version() == 2) @@ -114,6 +116,8 @@ class RobustTransaction_test : public beast::unit_test::suite } BEAST_EXPECT(jv[jss::result][jss::engine_result] == "tesSUCCESS"); + // Apply held transactions. + env.app().getOPs().transactionBatch(true); // Wait for the jobqueue to process everything env.app().getJobQueue().rendezvous();