diff --git a/docs/stellar-core_example.cfg b/docs/stellar-core_example.cfg index 4a2f50989f..e3595692fd 100644 --- a/docs/stellar-core_example.cfg +++ b/docs/stellar-core_example.cfg @@ -530,11 +530,6 @@ PEER_FLOOD_READING_CAPACITY_BYTES=300000 # processes `FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES` bytes FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES=100000 -# Enable flow control in bytes. This config allows core to process large -# transactions on the network more efficiently and apply back pressure if -# needed. -ENABLE_FLOW_CONTROL_BYTES=true - # Byte limit for outbound transaction queue. OUTBOUND_TX_QUEUE_BYTE_LIMIT=3145728 diff --git a/src/herder/test/HerderTests.cpp b/src/herder/test/HerderTests.cpp index b0c37e1b13..809e97850e 100644 --- a/src/herder/test/HerderTests.cpp +++ b/src/herder/test/HerderTests.cpp @@ -3832,6 +3832,9 @@ herderExternalizesValuesWithProtocol(uint32_t version) Simulation::OVER_LOOPBACK, networkID, [version](int i) { auto cfg = getTestConfig(i, Config::TESTDB_ON_DISK_SQLITE); cfg.TESTING_UPGRADE_LEDGER_PROTOCOL_VERSION = version; + // Let OverlayManagerImpl compute flow control bytes values + cfg.PEER_FLOOD_READING_CAPACITY_BYTES = 0; + cfg.FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES = 0; return cfg; }); diff --git a/src/history/test/HistoryTestsUtils.cpp b/src/history/test/HistoryTestsUtils.cpp index 87889f97f6..fd363b488f 100644 --- a/src/history/test/HistoryTestsUtils.cpp +++ b/src/history/test/HistoryTestsUtils.cpp @@ -65,6 +65,11 @@ TmpDirHistoryConfigurator::configure(Config& cfg, bool writable) const cfg.HISTORY[d] = HistoryArchiveConfiguration{d, getCmd, putCmd, mkdirCmd}; cfg.TESTING_SOROBAN_HIGH_LIMIT_OVERRIDE = true; + + // Let OverlayManagerImpl compute flow control bytes values + cfg.PEER_FLOOD_READING_CAPACITY_BYTES = 0; + cfg.FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES = 0; + return cfg; } diff --git a/src/main/Config.cpp b/src/main/Config.cpp index 8e96360365..5e694914fa 100644 --- a/src/main/Config.cpp +++ b/src/main/Config.cpp @@ -151,8 +151,8 @@ Config::Config() : NODE_SEED(SecretKey::random()) LEDGER_PROTOCOL_VERSION = CURRENT_LEDGER_PROTOCOL_VERSION; LEDGER_PROTOCOL_MIN_VERSION_INTERNAL_ERROR_REPORT = 18; - OVERLAY_PROTOCOL_MIN_VERSION = 32; - OVERLAY_PROTOCOL_VERSION = 34; + OVERLAY_PROTOCOL_MIN_VERSION = 33; + OVERLAY_PROTOCOL_VERSION = 35; VERSION_STR = STELLAR_CORE_VERSION; @@ -261,7 +261,6 @@ Config::Config() : NODE_SEED(SecretKey::random()) PEER_FLOOD_READING_CAPACITY_BYTES = 0; FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES = 0; OUTBOUND_TX_QUEUE_BYTE_LIMIT = 1024 * 1024 * 3; - ENABLE_FLOW_CONTROL_BYTES = true; // WORKER_THREADS: setting this too low risks a form of priority inversion // where a long-running background task occupies all worker threads and @@ -1031,10 +1030,6 @@ Config::processConfig(std::shared_ptr t) FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES = readInt(item, 1); } - else if (item.first == "ENABLE_FLOW_CONTROL_BYTES") - { - ENABLE_FLOW_CONTROL_BYTES = readBool(item); - } else if (item.first == "OUTBOUND_TX_QUEUE_BYTE_LIMIT") { OUTBOUND_TX_QUEUE_BYTE_LIMIT = readInt(item, 1); diff --git a/src/main/Config.h b/src/main/Config.h index 6c334bab6a..a43115181c 100644 --- a/src/main/Config.h +++ b/src/main/Config.h @@ -324,11 +324,6 @@ class Config : public std::enable_shared_from_this uint32_t PEER_FLOOD_READING_CAPACITY_BYTES; uint32_t FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES; - // Enable flow control in bytes. This config allows core to process large - // transactions on the network more efficiently and apply back pressure if - // needed. - bool ENABLE_FLOW_CONTROL_BYTES; - // Byte limit for outbound transaction queue. uint32_t OUTBOUND_TX_QUEUE_BYTE_LIMIT; diff --git a/src/overlay/FlowControl.cpp b/src/overlay/FlowControl.cpp index f60d2bf3e7..0960259ee2 100644 --- a/src/overlay/FlowControl.cpp +++ b/src/overlay/FlowControl.cpp @@ -33,8 +33,10 @@ FlowControl::getOutboundQueueByteLimit( FlowControl::FlowControl(OverlayAppConnector& connector, bool useBackgroundThread) - : mFlowControlCapacity(std::make_shared( - connector.getConfig(), mNodeID)) + : mFlowControlCapacity(connector.getConfig(), mNodeID) + , mFlowControlBytesCapacity( + connector.getConfig(), mNodeID, + connector.getOverlayManager().getFlowControlBytesConfig().mTotal) , mOverlayMetrics(connector.getOverlayManager().getOverlayMetrics()) , mAppConnector(connector) , mUseBackgroundThread(useBackgroundThread) @@ -49,24 +51,16 @@ FlowControl::hasOutboundCapacity(StellarMessage const& msg, std::lock_guard& lockGuard) const { releaseAssert(!threadIsMain() || !mUseBackgroundThread); - releaseAssert(mFlowControlCapacity); - return mFlowControlCapacity->hasOutboundCapacity(msg) && - (!mFlowControlBytesCapacity || - mFlowControlBytesCapacity->hasOutboundCapacity(msg)); + return mFlowControlCapacity.hasOutboundCapacity(msg) && + mFlowControlBytesCapacity.hasOutboundCapacity(msg); } void -FlowControl::start(NodeID const& peerID, std::optional enableFCBytes) +FlowControl::setPeerID(NodeID const& peerID) { releaseAssert(threadIsMain()); std::lock_guard guard(mFlowControlMutex); mNodeID = peerID; - - if (enableFCBytes) - { - mFlowControlBytesCapacity = std::make_shared( - mAppConnector.getConfig(), mNodeID, *enableFCBytes); - } } void @@ -76,7 +70,7 @@ FlowControl::maybeReleaseCapacity(StellarMessage const& msg) releaseAssert(threadIsMain()); std::lock_guard guard(mFlowControlMutex); - if (msg.type() == SEND_MORE || msg.type() == SEND_MORE_EXTENDED) + if (msg.type() == SEND_MORE_EXTENDED) { if (mNoOutboundCapacity) { @@ -85,11 +79,8 @@ FlowControl::maybeReleaseCapacity(StellarMessage const& msg) } mNoOutboundCapacity.reset(); - mFlowControlCapacity->releaseOutboundCapacity(msg); - if (mFlowControlBytesCapacity) - { - mFlowControlBytesCapacity->releaseOutboundCapacity(msg); - } + mFlowControlCapacity.releaseOutboundCapacity(msg); + mFlowControlBytesCapacity.releaseOutboundCapacity(msg); CLOG_TRACE(Overlay, "{}: Peer {} sent {} ({} messages, {} bytes)", mAppConnector.getConfig().toShortString( @@ -97,9 +88,7 @@ FlowControl::maybeReleaseCapacity(StellarMessage const& msg) mAppConnector.getConfig().toShortString(mNodeID), xdr::xdr_traits::enum_name(msg.type()), getNumMessages(msg), - mFlowControlBytesCapacity - ? std::to_string(msg.sendMoreExtendedMessage().numBytes) - : "N/A"); + std::to_string(msg.sendMoreExtendedMessage().numBytes)); } } @@ -137,26 +126,17 @@ FlowControl::getNextBatchToSend() batchToSend.push_back(front); ++sent; - auto& om = mOverlayMetrics; - auto const& diff = mAppConnector.now() - front.mTimeEmplaced; - mFlowControlCapacity->lockOutboundCapacity(msg); - if (mFlowControlBytesCapacity) - { - mFlowControlBytesCapacity->lockOutboundCapacity(msg); - } + mFlowControlCapacity.lockOutboundCapacity(msg); + mFlowControlBytesCapacity.lockOutboundCapacity(msg); switch (front.mMessage->type()) { case TRANSACTION: { - if (mFlowControlBytesCapacity) - { - size_t s = - mFlowControlBytesCapacity->getMsgResourceCount(msg); - releaseAssert(mTxQueueByteCount >= s); - mTxQueueByteCount -= s; - } + size_t s = mFlowControlBytesCapacity.getMsgResourceCount(msg); + releaseAssert(mTxQueueByteCount >= s); + mTxQueueByteCount -= s; } break; case SCP_MESSAGE: @@ -233,12 +213,9 @@ FlowControl::handleTxSizeIncrease(uint32_t increase) ZoneScoped; releaseAssert(threadIsMain()); std::lock_guard guard(mFlowControlMutex); - if (mFlowControlBytesCapacity) - { - releaseAssert(increase > 0); - // Bump flood capacity to accommodate the upgrade - mFlowControlBytesCapacity->handleTxSizeIncrease(increase); - } + releaseAssert(increase > 0); + // Bump flood capacity to accommodate the upgrade + mFlowControlBytesCapacity.handleTxSizeIncrease(increase); } bool @@ -248,9 +225,8 @@ FlowControl::beginMessageProcessing(StellarMessage const& msg) releaseAssert(!threadIsMain() || !mUseBackgroundThread); std::lock_guard guard(mFlowControlMutex); - return mFlowControlCapacity->lockLocalCapacity(msg) && - (!mFlowControlBytesCapacity || - mFlowControlBytesCapacity->lockLocalCapacity(msg)); + return mFlowControlCapacity.lockLocalCapacity(msg) && + mFlowControlBytesCapacity.lockLocalCapacity(msg); } SendMoreCapacity @@ -260,36 +236,27 @@ FlowControl::endMessageProcessing(StellarMessage const& msg) releaseAssert(threadIsMain()); std::lock_guard guard(mFlowControlMutex); - mFloodDataProcessed += mFlowControlCapacity->releaseLocalCapacity(msg); - if (mFlowControlBytesCapacity) - { - mFloodDataProcessedBytes += - mFlowControlBytesCapacity->releaseLocalCapacity(msg); - } + mFloodDataProcessed += mFlowControlCapacity.releaseLocalCapacity(msg); + mFloodDataProcessedBytes += + mFlowControlBytesCapacity.releaseLocalCapacity(msg); releaseAssert(mFloodDataProcessed <= mAppConnector.getConfig().FLOW_CONTROL_SEND_MORE_BATCH_SIZE); bool shouldSendMore = mFloodDataProcessed == mAppConnector.getConfig().FLOW_CONTROL_SEND_MORE_BATCH_SIZE; - if (mFlowControlBytesCapacity) - { - auto const byteBatchSize = mAppConnector.getOverlayManager() - .getFlowControlBytesConfig() - .mBatchSize; - shouldSendMore = - shouldSendMore || mFloodDataProcessedBytes >= byteBatchSize; - } + auto const byteBatchSize = mAppConnector.getOverlayManager() + .getFlowControlBytesConfig() + .mBatchSize; + shouldSendMore = + shouldSendMore || mFloodDataProcessedBytes >= byteBatchSize; - SendMoreCapacity res{0, std::nullopt}; + SendMoreCapacity res{0, 0}; if (shouldSendMore) { // First save result to return res.first = mFloodDataProcessed; - if (mFlowControlBytesCapacity) - { - res.second = mFloodDataProcessedBytes; - } + res.second = mFloodDataProcessedBytes; // Reset counters mFloodDataProcessed = 0; @@ -302,9 +269,8 @@ FlowControl::endMessageProcessing(StellarMessage const& msg) bool FlowControl::canRead(std::lock_guard const& guard) const { - bool canReadBytes = - !mFlowControlBytesCapacity || mFlowControlBytesCapacity->canRead(); - return canReadBytes && mFlowControlCapacity->canRead(); + return mFlowControlBytesCapacity.canRead() && + mFlowControlCapacity.canRead(); } bool @@ -317,8 +283,8 @@ FlowControl::canRead() const uint32_t FlowControl::getNumMessages(StellarMessage const& msg) { - return msg.type() == SEND_MORE ? msg.sendMoreMessage().numMessages - : msg.sendMoreExtendedMessage().numMessages; + releaseAssert(msg.type() == SEND_MORE_EXTENDED); + return msg.sendMoreExtendedMessage().numMessages; } bool @@ -328,11 +294,7 @@ FlowControl::isSendMoreValid(StellarMessage const& msg, releaseAssert(threadIsMain()); std::lock_guard guard(mFlowControlMutex); - bool sendMoreExtendedType = - mFlowControlBytesCapacity && msg.type() == SEND_MORE_EXTENDED; - bool sendMoreType = !mFlowControlBytesCapacity && msg.type() == SEND_MORE; - - if (!sendMoreExtendedType && !sendMoreType) + if (msg.type() != SEND_MORE_EXTENDED) { errorMsg = fmt::format("unexpected message type {}", @@ -343,9 +305,7 @@ FlowControl::isSendMoreValid(StellarMessage const& msg, // If flow control in bytes isn't enabled, SEND_MORE must have non-zero // messages. If flow control in bytes is enabled, SEND_MORE_EXTENDED must // have non-zero bytes, but _can_ have 0 messages to support upgrades - if ((!mFlowControlBytesCapacity && getNumMessages(msg) == 0) || - (mFlowControlBytesCapacity && - msg.sendMoreExtendedMessage().numBytes == 0)) + if (msg.sendMoreExtendedMessage().numBytes == 0) { errorMsg = fmt::format("invalid message {}", @@ -353,15 +313,11 @@ FlowControl::isSendMoreValid(StellarMessage const& msg, return false; } - auto overflow = getNumMessages(msg) > - (UINT64_MAX - mFlowControlCapacity->getOutboundCapacity()); - if (mFlowControlBytesCapacity) - { - overflow = - overflow || - (msg.sendMoreExtendedMessage().numBytes > - (UINT64_MAX - mFlowControlBytesCapacity->getOutboundCapacity())); - } + auto overflow = + getNumMessages(msg) > + (UINT64_MAX - mFlowControlCapacity.getOutboundCapacity()) || + msg.sendMoreExtendedMessage().numBytes > + (UINT64_MAX - mFlowControlBytesCapacity.getOutboundCapacity()); if (overflow) { errorMsg = "Peer capacity overflow"; @@ -402,17 +358,14 @@ FlowControl::addMsgAndMaybeTrimQueue(std::shared_ptr msg) case TRANSACTION: { msgQInd = 1; - if (mFlowControlBytesCapacity) + auto bytes = mFlowControlBytesCapacity.getMsgResourceCount(*msg); + // Don't accept transactions that are over allowed byte limit: those + // won't be properly flooded anyways + if (bytes > mAppConnector.getHerder().getMaxTxSize()) { - auto bytes = mFlowControlBytesCapacity->getMsgResourceCount(*msg); - // Don't accept transactions that are over allowed byte limit: those - // won't be properly flooded anyways - if (bytes > mAppConnector.getHerder().getMaxTxSize()) - { - return; - } - mTxQueueByteCount += bytes; + return; } + mTxQueueByteCount += bytes; } break; case FLOOD_DEMAND: @@ -444,12 +397,9 @@ FlowControl::addMsgAndMaybeTrimQueue(std::shared_ptr msg) if (type == TRANSACTION) { auto isOverLimit = [&](auto const& queue) { - bool overLimit = queue.size() > limit; - if (mFlowControlBytesCapacity) - { - overLimit = overLimit || mTxQueueByteCount > - getOutboundQueueByteLimit(guard); - } + bool overLimit = + queue.size() > limit || + mTxQueueByteCount > getOutboundQueueByteLimit(guard); // Time-based purge overLimit = overLimit || @@ -461,13 +411,10 @@ FlowControl::addMsgAndMaybeTrimQueue(std::shared_ptr msg) while (isOverLimit(queue)) { dropped++; - if (mFlowControlBytesCapacity) - { - size_t s = mFlowControlBytesCapacity->getMsgResourceCount( - *(queue.front().mMessage)); - releaseAssert(mTxQueueByteCount >= s); - mTxQueueByteCount -= s; - } + size_t s = mFlowControlBytesCapacity.getMsgResourceCount( + *(queue.front().mMessage)); + releaseAssert(mTxQueueByteCount >= s); + mTxQueueByteCount -= s; om.mOutboundQueueDropTxs.Mark(dropped); queue.pop_front(); } @@ -552,27 +499,24 @@ FlowControl::getFlowControlJsonInfo(bool compact) const std::lock_guard guard(mFlowControlMutex); Json::Value res; - if (mFlowControlCapacity->getCapacity().mTotalCapacity) + if (mFlowControlCapacity.getCapacity().mTotalCapacity) { res["local_capacity"]["reading"] = static_cast( - *(mFlowControlCapacity->getCapacity().mTotalCapacity)); + *(mFlowControlCapacity.getCapacity().mTotalCapacity)); } res["local_capacity"]["flood"] = static_cast( - mFlowControlCapacity->getCapacity().mFloodCapacity); + mFlowControlCapacity.getCapacity().mFloodCapacity); res["peer_capacity"] = - static_cast(mFlowControlCapacity->getOutboundCapacity()); - if (mFlowControlBytesCapacity) + static_cast(mFlowControlCapacity.getOutboundCapacity()); + if (mFlowControlBytesCapacity.getCapacity().mTotalCapacity) { - if (mFlowControlBytesCapacity->getCapacity().mTotalCapacity) - { - res["local_capacity_bytes"]["reading"] = static_cast( - *(mFlowControlBytesCapacity->getCapacity().mTotalCapacity)); - } - res["local_capacity_bytes"]["flood"] = static_cast( - mFlowControlBytesCapacity->getCapacity().mFloodCapacity); - res["peer_capacity_bytes"] = static_cast( - mFlowControlBytesCapacity->getOutboundCapacity()); + res["local_capacity_bytes"]["reading"] = static_cast( + *(mFlowControlBytesCapacity.getCapacity().mTotalCapacity)); } + res["local_capacity_bytes"]["flood"] = static_cast( + mFlowControlBytesCapacity.getCapacity().mFloodCapacity); + res["peer_capacity_bytes"] = static_cast( + mFlowControlBytesCapacity.getOutboundCapacity()); if (!compact) { diff --git a/src/overlay/FlowControl.h b/src/overlay/FlowControl.h index efbe20e287..92676c52b2 100644 --- a/src/overlay/FlowControl.h +++ b/src/overlay/FlowControl.h @@ -16,8 +16,8 @@ namespace stellar class OverlayAppConnector; struct OverlayMetrics; -// num messages, optional bytes if enabled -using SendMoreCapacity = std::pair>; +// num messages, bytes +using SendMoreCapacity = std::pair; // The FlowControl class allows core to throttle flood traffic among its // connections. If a connections wants to use flow control, it should maintain @@ -62,8 +62,8 @@ class FlowControl std::optional mLastThrottle; NodeID mNodeID; - std::shared_ptr mFlowControlCapacity; - std::shared_ptr mFlowControlBytesCapacity; + FlowControlMessageCapacity mFlowControlCapacity; + FlowControlByteCapacity mFlowControlBytesCapacity; OverlayMetrics& mOverlayMetrics; OverlayAppConnector& mAppConnector; @@ -107,14 +107,14 @@ class FlowControl VirtualClock::time_point const& timePlaced); #ifdef BUILD_TESTS - std::shared_ptr - getCapacity() const + FlowControlCapacity& + getCapacity() { return mFlowControlCapacity; } - std::shared_ptr - getCapacityBytes() const + FlowControlCapacity& + getCapacityBytes() { return mFlowControlBytesCapacity; } @@ -175,7 +175,8 @@ class FlowControl Json::Value getFlowControlJsonInfo(bool compact) const; - void start(NodeID const& peerID, std::optional enableFCBytes); + // Stores `peerID` to produce more useful log messages. + void setPeerID(NodeID const& peerID); // Stop reading from this peer until capacity is released bool maybeThrottleRead(); diff --git a/src/overlay/FlowControlCapacity.cpp b/src/overlay/FlowControlCapacity.cpp index 69d2f66bcf..ec86fab896 100644 --- a/src/overlay/FlowControlCapacity.cpp +++ b/src/overlay/FlowControlCapacity.cpp @@ -36,7 +36,7 @@ void FlowControlMessageCapacity::releaseOutboundCapacity(StellarMessage const& msg) { ZoneScoped; - releaseAssert(msg.type() == SEND_MORE || msg.type() == SEND_MORE_EXTENDED); + releaseAssert(msg.type() == SEND_MORE_EXTENDED); auto numMessages = FlowControl::getNumMessages(msg); if (!hasOutboundCapacity(msg) && numMessages != 0) { diff --git a/src/overlay/OverlayManagerImpl.cpp b/src/overlay/OverlayManagerImpl.cpp index 5bbc181901..fe44ea6c6a 100644 --- a/src/overlay/OverlayManagerImpl.cpp +++ b/src/overlay/OverlayManagerImpl.cpp @@ -359,8 +359,6 @@ OverlayManagerImpl::start() OverlayManager::AdjustedFlowControlConfig OverlayManagerImpl::getFlowControlBytesConfig() const { - auto const maxTxSize = mApp.getHerder().getMaxTxSize(); - releaseAssert(maxTxSize > 0); auto const& cfg = mApp.getConfig(); // If flow control parameters weren't provided in the config file, calculate @@ -369,6 +367,8 @@ OverlayManagerImpl::getFlowControlBytesConfig() const if (cfg.PEER_FLOOD_READING_CAPACITY_BYTES == 0 && cfg.FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES == 0) { + auto const maxTxSize = mApp.getHerder().getMaxTxSize(); + releaseAssert(maxTxSize > 0); if (!(INITIAL_PEER_FLOOD_READING_CAPACITY_BYTES - INITIAL_FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES >= maxTxSize)) diff --git a/src/overlay/Peer.cpp b/src/overlay/Peer.cpp index 5a9d4a3b27..a102bebd80 100644 --- a/src/overlay/Peer.cpp +++ b/src/overlay/Peer.cpp @@ -156,14 +156,10 @@ Peer::endMessageProcessing(StellarMessage const& msg) // We may release reading capacity, which gets taken by the background // thread immediately, so we can't assert `canRead` here auto res = mFlowControl->endMessageProcessing(msg); - if (res.second) + if (res.first > 0 || res.second > 0) { sendSendMore(static_cast(res.first), - static_cast(*res.second)); - } - else if (res.first > 0) - { - sendSendMore(static_cast(res.first)); + static_cast(res.second)); } // Now that we've released some capacity, maybe schedule more reads @@ -389,10 +385,7 @@ Peer::sendAuth() ZoneScoped; StellarMessage msg; msg.type(AUTH); - if (mAppConnector.getConfig().ENABLE_FLOW_CONTROL_BYTES) - { - msg.auth().flags = AUTH_MSG_FLAG_FLOW_CONTROL_BYTES_REQUESTED; - } + msg.auth().flags = AUTH_MSG_FLAG_FLOW_CONTROL_BYTES_REQUESTED; auto msgPtr = std::make_shared(msg); sendMessage(msgPtr); } @@ -593,18 +586,6 @@ Peer::sendErrorAndDrop(ErrorCode error, std::string const& message) drop(message, DropDirection::WE_DROPPED_REMOTE); } -void -Peer::sendSendMore(uint32_t numMessages) -{ - ZoneScoped; - releaseAssert(threadIsMain()); - - auto m = std::make_shared(); - m->type(SEND_MORE); - m->sendMoreMessage().numMessages = numMessages; - sendMessage(m); -} - void Peer::sendSendMore(uint32_t numMessages, uint32_t numBytes) { @@ -1597,6 +1578,7 @@ Peer::recvHello(Hello const& elo) mRemoteOverlayVersion = elo.overlayVersion; mRemoteVersion = elo.versionStr; mPeerID = elo.peerID; + mFlowControl->setPeerID(mPeerID); mRecvNonce = elo.nonce; mHmac.setSendMackey(peerAuth.getSendingMacKey(elo.cert.pubkey, mSendNonce, mRecvNonce, mRole)); @@ -1746,35 +1728,19 @@ Peer::recvAuth(StellarMessage const& msg) return; } - bool enableBytes = - (mAppConnector.getConfig().OVERLAY_PROTOCOL_VERSION >= - Peer::FIRST_VERSION_SUPPORTING_FLOW_CONTROL_IN_BYTES && - getRemoteOverlayVersion() >= - Peer::FIRST_VERSION_SUPPORTING_FLOW_CONTROL_IN_BYTES); - bool bothWantBytes = - enableBytes && - msg.auth().flags == AUTH_MSG_FLAG_FLOW_CONTROL_BYTES_REQUESTED && - mAppConnector.getConfig().ENABLE_FLOW_CONTROL_BYTES; - - std::optional fcBytes = - bothWantBytes - ? std::optional(mAppConnector.getOverlayManager() - .getFlowControlBytesConfig() - .mTotal) - : std::nullopt; - mFlowControl->start(mPeerID, fcBytes); + if (msg.auth().flags != AUTH_MSG_FLAG_FLOW_CONTROL_BYTES_REQUESTED) + { + sendErrorAndDrop(ERR_CONF, "flow control bytes disabled"); + return; + } + + uint32_t fcBytes = + mAppConnector.getOverlayManager().getFlowControlBytesConfig().mTotal; // Subtle: after successful auth, must send sendMore message first to // tell the other peer about the local node's reading capacity. - if (fcBytes) - { - sendSendMore(mAppConnector.getConfig().PEER_FLOOD_READING_CAPACITY, - *fcBytes); - } - else - { - sendSendMore(mAppConnector.getConfig().PEER_FLOOD_READING_CAPACITY); - } + sendSendMore(mAppConnector.getConfig().PEER_FLOOD_READING_CAPACITY, + fcBytes); auto weakSelf = std::weak_ptr(shared_from_this()); mTxAdverts->start([weakSelf](std::shared_ptr msg) { diff --git a/src/overlay/Peer.h b/src/overlay/Peer.h index 1271d9ae54..d986bb967d 100644 --- a/src/overlay/Peer.h +++ b/src/overlay/Peer.h @@ -71,8 +71,6 @@ class Peer : public std::enable_shared_from_this, std::chrono::milliseconds(1); static constexpr std::chrono::nanoseconds PEER_METRICS_RATE_UNIT = std::chrono::seconds(1); - static constexpr uint32_t FIRST_VERSION_SUPPORTING_FLOW_CONTROL_IN_BYTES = - 28; static constexpr uint32_t FIRST_VERSION_REQUIRED_FOR_PROTOCOL_20 = 32; // The reporting will be based on the previous @@ -352,7 +350,6 @@ class Peer : public std::enable_shared_from_this, // Queue up an advert to send, return true if the advert was queued, and // false otherwise (if advert is a duplicate, for example) bool sendAdvert(Hash const& txHash); - void sendSendMore(uint32_t numMessages); void sendSendMore(uint32_t numMessages, uint32_t numBytes); virtual void sendMessage(std::shared_ptr msg, diff --git a/src/overlay/test/LoopbackPeer.cpp b/src/overlay/test/LoopbackPeer.cpp index c020021969..cb5f0db21c 100644 --- a/src/overlay/test/LoopbackPeer.cpp +++ b/src/overlay/test/LoopbackPeer.cpp @@ -562,19 +562,11 @@ bool LoopbackPeer::checkCapacity(std::shared_ptr otherPeer) const { // Outbound capacity is equal to the config on the other node - bool flowControlInBytes = getFlowControl()->getCapacityBytes() && - otherPeer->getFlowControl()->getCapacityBytes(); - bool isValid = otherPeer->getConfig().PEER_FLOOD_READING_CAPACITY == - getFlowControl()->getCapacity()->getOutboundCapacity(); - if (flowControlInBytes) - { - isValid = isValid && - (otherPeer->mAppConnector.getOverlayManager() - .getFlowControlBytesConfig() - .mTotal == - getFlowControl()->getCapacityBytes()->getOutboundCapacity()); - } - - return isValid; + return otherPeer->getConfig().PEER_FLOOD_READING_CAPACITY == + getFlowControl()->getCapacity().getOutboundCapacity() && + otherPeer->mAppConnector.getOverlayManager() + .getFlowControlBytesConfig() + .mTotal == + getFlowControl()->getCapacityBytes().getOutboundCapacity(); } } diff --git a/src/overlay/test/LoopbackPeer.h b/src/overlay/test/LoopbackPeer.h index cadf75749f..97490a9da3 100644 --- a/src/overlay/test/LoopbackPeer.h +++ b/src/overlay/test/LoopbackPeer.h @@ -139,7 +139,7 @@ class LoopbackPeer : public Peer uint64_t getOutboundCapacity() { - return getFlowControl()->getCapacity()->getOutboundCapacity(); + return getFlowControl()->getCapacity().getOutboundCapacity(); } Config const& diff --git a/src/overlay/test/OverlayTests.cpp b/src/overlay/test/OverlayTests.cpp index 337759bb19..fb174146d1 100644 --- a/src/overlay/test/OverlayTests.cpp +++ b/src/overlay/test/OverlayTests.cpp @@ -199,7 +199,7 @@ TEST_CASE("flow control byte capacity", "[overlay][flowcontrol]") REQUIRE(conn.getInitiator() ->getFlowControl() ->getCapacityBytes() - ->getOutboundCapacity() == expectedCapacity); + .getOutboundCapacity() == expectedCapacity); REQUIRE(conn.getInitiator()->getTxQueueByteCount() == 0); auto msgTracker = std::make_shared( @@ -208,7 +208,7 @@ TEST_CASE("flow control byte capacity", "[overlay][flowcontrol]") REQUIRE(conn.getAcceptor() ->getFlowControl() ->getCapacityBytes() - ->getCapacity() + .getCapacity() .mFloodCapacity == expectedCapacity); } SECTION("send more flow") @@ -235,7 +235,7 @@ TEST_CASE("flow control byte capacity", "[overlay][flowcontrol]") REQUIRE(conn.getAcceptor() ->getFlowControl() ->getCapacityBytes() - ->getCapacity() + .getCapacity() .mFloodCapacity == cfg2.PEER_FLOOD_READING_CAPACITY_BYTES); if (shouldRequestMore) @@ -247,7 +247,7 @@ TEST_CASE("flow control byte capacity", "[overlay][flowcontrol]") REQUIRE(conn.getInitiator() ->getFlowControl() ->getCapacityBytes() - ->getOutboundCapacity() == + .getOutboundCapacity() == (cfg2.PEER_FLOOD_READING_CAPACITY_BYTES - txSize)); } REQUIRE(conn.getAcceptor()->checkCapacity(conn.getInitiator())); @@ -529,29 +529,6 @@ TEST_CASE("flow control byte capacity", "[overlay][flowcontrol]") } } -void -runWithBothFlowControlModes(std::vector& cfgs, - std::function f) -{ - SECTION("bytes") - { - for (auto& cfg : cfgs) - { - REQUIRE(cfg.OVERLAY_PROTOCOL_VERSION >= - Peer::FIRST_VERSION_SUPPORTING_FLOW_CONTROL_IN_BYTES); - } - f(true); - } - SECTION("message count") - { - for (auto& cfg : cfgs) - { - cfg.ENABLE_FLOW_CONTROL_BYTES = false; - } - f(false); - } -} - TEST_CASE("loopback peer flow control activation", "[overlay][flowcontrol]") { VirtualClock clock; @@ -569,61 +546,30 @@ TEST_CASE("loopback peer flow control activation", "[overlay][flowcontrol]") LoopbackPeerConnection conn(*app1, *app2); testutil::crankSome(clock); - bool const fcInBytes = - cfg1.OVERLAY_PROTOCOL_VERSION >= - Peer::FIRST_VERSION_SUPPORTING_FLOW_CONTROL_IN_BYTES && - cfg2.OVERLAY_PROTOCOL_VERSION >= - Peer::FIRST_VERSION_SUPPORTING_FLOW_CONTROL_IN_BYTES && - cfg2.ENABLE_FLOW_CONTROL_BYTES && cfg1.ENABLE_FLOW_CONTROL_BYTES; - REQUIRE(conn.getInitiator()->isAuthenticatedForTesting()); REQUIRE(conn.getAcceptor()->isAuthenticatedForTesting()); REQUIRE(conn.getInitiator()->checkCapacity(conn.getAcceptor())); REQUIRE(conn.getAcceptor()->checkCapacity(conn.getInitiator())); - if (fcInBytes) - { - REQUIRE(conn.getInitiator()->getFlowControl()->getCapacityBytes()); - REQUIRE(conn.getAcceptor()->getFlowControl()->getCapacityBytes()); - } - else - { - REQUIRE(!conn.getInitiator()->getFlowControl()->getCapacityBytes()); - REQUIRE(!conn.getAcceptor()->getFlowControl()->getCapacityBytes()); - } - // 1. Try sending invalid SEND_MORE with invalid value - // 2. Try sending invalid SEND_MORE message type + // Try sending invalid SEND_MORE with invalid value if (sendIllegalSendMore) { std::string dropReason; SECTION("invalid value in the message") { - // if flow control is enabled, ensure it can't be disabled, + // Flow control is enabled, ensure it can't be disabled, // and the misbehaving peer gets dropped - if (fcInBytes) - { - conn.getAcceptor()->sendSendMore(0, 0); - } - else - { - conn.getAcceptor()->sendSendMore(0); - } - dropReason = fcInBytes ? "invalid message SEND_MORE_EXTENDED" - : "invalid message SEND_MORE"; + conn.getAcceptor()->sendSendMore(0, 0); + dropReason = "invalid message SEND_MORE_EXTENDED"; } SECTION("invalid message type") { - if (fcInBytes) - { - conn.getAcceptor()->sendSendMore(1); - } - else - { - conn.getAcceptor()->sendSendMore(1, 1); - } - dropReason = fcInBytes - ? "unexpected message type SEND_MORE" - : "unexpected message type SEND_MORE_EXTENDED"; + // Manually construct a SEND_MORE message and send it + auto m = std::make_shared(); + m->type(SEND_MORE); + m->sendMoreMessage().numMessages = 1; + conn.getAcceptor()->sendAuthenticatedMessageForTesting(m); + dropReason = "unexpected message type SEND_MORE"; } testutil::crankSome(clock); REQUIRE(!conn.getInitiator()->isConnectedForTesting()); @@ -635,27 +581,17 @@ TEST_CASE("loopback peer flow control activation", "[overlay][flowcontrol]") testutil::shutdownWorkScheduler(*app1); }; - auto test = [&](bool fcBytes) { - SECTION("both enable") - { - SECTION("basic") - { - // Successfully enabled flow control - runTest({cfg1, cfg2}, false); - } - SECTION("bad peer") - { - runTest({cfg1, cfg2}, true); - } - SECTION("one disables") - { - cfg1.ENABLE_FLOW_CONTROL_BYTES = false; - runTest({cfg1, cfg2}, false); - } - } - }; + SECTION("basic") + { + // Flow control without illegal SEND_MORE + runTest({cfg1, cfg2}, false); + } - runWithBothFlowControlModes(cfgs, test); + SECTION("bad peer") + { + // Flow control with illegal SEND_MORE + runTest({cfg1, cfg2}, true); + } } TEST_CASE("drop peers that dont respect capacity", "[overlay][flowcontrol]") @@ -672,60 +608,42 @@ TEST_CASE("drop peers that dont respect capacity", "[overlay][flowcontrol]") getOperationGreaterThanMinMaxSizeBytes()); uint32 txSize = static_cast(xdr::xdr_argpack_size(msg)); - auto test = [&](bool fcBytes) { - if (fcBytes) - { - cfg1.PEER_FLOOD_READING_CAPACITY_BYTES = - txSize + 1 + Herder::FLOW_CONTROL_BYTES_EXTRA_BUFFER; - cfg1.FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES = 1; - } - else - { - // initiator can only accept 1 flood message at a time - cfg1.PEER_FLOOD_READING_CAPACITY = 1; - cfg1.FLOW_CONTROL_SEND_MORE_BATCH_SIZE = 1; - // Set PEER_READING_CAPACITY to something higher so that the - // initiator will read both messages right away and detect capacity - // violation - cfg1.PEER_READING_CAPACITY = 2; - } - auto app1 = createTestApplication(clock, cfg1, true, false); - if (fcBytes && - appProtocolVersionStartsFrom(*app1, SOROBAN_PROTOCOL_VERSION)) - { - modifySorobanNetworkConfig(*app1, - [txSize](SorobanNetworkConfig& cfg) { - cfg.mTxMaxSizeBytes = txSize; - }); - } + cfg1.PEER_FLOOD_READING_CAPACITY_BYTES = + txSize + 1 + Herder::FLOW_CONTROL_BYTES_EXTRA_BUFFER; + cfg1.FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES = 1; - auto app2 = createTestApplication(clock, cfg2, true, false); - app1->getHerder().setMaxClassicTxSize(txSize); - app2->getHerder().setMaxClassicTxSize(txSize); - app1->start(); - app2->start(); + auto app1 = createTestApplication(clock, cfg1, true, false); + if (appProtocolVersionStartsFrom(*app1, SOROBAN_PROTOCOL_VERSION)) + { + modifySorobanNetworkConfig(*app1, [txSize](SorobanNetworkConfig& cfg) { + cfg.mTxMaxSizeBytes = txSize; + }); + } - LoopbackPeerConnection conn(*app1, *app2); - testutil::crankSome(clock); - REQUIRE(conn.getInitiator()->isAuthenticatedForTesting()); - REQUIRE(conn.getAcceptor()->isAuthenticatedForTesting()); + auto app2 = createTestApplication(clock, cfg2, true, false); + app1->getHerder().setMaxClassicTxSize(txSize); + app2->getHerder().setMaxClassicTxSize(txSize); + app1->start(); + app2->start(); - // Acceptor sends too many flood messages, causing initiator to drop it - auto msgPtr = std::make_shared(msg); - conn.getAcceptor()->sendAuthenticatedMessage(msgPtr); - conn.getAcceptor()->sendAuthenticatedMessage(msgPtr); - testutil::crankSome(clock); + LoopbackPeerConnection conn(*app1, *app2); + testutil::crankSome(clock); + REQUIRE(conn.getInitiator()->isAuthenticatedForTesting()); + REQUIRE(conn.getAcceptor()->isAuthenticatedForTesting()); - REQUIRE(!conn.getInitiator()->isConnectedForTesting()); - REQUIRE(!conn.getAcceptor()->isConnectedForTesting()); - REQUIRE(conn.getInitiator()->getDropReason() == - "unexpected flood message, peer at capacity"); + // Acceptor sends too many flood messages, causing initiator to drop it + auto msgPtr = std::make_shared(msg); + conn.getAcceptor()->sendAuthenticatedMessage(msgPtr); + conn.getAcceptor()->sendAuthenticatedMessage(msgPtr); + testutil::crankSome(clock); - testutil::shutdownWorkScheduler(*app2); - testutil::shutdownWorkScheduler(*app1); - }; + REQUIRE(!conn.getInitiator()->isConnectedForTesting()); + REQUIRE(!conn.getAcceptor()->isConnectedForTesting()); + REQUIRE(conn.getInitiator()->getDropReason() == + "unexpected flood message, peer at capacity"); - runWithBothFlowControlModes(cfgs, test); + testutil::shutdownWorkScheduler(*app2); + testutil::shutdownWorkScheduler(*app1); } TEST_CASE("drop idle flow-controlled peers", "[overlay][flowcontrol]") @@ -739,65 +657,41 @@ TEST_CASE("drop idle flow-controlled peers", "[overlay][flowcontrol]") msg.type(TRANSACTION); uint32 txSize = static_cast(xdr::xdr_argpack_size(msg)); - auto test = [&](bool fcBytes) { - if (fcBytes) - { - cfg1.PEER_FLOOD_READING_CAPACITY_BYTES = txSize; - // Incorrectly set batch size, so that the node does not send flood - // requests - cfg1.FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES = txSize + 1; - } - else - { - cfg1.PEER_FLOOD_READING_CAPACITY = 1; - cfg1.PEER_READING_CAPACITY = 1; - // Incorrectly set batch size, so that the node does not send flood - // requests - cfg1.FLOW_CONTROL_SEND_MORE_BATCH_SIZE = 2; - } - auto app1 = createTestApplication(clock, cfg1); - auto app2 = createTestApplication(clock, cfg2); + cfg1.PEER_FLOOD_READING_CAPACITY_BYTES = txSize; + // Incorrectly set batch size, so that the node does not send flood + // requests + cfg1.FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES = txSize + 1; - LoopbackPeerConnection conn(*app1, *app2); - testutil::crankSome(clock); - REQUIRE(conn.getInitiator()->isAuthenticatedForTesting()); - REQUIRE(conn.getAcceptor()->isAuthenticatedForTesting()); + auto app1 = createTestApplication(clock, cfg1); + auto app2 = createTestApplication(clock, cfg2); - REQUIRE(conn.getAcceptor()->checkCapacity(conn.getInitiator())); - // Send outbound message and start the timer - conn.getAcceptor()->sendMessage(std::make_shared(msg), - false); - conn.getAcceptor()->sendMessage(std::make_shared(msg), - false); + LoopbackPeerConnection conn(*app1, *app2); + testutil::crankSome(clock); + REQUIRE(conn.getInitiator()->isAuthenticatedForTesting()); + REQUIRE(conn.getAcceptor()->isAuthenticatedForTesting()); - if (fcBytes) - { - REQUIRE(conn.getAcceptor() - ->getFlowControl() - ->getCapacityBytes() - ->getOutboundCapacity() < txSize); - } - else - { - REQUIRE(conn.getAcceptor() - ->getFlowControl() - ->getCapacity() - ->getOutboundCapacity() == 0); - } + REQUIRE(conn.getAcceptor()->checkCapacity(conn.getInitiator())); + // Send outbound message and start the timer + conn.getAcceptor()->sendMessage(std::make_shared(msg), + false); + conn.getAcceptor()->sendMessage(std::make_shared(msg), + false); - testutil::crankFor(clock, Peer::PEER_SEND_MODE_IDLE_TIMEOUT + - std::chrono::seconds(5)); + REQUIRE(conn.getAcceptor() + ->getFlowControl() + ->getCapacityBytes() + .getOutboundCapacity() < txSize); - REQUIRE(!conn.getInitiator()->isConnectedForTesting()); - REQUIRE(!conn.getAcceptor()->isConnectedForTesting()); - REQUIRE(conn.getAcceptor()->getDropReason() == - "idle timeout (no new flood requests)"); + testutil::crankFor(clock, Peer::PEER_SEND_MODE_IDLE_TIMEOUT + + std::chrono::seconds(5)); - testutil::shutdownWorkScheduler(*app2); - testutil::shutdownWorkScheduler(*app1); - }; + REQUIRE(!conn.getInitiator()->isConnectedForTesting()); + REQUIRE(!conn.getAcceptor()->isConnectedForTesting()); + REQUIRE(conn.getAcceptor()->getDropReason() == + "idle timeout (no new flood requests)"); - runWithBothFlowControlModes(cfgs, test); + testutil::shutdownWorkScheduler(*app2); + testutil::shutdownWorkScheduler(*app1); } TEST_CASE("drop peers that overflow capacity", "[overlay][flowcontrol]") @@ -824,14 +718,14 @@ TEST_CASE("drop peers that overflow capacity", "[overlay][flowcontrol]") conn.getInitiator() ->getFlowControl() ->getCapacity() - ->setOutboundCapacity(limit); + .setOutboundCapacity(limit); } SECTION("byte capacity") { conn.getInitiator() ->getFlowControl() ->getCapacityBytes() - ->setOutboundCapacity(limit); + .setOutboundCapacity(limit); } conn.getAcceptor()->sendSendMore(2, 2); @@ -990,8 +884,7 @@ TEST_CASE("outbound queue filtering", "[overlay][connections]") StellarMessage msg; msg.type(TRANSACTION); auto byteSize = - peer->getFlowControl()->getCapacityBytes()->getMsgResourceCount( - msg); + peer->getFlowControl()->getCapacityBytes().getMsgResourceCount(msg); SECTION("trim based on message count") { for (uint32_t i = 0; i < limit + 10; ++i) @@ -2264,22 +2157,7 @@ TEST_CASE("overlay flow control", "[overlay][flowcontrol]") SECTION("enabled") { - SECTION("flow control in bytes on all") - { - setupSimulation(); - } - SECTION("one peer disables") - { - configs[2].ENABLE_FLOW_CONTROL_BYTES = false; - setupSimulation(); - } - SECTION("all peers disable") - { - std::for_each(configs.begin(), configs.end(), [](Config& cfg) { - cfg.ENABLE_FLOW_CONTROL_BYTES = false; - }); - setupSimulation(); - } + setupSimulation(); simulation->crankUntil( [&] { return simulation->haveAllExternalized(2, 1); }, diff --git a/src/simulation/test/LoadGeneratorTests.cpp b/src/simulation/test/LoadGeneratorTests.cpp index 15515a1436..b0e8037eac 100644 --- a/src/simulation/test/LoadGeneratorTests.cpp +++ b/src/simulation/test/LoadGeneratorTests.cpp @@ -112,6 +112,10 @@ TEST_CASE("generate soroban load", "[loadgen][soroban]") cfg.LOADGEN_INSTRUCTIONS_FOR_TESTING = {1'000'000, 5'000'000, 10'000'000}; cfg.LOADGEN_INSTRUCTIONS_DISTRIBUTION_FOR_TESTING = {1, 2, 3}; + + // Let OverlayManager set flow control bytes parameters + cfg.PEER_FLOOD_READING_CAPACITY_BYTES = 0; + cfg.FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES = 0; return cfg; }); diff --git a/src/test/test.cpp b/src/test/test.cpp index c99192aa9b..15e0c279e9 100644 --- a/src/test/test.cpp +++ b/src/test/test.cpp @@ -309,6 +309,13 @@ getTestConfig(int instanceNumber, Config::TestDbMode mode) thisConfig.PEER_FLOOD_READING_CAPACITY = 20; thisConfig.FLOW_CONTROL_SEND_MORE_BATCH_SIZE = 10; + // Set nonzero default config values for flow control in bytes. Without + // these OverlayManagerImpl will attempt to compute them from Herder. + // However, some tests do not start Herder and those may crash if they + // start OverlayManagerImpl without these values set. + thisConfig.PEER_FLOOD_READING_CAPACITY_BYTES = 300000; + thisConfig.FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES = 100000; + // Tests default to using SQL for ledger state thisConfig.DEPRECATED_SQL_LEDGER_STATE = true; diff --git a/src/transactions/test/InvokeHostFunctionTests.cpp b/src/transactions/test/InvokeHostFunctionTests.cpp index cb94c717ae..2b7e59f19d 100644 --- a/src/transactions/test/InvokeHostFunctionTests.cpp +++ b/src/transactions/test/InvokeHostFunctionTests.cpp @@ -145,6 +145,19 @@ overrideNetworkSettingsToMin(Application& app) upgrade.newMaxSorobanTxSetSize() = 1; executeUpgrade(app, upgrade); } + +// Set high limits for soroban. Also configures flow control to support the +// higher limits. +void +setHighSorobanLimits(Config& cfg) +{ + cfg.TESTING_SOROBAN_HIGH_LIMIT_OVERRIDE = true; + + // Let OverlayManagerImpl compute flow control bytes values + cfg.PEER_FLOOD_READING_CAPACITY_BYTES = 0; + cfg.FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES = 0; +} + } // namespace TEST_CASE("Trustline stellar asset contract", @@ -290,7 +303,7 @@ TEST_CASE("Native stellar asset contract", "[tx][soroban][invariant][conservationoflumens]") { auto cfg = getTestConfig(); - cfg.TESTING_SOROBAN_HIGH_LIMIT_OVERRIDE = true; + setHighSorobanLimits(cfg); SorobanTest test(cfg); auto& app = test.getApp(); @@ -2519,7 +2532,7 @@ TEST_CASE("temp entry eviction", "[tx][soroban]") // override if (enableBucketListDB) { - cfg.TESTING_SOROBAN_HIGH_LIMIT_OVERRIDE = true; + setHighSorobanLimits(cfg); } SorobanTest test(cfg);