diff --git a/src/simulation/CoreTests.cpp b/src/simulation/CoreTests.cpp index 7ff11b8ecb..d0755b5d58 100644 --- a/src/simulation/CoreTests.cpp +++ b/src/simulation/CoreTests.cpp @@ -337,7 +337,8 @@ TEST_CASE( auto nodes = simulation->getNodes(); auto& app = *nodes[0]; // pick a node to generate load - app.getLoadGenerator().generateLoad(true, 3, 0, 0, 10, 100); + auto& lg = app.getLoadGenerator(); + lg.generateLoad(true, 3, 0, 0, 10, 100); try { simulation->crankUntil( @@ -346,21 +347,21 @@ TEST_CASE( // to the second node in time and the second node gets the // nomination return simulation->haveAllExternalized(5, 2) && - simulation->accountsOutOfSyncWithDb(app).empty(); + lg.checkAccountSynced(app, true).empty(); }, 3 * Herder::EXP_LEDGER_TIMESPAN_SECONDS, false); - app.getLoadGenerator().generateLoad(false, 3, 0, 10, 10, 100); + lg.generateLoad(false, 3, 0, 10, 10, 100); simulation->crankUntil( [&]() { return simulation->haveAllExternalized(8, 2) && - simulation->accountsOutOfSyncWithDb(app).empty(); + lg.checkAccountSynced(app, false).empty(); }, 2 * Herder::EXP_LEDGER_TIMESPAN_SECONDS, true); } catch (...) 
{ - auto problems = simulation->accountsOutOfSyncWithDb(app); + auto problems = lg.checkAccountSynced(app, false); REQUIRE(problems.empty()); } @@ -510,14 +511,15 @@ netTopologyTest(std::string const& name, assert(!nodes.empty()); auto& app = *nodes[0]; - app.getLoadGenerator().generateLoad(true, 50, 0, 0, 10, 100); + auto& lg = app.getLoadGenerator(); + lg.generateLoad(true, 50, 0, 0, 10, 100); auto& complete = app.getMetrics().NewMeter({"loadgen", "run", "complete"}, "run"); sim->crankUntil( [&]() { return sim->haveAllExternalized(8, 2) && - sim->accountsOutOfSyncWithDb(app).empty() && + lg.checkAccountSynced(app, true).empty() && complete.count() == 1; }, 2 * Herder::EXP_LEDGER_TIMESPAN_SECONDS, true); diff --git a/src/simulation/LoadGenerator.cpp b/src/simulation/LoadGenerator.cpp index 1498818323..78a77c3951 100644 --- a/src/simulation/LoadGenerator.cpp +++ b/src/simulation/LoadGenerator.cpp @@ -36,22 +36,30 @@ namespace stellar using namespace std; using namespace txtest; -// Units of load are is scheduled at 100ms intervals. +// Units of load are scheduled at 100ms intervals. const uint32_t LoadGenerator::STEP_MSECS = 100; -// -const uint32_t LoadGenerator::TX_SUBMIT_MAX_TRIES = 1000; + +// If submission fails with txBAD_SEQ, attempt refreshing the account or +// re-submitting a new payment +const uint32_t LoadGenerator::TX_SUBMIT_MAX_TRIES = 10; + +// After successfully submitting desired load, wait a bit to let it get into the +// ledger. 
+const uint32_t LoadGenerator::TIMEOUT_NUM_LEDGERS = 20; LoadGenerator::LoadGenerator(Application& app) - : mMinBalance(0), mLastSecond(0), mApp(app), mTotalSubmitted(0) + : mMinBalance(0) + , mLastSecond(0) + , mApp(app) + , mTotalSubmitted(0) + , mLoadgenComplete( + mApp.getMetrics().NewMeter({"loadgen", "run", "complete"}, "run")) + , mLoadgenFail( + mApp.getMetrics().NewMeter({"loadgen", "run", "failed"}, "run")) { createRootAccount(); } -LoadGenerator::~LoadGenerator() -{ - clear(); -} - void LoadGenerator::createRootAccount() { @@ -92,12 +100,14 @@ LoadGenerator::getTxPerStep(uint32_t txRate) } void -LoadGenerator::clear() +LoadGenerator::reset() { mAccounts.clear(); mRoot.reset(); mStartTime.reset(); mTotalSubmitted = 0; + mWaitTillCompleteForLedgers = 0; + mFailed = false; } // Schedule a callback to generateLoad() STEP_MSECS miliseconds from now. @@ -106,6 +116,18 @@ LoadGenerator::scheduleLoadGeneration(bool isCreate, uint32_t nAccounts, uint32_t offset, uint32_t nTxs, uint32_t txRate, uint32_t batchSize) { + // If previously scheduled step of load did not succeed, fail this loadgen + // run. + if (mFailed) + { + CLOG(ERROR, "LoadGen") << "Load generation failed, ensure correct " + "number parameters are set and accounts are " + "created, or retry with smaller tx rate."; + mLoadgenFail.Mark(); + reset(); + return; + } + if (!mLoadTimer) { mLoadTimer = std::make_unique(mApp.getClock()); @@ -156,7 +178,7 @@ LoadGenerator::generateLoad(bool isCreate, uint32_t nAccounts, uint32_t offset, if ((isCreate && nAccounts == 0) || (!isCreate && nTxs == 0)) { // Done submitting the load, now ensure it propagates to the DB. 
- waitTillComplete(); + waitTillComplete(isCreate); return; } @@ -229,18 +251,23 @@ LoadGenerator::submitCreationTx(uint32_t nAccounts, uint32_t offset, while ((status = tx.execute(mApp, true, code, batchSize)) != TransactionQueue::AddResult::ADD_STATUS_PENDING) { - handleFailedSubmission(tx.mFrom, status, code); // Update seq num + // Ignore duplicate transactions, simply continue generating load if (status == TransactionQueue::AddResult::ADD_STATUS_DUPLICATE) { createDuplicate = true; break; } - if (++numTries >= TX_SUBMIT_MAX_TRIES) + + if (++numTries >= TX_SUBMIT_MAX_TRIES || + status != TransactionQueue::AddResult::ADD_STATUS_ERROR) { - CLOG(ERROR, "LoadGen") << "Error creating account!"; - clear(); + // Failed to submit the step of load + mFailed = true; return 0; } + + // In case of bad seqnum, attempt refreshing it from the DB + maybeHandleFailedTx(tx.mFrom, status, code); } if (!createDuplicate) @@ -267,16 +294,18 @@ LoadGenerator::submitPaymentTx(uint32_t nAccounts, uint32_t offset, while ((status = tx.execute(mApp, false, code, batchSize)) != TransactionQueue::AddResult::ADD_STATUS_PENDING) { - handleFailedSubmission(tx.mFrom, status, code); // Update seq num - tx = paymentTransaction(nAccounts, offset, ledgerNum, - sourceAccountId); // re-generate the tx - if (++numTries >= TX_SUBMIT_MAX_TRIES) + if (++numTries >= TX_SUBMIT_MAX_TRIES || + status != TransactionQueue::AddResult::ADD_STATUS_ERROR) { - CLOG(ERROR, "LoadGen") << "Error submitting tx: did you specify " - "correct number of accounts and offset?"; - clear(); + mFailed = true; return 0; } + + // In case of bad seqnum, attempt refreshing it from the DB + maybeHandleFailedTx(tx.mFrom, status, code); // Update seq num + + // Regenerate a new payment tx + tx = paymentTransaction(nAccounts, offset, ledgerNum, sourceAccountId); } nTxs -= 1; @@ -409,13 +438,13 @@ LoadGenerator::findAccount(uint64_t accountId, uint32_t ledgerNum) if (res == mAccounts.end()) { SequenceNumber sn = static_cast(ledgerNum) 
<< 32; - auto name = "TestAccount-" + to_string(accountId); - auto account = TestAccount{mApp, txtest::getAccount(name.c_str()), sn}; - newAccountPtr = make_shared(account); + auto name = "TestAccount-" + std::to_string(accountId); + newAccountPtr = + std::make_shared(mApp, txtest::getAccount(name), sn); if (!loadAccount(newAccountPtr, mApp)) { - std::runtime_error( + throw std::runtime_error( fmt::format("Account {0} must exist in the DB.", accountId)); } mAccounts.insert( @@ -445,9 +474,9 @@ LoadGenerator::paymentTransaction(uint32_t numAccounts, uint32_t offset, } void -LoadGenerator::handleFailedSubmission(TestAccountPtr sourceAccount, - TransactionQueue::AddResult status, - TransactionResultCode code) +LoadGenerator::maybeHandleFailedTx(TestAccountPtr sourceAccount, + TransactionQueue::AddResult status, + TransactionResultCode code) { // Note that if transaction is a DUPLICATE, its sequence number is // incremented on the next call to execute. @@ -463,21 +492,43 @@ LoadGenerator::handleFailedSubmission(TestAccountPtr sourceAccount, } std::vector -LoadGenerator::checkAccountSynced(Application& app) +LoadGenerator::checkAccountSynced(Application& app, bool isCreate) { std::vector result; for (auto const& acc : mAccounts) { TestAccountPtr account = acc.second; - auto currentSeqNum = account->getLastSequenceNumber(); - auto reloadRes = loadAccount(account, app); - // reload the account - if (!reloadRes || currentSeqNum != account->getLastSequenceNumber()) + auto accountFromDB = *account; + + auto reloadRes = loadAccount(accountFromDB, app); + // For account creation, reload accounts from the DB + // For payments, ensure that the sequence number matches expected + // seqnum. Timeout after 20 ledgers. 
+ if (isCreate) + { + if (!reloadRes) + { + CLOG(TRACE, "LoadGen") << "Account " << account->getAccountId() + << " is not created yet!"; + result.push_back(account); + } + } + else if (!reloadRes) + { + auto msg = + fmt::format("Account {} used to submit payment tx could not " + "load, DB might be in a corrupted state", + account->getAccountId()); + throw std::runtime_error(msg); + } + else if (account->getLastSequenceNumber() != + accountFromDB.getLastSequenceNumber()) { - CLOG(DEBUG, "LoadGen") + CLOG(TRACE, "LoadGen") << "Account " << account->getAccountId() - << " is at sequence num " << currentSeqNum - << ", but the DB is at " << account->getLastSequenceNumber(); + << " is at sequence num " << account->getLastSequenceNumber() + << ", but the DB is at " + << accountFromDB.getLastSequenceNumber(); result.push_back(account); } } @@ -485,30 +536,37 @@ LoadGenerator::checkAccountSynced(Application& app) } void -LoadGenerator::waitTillComplete() +LoadGenerator::waitTillComplete(bool isCreate) { if (!mLoadTimer) { mLoadTimer = std::make_unique(mApp.getClock()); } vector inconsistencies; - inconsistencies = checkAccountSynced(mApp); + inconsistencies = checkAccountSynced(mApp, isCreate); if (inconsistencies.empty()) { CLOG(INFO, "LoadGen") << "Load generation complete."; - mApp.getMetrics() - .NewMeter({"loadgen", "run", "complete"}, "run") - .Mark(); - clear(); + mLoadgenComplete.Mark(); + reset(); return; } else { + if (++mWaitTillCompleteForLedgers >= TIMEOUT_NUM_LEDGERS) + { + CLOG(INFO, "LoadGen") << "Load generation failed."; + mLoadgenFail.Mark(); + reset(); + return; + } + mLoadTimer->expires_from_now( mApp.getConfig().getExpectedLedgerCloseTime()); - mLoadTimer->async_wait([this]() { this->waitTillComplete(); }, - &VirtualTimer::onFailureNoop); + mLoadTimer->async_wait( + [this, isCreate]() { this->waitTillComplete(isCreate); }, + &VirtualTimer::onFailureNoop); } } diff --git a/src/simulation/LoadGenerator.h b/src/simulation/LoadGenerator.h index 
3d2fa24c78..87c542dc15 100644 --- a/src/simulation/LoadGenerator.h +++ b/src/simulation/LoadGenerator.h @@ -29,20 +29,76 @@ class VirtualTimer; class LoadGenerator { public: + using TestAccountPtr = std::shared_ptr; LoadGenerator(Application& app); - ~LoadGenerator(); - void clear(); - struct TxInfo; - using TestAccountPtr = std::shared_ptr; + // Generate one "step" worth of load (assuming 1 step per STEP_MSECS) at a + // given target number of accounts and txs, and a given target tx/s rate. + // If work remains after the current step, call scheduleLoadGeneration() + // with the remainder. + void generateLoad(bool isCreate, uint32_t nAccounts, uint32_t offset, + uint32_t nTxs, uint32_t txRate, uint32_t batchSize); + + // Verify cached accounts are properly reflected in the database and + // return any accounts that are inconsistent. + std::vector checkAccountSynced(Application& app, + bool isCreate); + + private: + struct TxMetrics + { + medida::Meter& mAccountCreated; + medida::Meter& mNativePayment; + medida::Meter& mTxnAttempted; + medida::Meter& mTxnRejected; + medida::Meter& mTxnBytes; + + TxMetrics(medida::MetricsRegistry& m); + void report(); + }; + + struct TxInfo + { + TestAccountPtr mFrom; + std::vector mOps; + // There are a few scenarios where tx submission might fail: + // * ADD_STATUS_DUPLICATE, should be just a no-op and not count toward + // total tx goal. + // * ADD_STATUS_TRY_AGAIN_LATER, transaction is banned/dropped. This + // indicates that the system is getting overloaded, so loadgen fails. + // * ADD_STATUS_ERROR, transaction didn't pass validation checks. If + // failure is due to txBAD_SEQ, synchronize accounts with the DB and + // re-submit. Any other code points to a loadgen misconfiguration, as + // transactions must have valid (pre-generated) source accounts, + // sufficient balances etc. 
+ TransactionQueue::AddResult execute(Application& app, bool isCreate, + TransactionResultCode& code, + int32_t batchSize); + }; static const uint32_t STEP_MSECS; static const uint32_t TX_SUBMIT_MAX_TRIES; + static const uint32_t TIMEOUT_NUM_LEDGERS; std::unique_ptr mLoadTimer; int64 mMinBalance; uint64_t mLastSecond; + Application& mApp; + int64_t mTotalSubmitted; + // Set when load generation actually begins + std::unique_ptr mStartTime; + + TestAccountPtr mRoot; + // Accounts cache + std::map mAccounts; + + medida::Meter& mLoadgenComplete; + medida::Meter& mLoadgenFail; + bool mFailed{false}; + int mWaitTillCompleteForLedgers{0}; + + void reset(); void createRootAccount(); int64_t getTxPerStep(uint32_t txRate); @@ -51,13 +107,6 @@ class LoadGenerator uint32_t offset, uint32_t nTxs, uint32_t txRate, uint32_t batchSize); - // Generate one "step" worth of load (assuming 1 step per STEP_MSECS) at a - // given target number of accounts and txs, and a given target tx/s rate. - // If work remains after the current step, call scheduleLoadGeneration() - // with the remainder. 
- void generateLoad(bool isCreate, uint32_t nAccounts, uint32_t offset, - uint32_t nTxs, uint32_t txRate, uint32_t batchSize); - std::vector createAccounts(uint64_t i, uint64_t batchSize, uint32_t ledgerNum); bool loadAccount(TestAccount& account, Application& app); @@ -71,12 +120,11 @@ class LoadGenerator uint32_t offset, uint32_t ledgerNum, uint64_t sourceAccount); - void handleFailedSubmission(TestAccountPtr sourceAccount, - TransactionQueue::AddResult status, - TransactionResultCode code); + void maybeHandleFailedTx(TestAccountPtr sourceAccount, + TransactionQueue::AddResult status, + TransactionResultCode code); TxInfo creationTransaction(uint64_t startAccount, uint64_t numItems, uint32_t ledgerNum); - std::vector checkAccountSynced(Application& app); void logProgress(std::chrono::nanoseconds submitTimer, bool isCreate, uint32_t nAccounts, uint32_t nTxs, uint32_t batchSize, uint32_t txRate); @@ -86,39 +134,8 @@ class LoadGenerator uint32_t submitPaymentTx(uint32_t nAccounts, uint32_t offset, uint32_t batchSize, uint32_t ledgerNum, uint32_t nTxs); + void waitTillComplete(bool isCreate); void updateMinBalance(); - void waitTillComplete(); - - struct TxMetrics - { - medida::Meter& mAccountCreated; - medida::Meter& mNativePayment; - medida::Meter& mTxnAttempted; - medida::Meter& mTxnRejected; - medida::Meter& mTxnBytes; - - TxMetrics(medida::MetricsRegistry& m); - void report(); - }; - - struct TxInfo - { - TestAccountPtr mFrom; - std::vector mOps; - TransactionQueue::AddResult execute(Application& app, bool isCreate, - TransactionResultCode& code, - int32_t batchSize); - }; - - protected: - Application& mApp; - int64_t mTotalSubmitted; - // Set when load generation actually begins - std::unique_ptr mStartTime; - - TestAccountPtr mRoot; - // Accounts cache - std::map mAccounts; }; } diff --git a/src/simulation/Simulation.cpp b/src/simulation/Simulation.cpp index 3a38f501f5..f5a8cdb581 100644 --- a/src/simulation/Simulation.cpp +++ b/src/simulation/Simulation.cpp 
@@ -279,7 +279,6 @@ Simulation::startAllNodes() if (app->getState() == Application::APP_CREATED_STATE) { app->start(); - app->getLoadGenerator().updateMinBalance(); } } @@ -516,14 +515,6 @@ Simulation::crankForAtLeast(VirtualClock::duration seconds, bool finalCrank) } } -void -Simulation::crankUntilSync(Application& app, VirtualClock::duration timeout, - bool finalCrank) -{ - crankUntil([&]() { return this->accountsOutOfSyncWithDb(app).empty(); }, - timeout, finalCrank); -} - void Simulation::crankUntil(function const& predicate, VirtualClock::duration timeout, bool finalCrank) @@ -605,32 +596,6 @@ Simulation::crankUntil(VirtualClock::time_point timePoint, bool finalCrank) } } -vector -Simulation::accountsOutOfSyncWithDb(Application& mainApp) -{ - vector result; - int iApp = 0; - - for (auto const& p : mNodes) - { - iApp++; - vector res; - auto app = p.second.mApp; - res = mainApp.getLoadGenerator().checkAccountSynced(*app); - if (!res.empty()) - { - LOG(DEBUG) << "On node " << iApp - << " some accounts are not in sync."; - } - else - { - result.insert(result.end(), res.begin(), res.end()); - } - } - LOG(INFO) << "Ledger has not yet caught up to the simulation."; - return result; -} - Config Simulation::newConfig() { diff --git a/src/simulation/Simulation.h b/src/simulation/Simulation.h index be73416719..4520888a94 100644 --- a/src/simulation/Simulation.h +++ b/src/simulation/Simulation.h @@ -67,13 +67,9 @@ class Simulation size_t crankAllNodes(int nbTicks = 1); void crankForAtMost(VirtualClock::duration seconds, bool finalCrank); void crankForAtLeast(VirtualClock::duration seconds, bool finalCrank); - void crankUntilSync(Application& app, VirtualClock::duration timeout, - bool finalCrank); void crankUntil(std::function const& fn, VirtualClock::duration timeout, bool finalCrank); void crankUntil(VirtualClock::time_point timePoint, bool finalCrank); - std::vector accountsOutOfSyncWithDb( - Application& mainApp); // returns the accounts that don't match std::string 
metricsSummary(std::string domain = ""); void addConnection(NodeID initiator, NodeID acceptor);