From 7cafbba463a881d3fef63ef82582e521b101e27b Mon Sep 17 00:00:00 2001
From: Graydon Hoare
Date: Wed, 17 Jul 2019 13:01:33 -0700
Subject: [PATCH] bug 2202: fsync XDR on close (also bucket dir), with option
 to disable.

---
 docs/stellar-core_example.cfg                 |  12 ++
 src/bucket/Bucket.cpp                         |  10 +-
 src/bucket/Bucket.h                           |   5 +-
 src/bucket/BucketList.cpp                     |   3 +-
 src/bucket/BucketManagerImpl.cpp              |  17 +-
 src/bucket/BucketManagerImpl.h                |   1 +
 src/bucket/BucketOutputIterator.cpp           |   3 +-
 src/bucket/BucketOutputIterator.h             |   3 +-
 src/bucket/FutureBucket.cpp                   |   4 +-
 src/bucket/test/BucketListTests.cpp           |   6 +-
 src/bucket/test/BucketManagerTests.cpp        |  18 +-
 src/bucket/test/BucketTests.cpp               | 166 +++++++++++-------
 src/history/StateSnapshot.cpp                 |   4 +-
 src/history/test/HistoryTestsUtils.cpp        |   5 +-
 ...ucketListIsConsistentWithDatabaseTests.cpp |   4 +-
 src/main/Config.cpp                           |   5 +
 src/main/Config.h                             |  11 ++
 src/test/fuzz.cpp                             |   2 +-
 src/test/test.cpp                             |   3 +
 src/util/FileSystemException.h                |  21 +++
 src/util/Fs.cpp                               |  78 ++++++++
 src/util/Fs.h                                 |   9 +
 src/util/XDRStream.h                          |  74 +++++---
 src/util/test/XDRStreamTests.cpp              |  58 +++++-
 24 files changed, 407 insertions(+), 115 deletions(-)

diff --git a/docs/stellar-core_example.cfg b/docs/stellar-core_example.cfg
index 49a06bdec5..e213c0cce7 100644
--- a/docs/stellar-core_example.cfg
+++ b/docs/stellar-core_example.cfg
@@ -366,6 +366,18 @@ ALLOW_LOCALHOST_FOR_TESTING=false
 # unlikely to be for the latest ledger.
 MAXIMUM_LEDGER_CLOSETIME_DRIFT=50
 
+# DISABLE_XDR_FSYNC (true or false) defaults to false.
+# If set to true, writing an XDR file (a bucket or a checkpoint) will not
+# be followed by an fsync on the file. This in turn means that XDR files
+# (which hold the canonical state of the ledger) may be corrupted if the
+# operating system suddenly crashes or loses power, causing the node to
+# diverge and get stuck on restart, or potentially even publish bad
+# history. This option only exists as an escape hatch if the local
+# filesystem is so unusably slow that you prefer operating without
+# durability guarantees. Do not set it to true unless you're very certain
+# you want to make that trade.
+DISABLE_XDR_FSYNC=false
+
 #####################
 ## Tables must come at the end. (TOML you are almost perfect!)

diff --git a/src/bucket/Bucket.cpp b/src/bucket/Bucket.cpp
index 3ba357c064..a4ffedef9b 100644
--- a/src/bucket/Bucket.cpp
+++ b/src/bucket/Bucket.cpp
@@ -137,7 +137,8 @@ std::shared_ptr<Bucket>
 Bucket::fresh(BucketManager& bucketManager, uint32_t protocolVersion,
               std::vector<LedgerEntry> const& initEntries,
               std::vector<LedgerEntry> const& liveEntries,
-              std::vector<LedgerKey> const& deadEntries, bool countMergeEvents)
+              std::vector<LedgerKey> const& deadEntries, bool countMergeEvents,
+              bool doFsync)
 {
     // When building fresh buckets after protocol version 10 (i.e. version
     // 11-or-after) we differentiate INITENTRY from LIVEENTRY. In older
@@ -151,7 +152,8 @@ Bucket::fresh(BucketManager& bucketManager, uint32_t protocolVersion,
         convertToBucketEntry(useInit, initEntries, liveEntries, deadEntries);
 
     MergeCounters mc;
-    BucketOutputIterator out(bucketManager.getTmpDir(), true, meta, mc);
+    BucketOutputIterator out(bucketManager.getTmpDir(), true, meta, mc,
+                             doFsync);
     for (auto const& e : entries)
     {
         out.put(e);
@@ -566,7 +568,7 @@ Bucket::merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,
               std::shared_ptr<Bucket> const& oldBucket,
               std::shared_ptr<Bucket> const& newBucket,
               std::vector<std::shared_ptr<Bucket>> const& shadows,
-              bool keepDeadEntries, bool countMergeEvents)
+              bool keepDeadEntries, bool countMergeEvents, bool doFsync)
 {
     // This is the key operation in the scheme: merging two (read-only)
     // buckets together into a new 3rd bucket, while calculating its hash,
@@ -591,7 +593,7 @@ Bucket::merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,
     BucketMetadata meta;
     meta.ledgerVersion = protocolVersion;
     BucketOutputIterator out(bucketManager.getTmpDir(), keepDeadEntries, meta,
-                             mc);
+                             mc, doFsync);
 
     BucketEntryIdCmp cmp;
     while (oi || ni)

diff --git a/src/bucket/Bucket.h b/src/bucket/Bucket.h
index 14be8fb6bf..9de815a68f 100644
--- a/src/bucket/Bucket.h
+++ b/src/bucket/Bucket.h
@@ -83,7 +83,8 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
     fresh(BucketManager& bucketManager, uint32_t protocolVersion,
           std::vector<LedgerEntry> const& initEntries,
           std::vector<LedgerEntry> const& liveEntries,
-          std::vector<LedgerKey> const& deadEntries, bool countMergeEvents);
+          std::vector<LedgerKey> const& deadEntries, bool countMergeEvents,
+          bool doFsync);
 
     // Merge two buckets together, producing a fresh one. Entries in `oldBucket`
     // are overridden in the fresh bucket by keywise-equal entries in
@@ -101,6 +102,6 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
           std::shared_ptr<Bucket> const& oldBucket,
           std::shared_ptr<Bucket> const& newBucket,
           std::vector<std::shared_ptr<Bucket>> const& shadows,
-          bool keepDeadEntries, bool countMergeEvents);
+          bool keepDeadEntries, bool countMergeEvents, bool doFsync);
 };
 }

diff --git a/src/bucket/BucketList.cpp b/src/bucket/BucketList.cpp
index 593054aeb4..baa6b7e497 100644
--- a/src/bucket/BucketList.cpp
+++ b/src/bucket/BucketList.cpp
@@ -527,11 +527,12 @@ BucketList::addBatch(Application& app, uint32_t currLedger,
     // and we are checking for an expected number of merge events on restart.
     bool countMergeEvents =
         !app.getConfig().ARTIFICIALLY_REDUCE_MERGE_COUNTS_FOR_TESTING;
+    bool doFsync = !app.getConfig().DISABLE_XDR_FSYNC;
     assert(shadows.size() == 0);
     mLevels[0].prepare(app, currLedger, currLedgerProtocol,
                        Bucket::fresh(app.getBucketManager(), currLedgerProtocol,
                                      initEntries, liveEntries, deadEntries,
-                                     countMergeEvents),
+                                     countMergeEvents, doFsync),
                        shadows, countMergeEvents);
     mLevels[0].commit();

diff --git a/src/bucket/BucketManagerImpl.cpp b/src/bucket/BucketManagerImpl.cpp
index d421611813..9426cc67bb 100644
--- a/src/bucket/BucketManagerImpl.cpp
+++ b/src/bucket/BucketManagerImpl.cpp
@@ -290,6 +290,19 @@ BucketManagerImpl::incrMergeCounters(MergeCounters const& delta)
     mMergeCounters += delta;
 }
 
+bool
+BucketManagerImpl::renameBucket(std::string const& src, std::string const& dst)
+{
+    if (mApp.getConfig().DISABLE_XDR_FSYNC)
+    {
+        return rename(src.c_str(), dst.c_str()) == 0;
+    }
+    else
+    {
+        return fs::durableRename(src, dst, getBucketDir());
+    }
+}
+
 std::shared_ptr<Bucket>
 BucketManagerImpl::adoptFileAsBucket(std::string const& filename,
                                      uint256 const& hash, size_t nObjects,
@@ -328,14 +341,14 @@ BucketManagerImpl::adoptFileAsBucket(std::string const& filename,
         std::string canonicalName = bucketFilename(hash);
         CLOG(DEBUG, "Bucket") << "Adopting bucket file " << filename << " as "
                               << canonicalName;
-        if (rename(filename.c_str(), canonicalName.c_str()) != 0)
+        if (!renameBucket(filename, canonicalName))
         {
            std::string err("Failed to rename bucket :");
             err += strerror(errno);
             // it seems there is a race condition with external systems
             // retry after sleeping for a second works around the problem
             std::this_thread::sleep_for(std::chrono::seconds(1));
-            if (rename(filename.c_str(), canonicalName.c_str()) != 0)
+            if (!renameBucket(filename, canonicalName))
             {
                 // if rename fails again, surface the original error
                 throw std::runtime_error(err);

diff --git a/src/bucket/BucketManagerImpl.h b/src/bucket/BucketManagerImpl.h
index d1345139eb..8717165545 100644
--- a/src/bucket/BucketManagerImpl.h
+++ b/src/bucket/BucketManagerImpl.h
@@ -66,6 +66,7 @@ class BucketManagerImpl : public BucketManager
     std::set<Hash> getReferencedBuckets() const;
     void cleanupStaleFiles();
     void cleanDir();
+    bool renameBucket(std::string const& src, std::string const& dst);
 
 #ifdef BUILD_TESTS
     bool mUseFakeTestValuesForNextClose{false};

diff --git a/src/bucket/BucketOutputIterator.cpp b/src/bucket/BucketOutputIterator.cpp
index 8fc6da6c7d..9de436db3f 100644
--- a/src/bucket/BucketOutputIterator.cpp
+++ b/src/bucket/BucketOutputIterator.cpp
@@ -35,8 +35,9 @@ randomBucketName(std::string const& tmpDir)
 BucketOutputIterator::BucketOutputIterator(std::string const& tmpDir,
                                            bool keepDeadEntries,
                                            BucketMetadata const& meta,
-                                           MergeCounters& mc)
+                                           MergeCounters& mc, bool doFsync)
     : mFilename(randomBucketName(tmpDir))
+    , mOut(doFsync)
     , mBuf(nullptr)
     , mHasher(SHA256::create())
     , mKeepDeadEntries(keepDeadEntries)

diff --git a/src/bucket/BucketOutputIterator.h b/src/bucket/BucketOutputIterator.h
index 83c46b4a14..658550350e 100644
--- a/src/bucket/BucketOutputIterator.h
+++ b/src/bucket/BucketOutputIterator.h
@@ -44,7 +44,8 @@ class BucketOutputIterator
     // the form of a METAENTRY; but that's not a thing the caller gets to decide
     // (or forget to do), it's handled automatically.
     BucketOutputIterator(std::string const& tmpDir, bool keepDeadEntries,
-                         BucketMetadata const& meta, MergeCounters& mc);
+                         BucketMetadata const& meta, MergeCounters& mc,
+                         bool doFsync);
 
     void put(BucketEntry const& e);

diff --git a/src/bucket/FutureBucket.cpp b/src/bucket/FutureBucket.cpp
index 11829da5e4..cc8689a8a4 100644
--- a/src/bucket/FutureBucket.cpp
+++ b/src/bucket/FutureBucket.cpp
@@ -314,7 +314,6 @@ FutureBucket::startMerge(Application& app, uint32_t maxProtocolVersion,
         checkState();
         return;
     }
-
     using task_t = std::packaged_task<std::shared_ptr<Bucket>()>;
     std::shared_ptr<task_t> task = std::make_shared<task_t>(
         [curr, snap, &bm, shadows, maxProtocolVersion, countMergeEvents, level,
@@ -328,7 +327,8 @@ FutureBucket::startMerge(Application& app, uint32_t maxProtocolVersion,
             {
                 auto res = Bucket::merge(
                     bm, maxProtocolVersion, curr, snap, shadows,
-                    BucketList::keepDeadEntries(level), countMergeEvents);
+                    BucketList::keepDeadEntries(level), countMergeEvents,
+                    !app.getConfig().DISABLE_XDR_FSYNC);
 
                 CLOG(TRACE, "Bucket")
                     << "Worker finished merging curr="

diff --git a/src/bucket/test/BucketListTests.cpp b/src/bucket/test/BucketListTests.cpp
index 3a2f945c3b..3275a8c79e 100644
--- a/src/bucket/test/BucketListTests.cpp
+++ b/src/bucket/test/BucketListTests.cpp
@@ -269,11 +269,13 @@ TEST_CASE("bucket tombstones expire at bottom level",
             level.setCurr(Bucket::fresh(
                 bm, getAppLedgerVersion(app), {},
                 LedgerTestUtils::generateValidLedgerEntries(8), deadGen(8),
-                /*countMergeEvents=*/true));
+                /*countMergeEvents=*/true,
+                /*doFsync=*/true));
             level.setSnap(Bucket::fresh(
                 bm, getAppLedgerVersion(app), {},
                 LedgerTestUtils::generateValidLedgerEntries(8), deadGen(8),
-                /*countMergeEvents=*/true));
+                /*countMergeEvents=*/true,
+                /*doFsync=*/true));
         }
 
         for (uint32_t i = 0; i < BucketList::kNumLevels; ++i)

diff --git a/src/bucket/test/BucketManagerTests.cpp b/src/bucket/test/BucketManagerTests.cpp
index ccb76cc702..b51b5415b0 100644
--- a/src/bucket/test/BucketManagerTests.cpp
+++ b/src/bucket/test/BucketManagerTests.cpp
@@ -281,20 +281,20 @@ TEST_CASE("bucketmanager ownership", "[bucket][bucketmanager]")
 
     std::shared_ptr<Bucket> b1;
     {
-        std::shared_ptr<Bucket> b2 =
-            Bucket::fresh(app->getBucketManager(), getAppLedgerVersion(app),
-                          {}, live, dead, /*countMergeEvents=*/true);
+        std::shared_ptr<Bucket> b2 = Bucket::fresh(
+            app->getBucketManager(), getAppLedgerVersion(app), {}, live,
+            dead, /*countMergeEvents=*/true, /*doFsync=*/true);
         b1 = b2;
 
         // Bucket is referenced by b1, b2 and the BucketManager.
         CHECK(b1.use_count() == 3);
 
-        std::shared_ptr<Bucket> b3 =
-            Bucket::fresh(app->getBucketManager(), getAppLedgerVersion(app),
-                          {}, live, dead, /*countMergeEvents=*/true);
-        std::shared_ptr<Bucket> b4 =
-            Bucket::fresh(app->getBucketManager(), getAppLedgerVersion(app),
-                          {}, live, dead, /*countMergeEvents=*/true);
+        std::shared_ptr<Bucket> b3 = Bucket::fresh(
+            app->getBucketManager(), getAppLedgerVersion(app), {}, live,
+            dead, /*countMergeEvents=*/true, /*doFsync=*/true);
+        std::shared_ptr<Bucket> b4 = Bucket::fresh(
+            app->getBucketManager(), getAppLedgerVersion(app), {}, live,
+            dead, /*countMergeEvents=*/true, /*doFsync=*/true);
 
         // Bucket is referenced by b1, b2, b3, b4 and the BucketManager.
         CHECK(b1.use_count() == 5);
     }

diff --git a/src/bucket/test/BucketTests.cpp b/src/bucket/test/BucketTests.cpp
index 0b35d8ce22..2d63b15b3a 100644
--- a/src/bucket/test/BucketTests.cpp
+++ b/src/bucket/test/BucketTests.cpp
@@ -118,7 +118,7 @@ TEST_CASE("file backed buckets", "[bucket][bucketbench]")
     CLOG(DEBUG, "Bucket") << "Hashing entries";
     std::shared_ptr<Bucket> b1 = Bucket::fresh(
         app->getBucketManager(), getAppLedgerVersion(app), {}, live, dead,
-        /*countMergeEvents=*/true);
+        /*countMergeEvents=*/true, /*doFsync=*/true);
     for (uint32_t i = 0; i < 5; ++i)
     {
         CLOG(DEBUG, "Bucket") << "Merging 10000 new ledger entries into "
@@ -133,10 +133,12 @@ TEST_CASE("file backed buckets", "[bucket][bucketbench]")
                 Bucket::fresh(app->getBucketManager(),
                               getAppLedgerVersion(app), {}, live, dead,
-                              /*countMergeEvents=*/true),
+                              /*countMergeEvents=*/true,
+                              /*doFsync=*/true),
                 /*shadows=*/{},
                 /*keepDeadEntries=*/true,
-                /*countMergeEvents=*/true);
+                /*countMergeEvents=*/true,
+                /*doFsync=*/true);
         }
     }
     CLOG(DEBUG, "Bucket")
@@ -183,12 +185,15 @@ TEST_CASE("merging bucket entries", "[bucket]")
         }
         auto deadEntry = LedgerEntryKey(liveEntry);
         auto bLive = Bucket::fresh(bm, vers, {}, {liveEntry}, {},
-                                   /*countMergeEvents=*/true);
+                                   /*countMergeEvents=*/true,
+                                   /*doFsync=*/true);
         auto bDead = Bucket::fresh(bm, vers, {}, {}, {deadEntry},
-                                   /*countMergeEvents=*/true);
+                                   /*countMergeEvents=*/true,
+                                   /*doFsync=*/true);
         auto b1 = Bucket::merge(bm, vers, bLive, bDead, /*shadows=*/{},
                                 /*keepDeadEntries=*/true,
-                                /*countMergeEvents=*/true);
+                                /*countMergeEvents=*/true,
+                                /*doFsync=*/true);
         CHECK(countEntries(b1) == 1);
     }
 };
@@ -211,12 +216,15 @@ TEST_CASE("merging bucket entries", "[bucket]")
             }
         }
         auto bLive = Bucket::fresh(bm, vers, {}, live, {},
-                                   /*countMergeEvents=*/true);
+                                   /*countMergeEvents=*/true,
+                                   /*doFsync=*/true);
         auto bDead = Bucket::fresh(bm, vers, {}, {}, dead,
-                                   /*countMergeEvents=*/true);
+                                   /*countMergeEvents=*/true,
+                                   /*doFsync=*/true);
         auto b1 = Bucket::merge(bm, vers, bLive, bDead, /*shadows=*/{},
                                 /*keepDeadEntries=*/true,
-                                /*countMergeEvents=*/true);
+                                /*countMergeEvents=*/true,
+                                /*doFsync=*/true);
         EntryCounts e(b1);
         CHECK(e.sum() == live.size());
         CLOG(DEBUG, "Bucket") << "post-merge live count: " << e.nLive
@@ -234,7 +242,8 @@ TEST_CASE("merging bucket entries", "[bucket]")
         }
         std::shared_ptr<Bucket> b1 =
             Bucket::fresh(app->getBucketManager(), getAppLedgerVersion(app),
-                          {}, live, dead, /*countMergeEvents=*/true);
+                          {}, live, dead, /*countMergeEvents=*/true,
+                          /*doFsync=*/true);
         std::random_shuffle(live.begin(), live.end());
         size_t liveCount = live.size();
         for (auto& e : live)
@@ -247,12 +256,14 @@ TEST_CASE("merging bucket entries", "[bucket]")
         }
         std::shared_ptr<Bucket> b2 =
             Bucket::fresh(app->getBucketManager(), getAppLedgerVersion(app),
-                          {}, live, dead, /*countMergeEvents=*/true);
+                          {}, live, dead, /*countMergeEvents=*/true,
+                          /*doFsync=*/true);
         std::shared_ptr<Bucket> b3 =
             Bucket::merge(app->getBucketManager(),
                           app->getConfig().LEDGER_PROTOCOL_VERSION, b1, b2,
                           /*shadows=*/{}, /*keepDeadEntries=*/true,
-                          /*countMergeEvents=*/true);
+                          /*countMergeEvents=*/true,
+                          /*doFsync=*/true);
         CHECK(countEntries(b3) == liveCount);
     }
 });
@@ -319,22 +330,28 @@ TEST_CASE("merges refuse to exceed max protocol version",
     LedgerEntry liveEntry = generateAccount();
     LedgerEntry otherLiveA = generateDifferentAccount({liveEntry});
     auto bold1 = Bucket::fresh(bm, vers - 1, {}, {liveEntry}, {},
-                               /*countMergeEvents=*/true);
+                               /*countMergeEvents=*/true,
+                               /*doFsync=*/true);
     auto bold2 = Bucket::fresh(bm, vers - 1, {}, {otherLiveA}, {},
-                               /*countMergeEvents=*/true);
+                               /*countMergeEvents=*/true,
+                               /*doFsync=*/true);
     auto bnew1 =
-        Bucket::fresh(bm, vers, {}, {liveEntry}, {}, /*countMergeEvents=*/true);
+        Bucket::fresh(bm, vers, {}, {liveEntry}, {}, /*countMergeEvents=*/true,
+                      /*doFsync=*/true);
     auto bnew2 = Bucket::fresh(bm, vers, {}, {otherLiveA}, {},
-                               /*countMergeEvents=*/true);
+                               /*countMergeEvents=*/true,
+                               /*doFsync=*/true);
     REQUIRE_THROWS_AS(Bucket::merge(bm, vers - 1, bnew1, bnew2,
                                     /*shadows=*/{},
                                     /*keepDeadEntries=*/true,
-                                    /*countMergeEvents=*/true),
+                                    /*countMergeEvents=*/true,
+                                    /*doFsync=*/true),
                       std::runtime_error);
     REQUIRE_THROWS_AS(Bucket::merge(bm, vers - 1, bold1, bold2,
                                     /*shadows=*/{bnew1},
                                     /*keepDeadEntries=*/true,
-                                    /*countMergeEvents=*/true),
+                                    /*countMergeEvents=*/true,
+                                    /*doFsync=*/true),
                       std::runtime_error);
 }
@@ -354,7 +371,7 @@ TEST_CASE("bucket output iterator rejects wrong-version entries",
     metaEntry.type(METAENTRY);
     metaEntry.metaEntry() = meta;
     MergeCounters mc;
-    BucketOutputIterator out(bm.getTmpDir(), true, meta, mc);
+    BucketOutputIterator out(bm.getTmpDir(), true, meta, mc, /*doFsync=*/true);
     REQUIRE_THROWS_AS(out.put(initEntry), std::runtime_error);
     REQUIRE_THROWS_AS(out.put(metaEntry), std::runtime_error);
 }
@@ -394,13 +411,16 @@ TEST_CASE("merging bucket entries with initentry", "[bucket][initentry]")
     SECTION("dead and init account entries merge correctly")
     {
         auto bInit = Bucket::fresh(bm, vers, {initEntry}, {}, {},
-                                   /*countMergeEvents=*/true);
+                                   /*countMergeEvents=*/true,
+                                   /*doFsync=*/true);
         auto bDead = Bucket::fresh(bm, vers, {}, {}, {deadEntry},
-                                   /*countMergeEvents=*/true);
+                                   /*countMergeEvents=*/true,
+                                   /*doFsync=*/true);
         auto b1 = Bucket::merge(bm, cfg.LEDGER_PROTOCOL_VERSION, bInit, bDead,
                                 /*shadows=*/{},
                                 /*keepDeadEntries=*/true,
-                                /*countMergeEvents=*/true);
+                                /*countMergeEvents=*/true,
+                                /*doFsync=*/true);
         // In initEra, the INIT will make it through fresh() to the bucket,
         // and mutually annihilate on contact with the DEAD, leaving 0
         // entries. Pre-initEra, the INIT will downgrade to a LIVE during
@@ -425,19 +445,24 @@ TEST_CASE("merging bucket entries with initentry", "[bucket][initentry]")
             "correctly")
     {
         auto bInit = Bucket::fresh(bm, vers, {initEntry}, {}, {},
-                                   /*countMergeEvents=*/true);
+                                   /*countMergeEvents=*/true,
+                                   /*doFsync=*/true);
         auto bLive = Bucket::fresh(bm, vers, {}, {liveEntry}, {},
-                                   /*countMergeEvents=*/true);
+                                   /*countMergeEvents=*/true,
+                                   /*doFsync=*/true);
         auto bDead = Bucket::fresh(bm, vers, {}, {}, {deadEntry},
-                                   /*countMergeEvents=*/true);
+                                   /*countMergeEvents=*/true,
+                                   /*doFsync=*/true);
         auto bmerge1 = Bucket::merge(bm, cfg.LEDGER_PROTOCOL_VERSION, bInit,
                                      bLive, /*shadows=*/{},
                                      /*keepDeadEntries=*/true,
-                                     /*countMergeEvents=*/true);
+                                     /*countMergeEvents=*/true,
+                                     /*doFsync=*/true);
         auto b1 = Bucket::merge(bm, cfg.LEDGER_PROTOCOL_VERSION, bmerge1,
                                 bDead, /*shadows=*/{},
                                 /*keepDeadEntries=*/true,
-                                /*countMergeEvents=*/true);
+                                /*countMergeEvents=*/true,
+                                /*doFsync=*/true);
         // The same thing should happen here as above, except that the INIT
         // will merge-over the LIVE during fresh().
         EntryCounts e(b1);
@@ -459,12 +484,15 @@ TEST_CASE("merging bucket entries with initentry", "[bucket][initentry]")
             "separate buckets")
     {
         auto bold = Bucket::fresh(bm, vers, {initEntry}, {}, {},
-                                  /*countMergeEvents=*/true);
+                                  /*countMergeEvents=*/true,
+                                  /*doFsync=*/true);
         auto bmed = Bucket::fresh(
             bm, vers, {}, {otherLiveA, otherLiveB, liveEntry, otherLiveC},
-            {}, /*countMergeEvents=*/true);
+            {}, /*countMergeEvents=*/true,
+            /*doFsync=*/true);
         auto bnew = Bucket::fresh(bm, vers, {}, {}, {deadEntry},
-                                  /*countMergeEvents=*/true);
+                                  /*countMergeEvents=*/true,
+                                  /*doFsync=*/true);
         EntryCounts eold(bold), emed(bmed), enew(bnew);
         if (initEra)
         {
@@ -496,11 +524,13 @@ TEST_CASE("merging bucket entries with initentry", "[bucket][initentry]")
         auto bmerge1 = Bucket::merge(bm, cfg.LEDGER_PROTOCOL_VERSION, bold,
                                      bmed, /*shadows=*/{},
                                      /*keepDeadEntries=*/true,
-                                     /*countMergeEvents=*/true);
+                                     /*countMergeEvents=*/true,
+                                     /*doFsync=*/true);
         auto bmerge2 = Bucket::merge(bm, cfg.LEDGER_PROTOCOL_VERSION, bmerge1,
                                      bnew, /*shadows=*/{},
                                      /*keepDeadEntries=*/true,
-                                     /*countMergeEvents=*/true);
+                                     /*countMergeEvents=*/true,
+                                     /*doFsync=*/true);
         EntryCounts emerge1(bmerge1), emerge2(bmerge2);
         if (initEra)
         {
@@ -531,15 +561,19 @@ TEST_CASE("merging bucket entries with initentry", "[bucket][initentry]")
         // (INIT/DEAD). In 11-and-after versions, shadows _don't_ eliminate
         // lifecycle entries.
         auto shadow = Bucket::fresh(bm, vers, {}, {liveEntry}, {},
-                                    /*countMergeEvents=*/true);
+                                    /*countMergeEvents=*/true,
+                                    /*doFsync=*/true);
         auto b1 = Bucket::fresh(bm, vers, {initEntry}, {}, {},
-                                /*countMergeEvents=*/true);
+                                /*countMergeEvents=*/true,
+                                /*doFsync=*/true);
         auto b2 = Bucket::fresh(bm, vers, {otherInitA}, {}, {},
-                                /*countMergeEvents=*/true);
+                                /*countMergeEvents=*/true,
+                                /*doFsync=*/true);
         auto merged = Bucket::merge(bm, cfg.LEDGER_PROTOCOL_VERSION, b1, b2,
                                     /*shadows=*/{shadow},
                                     /*keepDeadEntries=*/true,
-                                    /*countMergeEvents=*/true);
+                                    /*countMergeEvents=*/true,
+                                    /*doFsync=*/true);
         EntryCounts e(merged);
         if (initEra)
         {
@@ -566,15 +600,20 @@ TEST_CASE("merging bucket entries with initentry", "[bucket][initentry]")
         //
         // (level1 is newest here, level5 is oldest)
         auto level1 = Bucket::fresh(bm, vers, {}, {}, {deadEntry},
-                                    /*countMergeEvents=*/true);
+                                    /*countMergeEvents=*/true,
+                                    /*doFsync=*/true);
         auto level2 = Bucket::fresh(bm, vers, {initEntry2}, {}, {},
-                                    /*countMergeEvents=*/true);
+                                    /*countMergeEvents=*/true,
+                                    /*doFsync=*/true);
         auto level3 = Bucket::fresh(bm, vers, {}, {}, {deadEntry},
-                                    /*countMergeEvents=*/true);
+                                    /*countMergeEvents=*/true,
+                                    /*doFsync=*/true);
         auto level4 =
-            Bucket::fresh(bm, vers, {}, {}, {}, /*countMergeEvents=*/true);
+            Bucket::fresh(bm, vers, {}, {}, {}, /*countMergeEvents=*/true,
+                          /*doFsync=*/true);
         auto level5 = Bucket::fresh(bm, vers, {initEntry}, {}, {},
-                                    /*countMergeEvents=*/true);
+                                    /*countMergeEvents=*/true,
+                                    /*doFsync=*/true);
 
         // Do a merge between levels 4 and 3, with shadows from 2 and 1,
        // risking shadowing-out level 3. Level 4 is a placeholder here,
@@ -584,7 +623,8 @@ TEST_CASE("merging bucket entries with initentry", "[bucket][initentry]")
             Bucket::merge(bm, cfg.LEDGER_PROTOCOL_VERSION, level4, level3,
                           /*shadows=*/{level2, level1},
                           /*keepDeadEntries=*/true,
-                          /*countMergeEvents=*/true);
+                          /*countMergeEvents=*/true,
+                          /*doFsync=*/true);
         EntryCounts e43(merge43);
         if (initEra)
         {
@@ -609,7 +649,8 @@ TEST_CASE("merging bucket entries with initentry", "[bucket][initentry]")
             Bucket::merge(bm, cfg.LEDGER_PROTOCOL_VERSION, level2, level1,
                           /*shadows=*/{},
                           /*keepDeadEntries=*/true,
-                          /*countMergeEvents=*/true);
+                          /*countMergeEvents=*/true,
+                          /*doFsync=*/true);
         EntryCounts e21(merge21);
         if (initEra)
         {
@@ -634,12 +675,14 @@ TEST_CASE("merging bucket entries with initentry", "[bucket][initentry]")
             Bucket::merge(bm, cfg.LEDGER_PROTOCOL_VERSION, merge43, merge21,
                           /*shadows=*/{},
                           /*keepDeadEntries=*/true,
-                          /*countMergeEvents=*/true);
+                          /*countMergeEvents=*/true,
+                          /*doFsync=*/true);
         auto merge54321 =
             Bucket::merge(bm, cfg.LEDGER_PROTOCOL_VERSION, level5, merge4321,
                           /*shadows=*/{},
                           /*keepDeadEntries=*/true,
-                          /*countMergeEvents=*/true);
+                          /*countMergeEvents=*/true,
+                          /*doFsync=*/true);
         EntryCounts e54321(merge21);
         if (initEra)
         {
@@ -669,11 +712,14 @@ TEST_CASE("merging bucket entries with initentry", "[bucket][initentry]")
         //
         // (level1 is newest here, level3 is oldest)
         auto level1 = Bucket::fresh(bm, vers, {}, {}, {deadEntry},
-                                    /*countMergeEvents=*/true);
+                                    /*countMergeEvents=*/true,
+                                    /*doFsync=*/true);
         auto level2 = Bucket::fresh(bm, vers, {}, {liveEntry}, {},
-                                    /*countMergeEvents=*/true);
+                                    /*countMergeEvents=*/true,
+                                    /*doFsync=*/true);
         auto level3 = Bucket::fresh(bm, vers, {initEntry}, {}, {},
-                                    /*countMergeEvents=*/true);
+                                    /*countMergeEvents=*/true,
+                                    /*doFsync=*/true);
 
         // Do a merge between levels 3 and 2, with shadow from 1, risking
         // shadowing-out the init on level 3. Level 2 is a placeholder here,
@@ -683,7 +729,8 @@ TEST_CASE("merging bucket entries with initentry", "[bucket][initentry]")
             Bucket::merge(bm, cfg.LEDGER_PROTOCOL_VERSION, level3, level2,
                           /*shadows=*/{level1},
                           /*keepDeadEntries=*/true,
-                          /*countMergeEvents=*/true);
+                          /*countMergeEvents=*/true,
+                          /*doFsync=*/true);
         EntryCounts e32(merge32);
         if (initEra)
         {
@@ -709,7 +756,8 @@ TEST_CASE("merging bucket entries with initentry", "[bucket][initentry]")
             Bucket::merge(bm, cfg.LEDGER_PROTOCOL_VERSION, merge32, level1,
                           /*shadows=*/{},
                           /*keepDeadEntries=*/true,
-                          /*countMergeEvents=*/true);
+                          /*countMergeEvents=*/true,
+                          /*doFsync=*/true);
         EntryCounts e321(merge321);
         if (initEra)
         {
@@ -752,13 +800,13 @@ TEST_CASE("bucket apply", "[bucket]")
         dead.emplace_back(LedgerEntryKey(e));
     }
 
-    std::shared_ptr<Bucket> birth =
-        Bucket::fresh(app->getBucketManager(), getAppLedgerVersion(app), {},
-                      live, noDead, /*countMergeEvents=*/true);
+    std::shared_ptr<Bucket> birth = Bucket::fresh(
+        app->getBucketManager(), getAppLedgerVersion(app), {}, live, noDead,
+        /*countMergeEvents=*/true, /*doFsync=*/true);
 
-    std::shared_ptr<Bucket> death =
-        Bucket::fresh(app->getBucketManager(), getAppLedgerVersion(app), {},
-                      noLive, dead, /*countMergeEvents=*/true);
+    std::shared_ptr<Bucket> death = Bucket::fresh(
+        app->getBucketManager(), getAppLedgerVersion(app), {}, noLive, dead,
+        /*countMergeEvents=*/true, /*doFsync=*/true);
 
     CLOG(INFO, "Bucket") << "Applying bucket with " << live.size()
                          << " live entries";
@@ -796,9 +844,9 @@ TEST_CASE("bucket apply bench", "[bucketbench][!hide]")
         a = LedgerTestUtils::generateValidAccountEntry(5);
     }
 
-    std::shared_ptr<Bucket> birth =
-        Bucket::fresh(app->getBucketManager(), getAppLedgerVersion(app), {},
-                      live, noDead, /*countMergeEvents=*/true);
+    std::shared_ptr<Bucket> birth = Bucket::fresh(
+        app->getBucketManager(), getAppLedgerVersion(app), {}, live, noDead,
+        /*countMergeEvents=*/true, /*doFsync=*/true);
 
     CLOG(INFO, "Bucket") << "Applying bucket with " << live.size()
                          << " live entries";

diff --git a/src/history/StateSnapshot.cpp b/src/history/StateSnapshot.cpp
index cf064a0508..7cd2b402eb 100644
--- a/src/history/StateSnapshot.cpp
+++ b/src/history/StateSnapshot.cpp
@@ -87,7 +87,9 @@ StateSnapshot::writeHistoryBlocks() const
     uint32_t begin, count;
     size_t nHeaders;
     {
-        XDROutputFileStream ledgerOut, txOut, txResultOut, scpHistory;
+        bool doFsync = !mApp.getConfig().DISABLE_XDR_FSYNC;
+        XDROutputFileStream ledgerOut(doFsync), txOut(doFsync),
+            txResultOut(doFsync), scpHistory(doFsync);
         ledgerOut.open(mLedgerSnapFile->localPath_nogz());
         txOut.open(mTransactionSnapFile->localPath_nogz());
         txResultOut.open(mTransactionResultSnapFile->localPath_nogz());

diff --git a/src/history/test/HistoryTestsUtils.cpp b/src/history/test/HistoryTestsUtils.cpp
index c96500e74a..e5c97397fc 100644
--- a/src/history/test/HistoryTestsUtils.cpp
+++ b/src/history/test/HistoryTestsUtils.cpp
@@ -107,7 +107,8 @@ RealGenesisTmpDirHistoryConfigurator::configure(Config& mCfg,
 BucketOutputIteratorForTesting::BucketOutputIteratorForTesting(
     std::string const& tmpDir, uint32_t protocolVersion, MergeCounters& mc)
     : BucketOutputIterator{tmpDir, true,
-                           testutil::testBucketMetadata(protocolVersion), mc}
+                           testutil::testBucketMetadata(protocolVersion), mc,
+                           /*doFsync=*/true}
 {
 }
 
@@ -212,7 +213,7 @@ TestLedgerChainGenerator::createHistoryFiles(
     uint32_t checkpoint)
 {
     FileTransferInfo ft{mTmpDir, HISTORY_FILE_TYPE_LEDGER, checkpoint};
-    XDROutputFileStream ledgerOut;
+    XDROutputFileStream ledgerOut(/*doFsync=*/true);
     ledgerOut.open(ft.localPath_nogz());
 
     for (auto& ledger : lhv)
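
Note on the writer-side pattern above: every producer of XDR files now derives a
single doFsync flag from the config once and threads it through to the stream or
iterator doing the writing. A minimal sketch of what a caller looks like after
this change (the names `path` and `entries` are illustrative, not from this
patch):

    // Sketch only, mirroring the StateSnapshot change above.
    bool doFsync = !app.getConfig().DISABLE_XDR_FSYNC;
    XDROutputFileStream out(doFsync);
    out.open(path);
    for (auto const& e : entries)
    {
        out.writeOne(e);
    }
    out.close(); // fflush, then fsync (when enabled), then fclose
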
diff --git a/src/invariant/test/BucketListIsConsistentWithDatabaseTests.cpp b/src/invariant/test/BucketListIsConsistentWithDatabaseTests.cpp
index 48289f125f..a9bd5d7c26 100644
--- a/src/invariant/test/BucketListIsConsistentWithDatabaseTests.cpp
+++ b/src/invariant/test/BucketListIsConsistentWithDatabaseTests.cpp
@@ -162,7 +162,7 @@ struct BucketListGenerator
             auto keepDead = BucketList::keepDeadEntries(i);
             {
                 BucketOutputIterator out(bmApply.getTmpDir(), keepDead, meta,
-                                         mergeCounters);
+                                         mergeCounters, /*doFsync=*/true);
                 for (BucketInputIterator in(level.getCurr()); in; ++in)
                 {
                     out.put(*in);
@@ -171,7 +171,7 @@ struct BucketListGenerator
             }
             {
                 BucketOutputIterator out(bmApply.getTmpDir(), keepDead, meta,
-                                         mergeCounters);
+                                         mergeCounters, /*doFsync=*/true);
                 for (BucketInputIterator in(level.getSnap()); in; ++in)
                 {
                     out.put(*in);

diff --git a/src/main/Config.cpp b/src/main/Config.cpp
index e2df063743..f8de9d5cda 100644
--- a/src/main/Config.cpp
+++ b/src/main/Config.cpp
@@ -105,6 +105,7 @@ Config::Config() : NODE_SEED(SecretKey::random())
     FAILURE_SAFETY = -1;
     UNSAFE_QUORUM = false;
     DISABLE_BUCKET_GC = false;
+    DISABLE_XDR_FSYNC = false;
 
     LOG_FILE_PATH = "stellar-core.%datetime{%Y.%M.%d-%H:%m:%s}.log";
     BUCKET_DIR_PATH = "buckets";
@@ -661,6 +662,10 @@ Config::processConfig(std::shared_ptr<cpptoml::table> t)
         {
             UNSAFE_QUORUM = readBool(item);
         }
+        else if (item.first == "DISABLE_XDR_FSYNC")
+        {
+            DISABLE_XDR_FSYNC = readBool(item);
+        }
         else if (item.first == "KNOWN_CURSORS")
         {
             KNOWN_CURSORS = readStringArray(item);

diff --git a/src/main/Config.h b/src/main/Config.h
index 06692303e3..a1af67b3b0 100644
--- a/src/main/Config.h
+++ b/src/main/Config.h
@@ -198,6 +198,17 @@ class Config : public std::enable_shared_from_this<Config>
     // disk usage, but it is useful for recovering of nodes.
     bool DISABLE_BUCKET_GC;
 
+    // If set to true, writing an XDR file (a bucket or a checkpoint) will not
+    // be followed by an fsync on the file. This in turn means that XDR files
+    // (which hold the canonical state of the ledger) may be corrupted if the
+    // operating system suddenly crashes or loses power, causing the node to
+    // diverge and get stuck on restart, or potentially even publish bad
+    // history. This option only exists as an escape hatch if the local
+    // filesystem is so unusably slow that you prefer operating without
+    // durability guarantees. Do not set it to true unless you're very certain
+    // you want to make that trade.
+    bool DISABLE_XDR_FSYNC;
+
     // Set of cursors added at each startup with value '1'.
     std::vector<std::string> KNOWN_CURSORS;

diff --git a/src/test/fuzz.cpp b/src/test/fuzz.cpp
index 0f70308733..ca7602161a 100644
--- a/src/test/fuzz.cpp
+++ b/src/test/fuzz.cpp
@@ -202,7 +202,7 @@ genfuzz(std::string const& filename)
     Logging::setFmt("<fuzz>");
     size_t n = 3;
     LOG(INFO) << "Writing " << n << "-message random fuzz file " << filename;
-    XDROutputFileStream out;
+    XDROutputFileStream out(/*doFsync=*/false);
     out.open(filename);
     autocheck::generator<StellarMessage> gen;
     for (size_t i = 0; i < n; ++i)

diff --git a/src/test/test.cpp b/src/test/test.cpp
index 8c6e9d328f..6fe9e36916 100644
--- a/src/test/test.cpp
+++ b/src/test/test.cpp
@@ -168,6 +168,9 @@ getTestConfig(int instanceNumber, Config::TestDbMode mode)
         {
         case Config::TESTDB_IN_MEMORY_SQLITE:
             dbname << "sqlite3://:memory:";
+            // When we're running on an in-memory sqlite we're
+            // probably not concerned with bucket durability.
+            thisConfig.DISABLE_XDR_FSYNC = true;
             break;
         case Config::TESTDB_ON_DISK_SQLITE:
             dbname << "sqlite3://" << rootDir << "test" << instanceNumber

diff --git a/src/util/FileSystemException.h b/src/util/FileSystemException.h
index afd5fd22a8..d8b21e7eac 100644
--- a/src/util/FileSystemException.h
+++ b/src/util/FileSystemException.h
@@ -4,6 +4,9 @@
 // under the Apache License, Version 2.0. See the COPYING file at the root
 // of this distribution or at http://www.apache.org/licenses/LICENSE-2.0
 
+#include "util/Logging.h"
+#include <cerrno>
+#include <cstring>
 #include <stdexcept>
 
 namespace stellar
@@ -12,6 +15,24 @@ namespace stellar
 class FileSystemException : public std::runtime_error
 {
   public:
+    static void
+    failWith(std::string msg)
+    {
+        CLOG(FATAL, "Fs") << msg;
+        throw FileSystemException(msg);
+    }
+    static void
+    failWithErrno(std::string msg)
+    {
+        failWith(msg + std::strerror(errno));
+    }
+#ifdef _WIN32
+    static void
+    failWithGetLastError(std::string msg)
+    {
+        failWith(msg + ", code " + std::to_string(GetLastError()));
+    }
+#endif // _WIN32
     explicit FileSystemException(std::string const& msg)
         : std::runtime_error{msg}
     {

diff --git a/src/util/Fs.cpp b/src/util/Fs.cpp
index a6b6ac71ee..d4eeb38402 100644
--- a/src/util/Fs.cpp
+++ b/src/util/Fs.cpp
@@ -15,6 +15,7 @@
 #ifdef _WIN32
 #include <direct.h>
 #include <windows.h>
+#include <io.h>
 #else
 #include <sys/stat.h>
 #include <sys/types.h>
@@ -77,6 +78,40 @@ unlockFile(std::string const& path)
     }
 }
 
+void
+flushFileChanges(FILE* fp)
+{
+    int fd = _fileno(fp);
+    if (fd == -1)
+    {
+        FileSystemException::failWithErrno(
+            "fs::flushFileChanges() failed on _fileno(): ");
+    }
+    HANDLE fh = (HANDLE)_get_osfhandle(fd);
+    if (fh == INVALID_HANDLE_VALUE)
+    {
+        FileSystemException::failWithErrno(
+            "fs::flushFileChanges() failed on _get_osfhandle(): ");
+    }
+    if (FlushFileBuffers(fh) == FALSE)
+    {
+        FileSystemException::failWithGetLastError(
+            "fs::flushFileChanges() failed on FlushFileBuffers(): ");
+    }
+}
+
+bool
+durableRename(std::string const& src, std::string const& dst,
+              std::string const& dir)
+{
+    if (MoveFileExA(src.c_str(), dst.c_str(), MOVEFILE_WRITE_THROUGH) == 0)
+    {
+        FileSystemException::failWithGetLastError(
+            "fs::durableRename() failed on MoveFileExA(): ");
+    }
+    return true;
+}
+
 bool
 exists(std::string const& name)
 {
@@ -203,6 +238,49 @@ unlockFile(std::string const& path)
     }
 }
 
+void
+flushFileChanges(FILE* fp)
+{
+    int fd = fileno(fp);
+    if (fd == -1)
+    {
+        FileSystemException::failWithErrno(
+            "fs::flushFileChanges() failed on fileno(): ");
+    }
+    if (fsync(fd) == -1)
+    {
+        FileSystemException::failWithErrno(
+            "fs::flushFileChanges() failed on fsync(): ");
+    }
+}
+
+bool
+durableRename(std::string const& src, std::string const& dst,
+              std::string const& dir)
+{
+    if (rename(src.c_str(), dst.c_str()) != 0)
+    {
+        return false;
+    }
+    auto dfd = open(dir.c_str(), O_RDONLY);
+    if (dfd == -1)
+    {
+        FileSystemException::failWithErrno(
+            std::string("Failed to open directory ") + dir + " :");
+    }
+    if (fsync(dfd) == -1)
+    {
+        FileSystemException::failWithErrno(
+            std::string("Failed to fsync directory ") + dir + " :");
+    }
+    if (close(dfd) == -1)
+    {
+        FileSystemException::failWithErrno(
+            std::string("Failed to close directory ") + dir + " :");
+    }
+    return true;
+}
+
 bool
 exists(std::string const& name)
 {
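
For reviewers unfamiliar with the POSIX durability rules encoded in
fs::durableRename() above: a rename only becomes durable once the directory
holding the new name is itself fsync'ed; syncing the file's own data is not
enough. A hedged caller-side sketch (tmpName, finalName and bucketDir are
illustrative names, not from this patch):

    // Assumes tmpName was already written and fsync'ed on close.
    // On POSIX this does rename(2), then open+fsync+close of the directory;
    // on Win32, MoveFileExA with MOVEFILE_WRITE_THROUGH.
    if (!fs::durableRename(tmpName, finalName, bucketDir))
    {
        throw std::runtime_error("rename failed");
    }
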
diff --git a/src/util/Fs.h b/src/util/Fs.h
index a05e039611..58f67da0de 100644
--- a/src/util/Fs.h
+++ b/src/util/Fs.h
@@ -22,6 +22,15 @@ void lockFile(std::string const& path);
 // unlocks a file locked with `lockFile`
 void unlockFile(std::string const& path);
 
+// Call fsync() on POSIX or FlushFileBuffers() on Win32.
+void flushFileChanges(FILE* fp);
+
+// On POSIX, do rename(src, dst) then open dir and fsync() it
+// too: a necessary second step for ensuring durability.
+// On Win32, do MoveFileExA with MOVEFILE_WRITE_THROUGH.
+bool durableRename(std::string const& src, std::string const& dst,
+                   std::string const& dir);
+
 // Whether a path exists
 bool exists(std::string const& path);

diff --git a/src/util/XDRStream.h b/src/util/XDRStream.h
index 8d45c220f1..1f8ae63757 100644
--- a/src/util/XDRStream.h
+++ b/src/util/XDRStream.h
@@ -116,60 +116,85 @@ class XDRInputFileStream
     }
 };
 
+// XDROutputFileStream needs access to a file descriptor to do
+// fsync, so we use cstdio here rather than fstreams.
 class XDROutputFileStream
 {
-    std::ofstream mOut;
+    FILE* mOut{nullptr};
     std::vector<char> mBuf;
+    const bool mFsyncOnClose;
 
   public:
-    XDROutputFileStream()
+    XDROutputFileStream(bool fsyncOnClose) : mFsyncOnClose(fsyncOnClose)
     {
-        mOut.exceptions(std::ifstream::failbit | std::ifstream::badbit);
+    }
+
+    ~XDROutputFileStream()
+    {
+        if (mOut)
+        {
+            close();
+        }
     }
 
     void
     close()
     {
-        try
+        if (!mOut)
         {
-            mOut.close();
+            FileSystemException::failWith(
+                "XDROutputFileStream::close() on non-open FILE*");
         }
-        catch (std::ios_base::failure&)
+        if (fflush(mOut) != 0)
         {
-            std::string msg("failed to close XDR file");
-            msg += ", reason: ";
-            msg += std::to_string(errno);
-            throw FileSystemException(msg);
+            FileSystemException::failWithErrno(
+                "XDROutputFileStream::close() failed on fflush(): ");
+        }
+        if (mFsyncOnClose)
+        {
+            fs::flushFileChanges(mOut);
+        }
+        if (fclose(mOut) != 0)
+        {
+            FileSystemException::failWithErrno(
+                "XDROutputFileStream::close() failed on fclose(): ");
         }
+        mOut = nullptr;
     }
 
     void
     open(std::string const& filename)
     {
-        try
+        if (mOut)
         {
-            mOut.open(filename, std::ofstream::binary | std::ofstream::trunc);
+            FileSystemException::failWith(
+                "XDROutputFileStream::open() on already-open stream");
         }
-        catch (std::ios_base::failure&)
+        mOut = fopen(filename.c_str(), "wb");
+        if (!mOut)
         {
-            std::string msg("failed to open XDR file: ");
-            msg += filename;
-            msg += ", reason: ";
-            msg += std::to_string(errno);
-            CLOG(FATAL, "Fs") << msg;
-            throw FileSystemException(msg);
+            FileSystemException::failWithErrno(
+                std::string("XDROutputFileStream::open(\"") + filename +
+                "\") failed: ");
         }
     }
 
     operator bool() const
     {
-        return mOut.good();
+        return (mOut && !static_cast<bool>(ferror(mOut)) &&
+                !static_cast<bool>(feof(mOut)));
     }
 
     template <typename T>
     void
     writeOne(T const& t, SHA256* hasher = nullptr, size_t* bytesPut = nullptr)
     {
+        if (!mOut)
+        {
+            FileSystemException::failWith(
+                "XDROutputFileStream::writeOne() on non-open FILE*");
+        }
+
         uint32_t sz = (uint32_t)xdr::xdr_size(t);
         assert(sz < 0x80000000);
@@ -188,10 +213,11 @@ class XDROutputFileStream
         xdr::xdr_put p(mBuf.data() + 4, mBuf.data() + 4 + sz);
         xdr_argpack_archive(p, t);
 
-        // Note: most libraries implement ofstream::write by calling C function
-        // `fwrite`, which does not set errno, so there isn't much info about
-        // why the write failed.
-        mOut.write(mBuf.data(), sz + 4);
+        if (fwrite(mBuf.data(), 1, sz + 4, mOut) != sz + 4)
+        {
+            FileSystemException::failWithErrno(
+                "XDROutputFileStream::writeOne() failed: ");
+        }
 
         if (hasher)
         {
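
The rewritten stream above gives close() a fixed ordering: fflush() moves
buffered bytes from the C library into the kernel, fs::flushFileChanges() (when
fsync-on-close is enabled) forces them to disk, and only then does fclose() run.
Typical use after this patch, sketched with an illustrative file name and
record:

    XDROutputFileStream out(/*fsyncOnClose=*/true);
    out.open("checkpoint.xdr"); // illustrative name
    out.writeOne(entry);        // writes one length-prefixed XDR record
    out.close();                // fflush -> fsync -> fclose; throws on error
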
diff --git a/src/util/test/XDRStreamTests.cpp b/src/util/test/XDRStreamTests.cpp
index 9ae229fb40..f3fe70b24f 100644
--- a/src/util/test/XDRStreamTests.cpp
+++ b/src/util/test/XDRStreamTests.cpp
@@ -5,13 +5,18 @@
 #include "bucket/Bucket.h"
 #include "ledger/test/LedgerTestUtils.h"
 #include "lib/catch.hpp"
+#include "lib/util/format.h"
+#include "test/test.h"
+#include "util/Logging.h"
 #include "util/XDRStream.h"
 
+#include <chrono>
+
 using namespace stellar;
 
 TEST_CASE("XDROutputFileStream fail modes", "[xdrstream]")
 {
-    XDROutputFileStream out;
+    XDROutputFileStream out(/*doFsync=*/true);
     auto filename = "someFile";
 
     SECTION("open throws")
@@ -30,10 +35,59 @@ TEST_CASE("XDROutputFileStream fail modes", "[xdrstream]")
             Bucket::convertToBucketEntry(false, {}, ledgerEntries, {});
 
         REQUIRE_THROWS_AS(out.writeOne(bucketEntries[0], hasher.get(), &bytes),
-                          std::ios_base::failure);
+                          std::runtime_error);
     }
     SECTION("close throws")
     {
         REQUIRE_THROWS_AS(out.close(), std::runtime_error);
     }
 }
+
+TEST_CASE("XDROutputFileStream fsync bench", "[!hide][xdrstream][bench]")
+{
+    Config const& cfg = getTestConfig(0);
+
+    auto hasher = SHA256::create();
+    auto ledgerEntries = LedgerTestUtils::generateValidLedgerEntries(10000000);
+    auto bucketEntries =
+        Bucket::convertToBucketEntry(false, {}, ledgerEntries, {});
+
+    fs::mkpath(cfg.BUCKET_DIR_PATH);
+
+    for (int i = 0; i < 10; ++i)
+    {
+        XDROutputFileStream outFsync(/*doFsync=*/true);
+        XDROutputFileStream outNoFsync(/*doFsync=*/false);
+
+        outFsync.open(
+            fmt::format("{}/outFsync-{}.xdr", cfg.BUCKET_DIR_PATH, i));
+        outNoFsync.open(
+            fmt::format("{}/outNoFsync-{}.xdr", cfg.BUCKET_DIR_PATH, i));
+
+        size_t bytes = 0;
+        auto start = std::chrono::system_clock::now();
+        for (auto const& e : bucketEntries)
+        {
+            outFsync.writeOne(e, hasher.get(), &bytes);
+        }
+        outFsync.close();
+        auto stop = std::chrono::system_clock::now();
+        auto elapsed =
+            std::chrono::duration_cast<std::chrono::milliseconds>(stop - start);
+        CLOG(INFO, "Fs") << "wrote " << bytes << " bytes to fsync file in "
+                         << elapsed.count() << "ms";
+
+        bytes = 0;
+        start = std::chrono::system_clock::now();
+        for (auto const& e : bucketEntries)
+        {
+            outNoFsync.writeOne(e, hasher.get(), &bytes);
+        }
+        outNoFsync.close();
+        stop = std::chrono::system_clock::now();
+        elapsed =
+            std::chrono::duration_cast<std::chrono::milliseconds>(stop - start);
+        CLOG(INFO, "Fs") << "wrote " << bytes << " bytes to no-fsync file in "
+                         << elapsed.count() << "ms";
+    }
+}
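
Operators who genuinely cannot afford the fsync cost (the hidden bench above
exists to measure it) can take the escape hatch documented in the example
config at the top of this patch, accepting the corruption-and-divergence risk
it spells out:

    # stellar-core.cfg -- escape hatch only; see the warning above.
    DISABLE_XDR_FSYNC=true
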