XDR: fsync on close (also bucket dir), with option to disable. #2204

Merged: 1 commit, Aug 14, 2019
12 changes: 12 additions & 0 deletions docs/stellar-core_example.cfg
@@ -366,6 +366,18 @@ ALLOW_LOCALHOST_FOR_TESTING=false
# unlikely to be for the latest ledger.
MAXIMUM_LEDGER_CLOSETIME_DRIFT=50

# DISABLE_XDR_FSYNC (true or false) defaults to false.
# If set to true, writing an XDR file (a bucket or a checkpoint) will not
# be followed by an fsync on the file. This in turn means that XDR files
# (which hold the canonical state of the ledger) may be corrupted if the
# operating system suddenly crashes or loses power, causing the node to
# diverge and get stuck on restart, or potentially even publish bad
# history. This option only exists as an escape hatch if the local
# filesystem is so unusably slow that you prefer operating without
# durability guarantees. Do not set it to true unless you're very certain
# you want to make that trade.
DISABLE_XDR_FSYNC=false

#####################
## Tables must come at the end. (TOML you are almost perfect!)

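For readers unfamiliar with the durability model: "fsync on close" means each bucket or checkpoint file is explicitly flushed to stable storage after it is written, rather than relying on the OS to write it back eventually. Below is a minimal POSIX-style sketch of the pattern this option controls; the function name and the doFsync flag are illustrative and are not stellar-core's actual XDR stream code.

```cpp
#include <fcntl.h>
#include <unistd.h>
#include <stdexcept>
#include <string>

// Illustrative only: write a buffer to `path` and, unless fsync is disabled,
// flush it to stable storage before closing the descriptor.
void
writeDurably(std::string const& path, std::string const& data, bool doFsync)
{
    int fd = ::open(path.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
    if (fd < 0)
    {
        throw std::runtime_error("open failed: " + path);
    }
    if (::write(fd, data.data(), data.size()) !=
        static_cast<ssize_t>(data.size()))
    {
        ::close(fd);
        throw std::runtime_error("short write: " + path);
    }
    // With DISABLE_XDR_FSYNC=true this step is skipped, trading crash
    // durability for speed on a very slow filesystem.
    if (doFsync && ::fsync(fd) != 0)
    {
        ::close(fd);
        throw std::runtime_error("fsync failed: " + path);
    }
    ::close(fd);
}
```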
10 changes: 6 additions & 4 deletions src/bucket/Bucket.cpp
@@ -137,7 +137,8 @@ std::shared_ptr<Bucket>
Bucket::fresh(BucketManager& bucketManager, uint32_t protocolVersion,
std::vector<LedgerEntry> const& initEntries,
std::vector<LedgerEntry> const& liveEntries,
std::vector<LedgerKey> const& deadEntries, bool countMergeEvents)
std::vector<LedgerKey> const& deadEntries, bool countMergeEvents,
bool doFsync)
{
// When building fresh buckets after protocol version 10 (i.e. version
// 11-or-after) we differentiate INITENTRY from LIVEENTRY. In older
@@ -151,7 +152,8 @@ Bucket::fresh(BucketManager& bucketManager, uint32_t protocolVersion,
convertToBucketEntry(useInit, initEntries, liveEntries, deadEntries);

MergeCounters mc;
BucketOutputIterator out(bucketManager.getTmpDir(), true, meta, mc);
BucketOutputIterator out(bucketManager.getTmpDir(), true, meta, mc,
doFsync);
for (auto const& e : entries)
{
out.put(e);
@@ -566,7 +568,7 @@ Bucket::merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,
std::shared_ptr<Bucket> const& oldBucket,
std::shared_ptr<Bucket> const& newBucket,
std::vector<std::shared_ptr<Bucket>> const& shadows,
bool keepDeadEntries, bool countMergeEvents)
bool keepDeadEntries, bool countMergeEvents, bool doFsync)
{
// This is the key operation in the scheme: merging two (read-only)
// buckets together into a new 3rd bucket, while calculating its hash,
@@ -591,7 +593,7 @@ Bucket::merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,
BucketMetadata meta;
meta.ledgerVersion = protocolVersion;
BucketOutputIterator out(bucketManager.getTmpDir(), keepDeadEntries, meta,
mc);
mc, doFsync);

BucketEntryIdCmp cmp;
while (oi || ni)
5 changes: 3 additions & 2 deletions src/bucket/Bucket.h
@@ -83,7 +83,8 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
fresh(BucketManager& bucketManager, uint32_t protocolVersion,
std::vector<LedgerEntry> const& initEntries,
std::vector<LedgerEntry> const& liveEntries,
std::vector<LedgerKey> const& deadEntries, bool countMergeEvents);
std::vector<LedgerKey> const& deadEntries, bool countMergeEvents,
bool doFsync);

// Merge two buckets together, producing a fresh one. Entries in `oldBucket`
// are overridden in the fresh bucket by keywise-equal entries in
@@ -101,6 +102,6 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
std::shared_ptr<Bucket> const& oldBucket,
std::shared_ptr<Bucket> const& newBucket,
std::vector<std::shared_ptr<Bucket>> const& shadows,
bool keepDeadEntries, bool countMergeEvents);
bool keepDeadEntries, bool countMergeEvents, bool doFsync);
};
}
3 changes: 2 additions & 1 deletion src/bucket/BucketList.cpp
@@ -527,11 +527,12 @@ BucketList::addBatch(Application& app, uint32_t currLedger,
// and we are checking for an expected number of merge events on restart.
bool countMergeEvents =
!app.getConfig().ARTIFICIALLY_REDUCE_MERGE_COUNTS_FOR_TESTING;
bool doFsync = !app.getConfig().DISABLE_XDR_FSYNC;
assert(shadows.size() == 0);
mLevels[0].prepare(app, currLedger, currLedgerProtocol,
Bucket::fresh(app.getBucketManager(), currLedgerProtocol,
initEntries, liveEntries, deadEntries,
countMergeEvents),
countMergeEvents, doFsync),
shadows, countMergeEvents);
mLevels[0].commit();

17 changes: 15 additions & 2 deletions src/bucket/BucketManagerImpl.cpp
@@ -290,6 +290,19 @@ BucketManagerImpl::incrMergeCounters(MergeCounters const& delta)
mMergeCounters += delta;
}

bool
BucketManagerImpl::renameBucket(std::string const& src, std::string const& dst)
{
if (mApp.getConfig().DISABLE_XDR_FSYNC)
{
return rename(src.c_str(), dst.c_str()) == 0;
}
else
{
return fs::durableRename(src, dst, getBucketDir());
}
}

std::shared_ptr<Bucket>
BucketManagerImpl::adoptFileAsBucket(std::string const& filename,
uint256 const& hash, size_t nObjects,
@@ -328,14 +341,14 @@ BucketManagerImpl::adoptFileAsBucket(std::string const& filename,
std::string canonicalName = bucketFilename(hash);
CLOG(DEBUG, "Bucket")
<< "Adopting bucket file " << filename << " as " << canonicalName;
if (rename(filename.c_str(), canonicalName.c_str()) != 0)
if (!renameBucket(filename, canonicalName))
{
std::string err("Failed to rename bucket :");
err += strerror(errno);
// it seems there is a race condition with external systems
// retry after sleeping for a second works around the problem
std::this_thread::sleep_for(std::chrono::seconds(1));
if (rename(filename.c_str(), canonicalName.c_str()) != 0)
if (!renameBucket(filename, canonicalName))
{
// if rename fails again, surface the original error
throw std::runtime_error(err);
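The new renameBucket helper above uses a plain rename() when DISABLE_XDR_FSYNC is set and fs::durableRename otherwise; the latter also syncs the bucket directory so the new directory entry itself survives a crash. A hedged sketch of what such a durable rename typically involves on POSIX follows; it is illustrative only, not the actual fs::durableRename implementation.

```cpp
#include <cstdio>
#include <fcntl.h>
#include <string>
#include <unistd.h>

// Illustrative only: rename src -> dst, then fsync the containing directory
// so the rename itself is recorded on stable storage.
bool
durableRenameSketch(std::string const& src, std::string const& dst,
                    std::string const& dir)
{
    if (std::rename(src.c_str(), dst.c_str()) != 0)
    {
        return false;
    }
    int dfd = ::open(dir.c_str(), O_RDONLY);
    if (dfd < 0)
    {
        return false;
    }
    bool ok = (::fsync(dfd) == 0);
    ::close(dfd);
    return ok;
}
```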
1 change: 1 addition & 0 deletions src/bucket/BucketManagerImpl.h
@@ -66,6 +66,7 @@ class BucketManagerImpl : public BucketManager
std::set<Hash> getReferencedBuckets() const;
void cleanupStaleFiles();
void cleanDir();
bool renameBucket(std::string const& src, std::string const& dst);

#ifdef BUILD_TESTS
bool mUseFakeTestValuesForNextClose{false};
3 changes: 2 additions & 1 deletion src/bucket/BucketOutputIterator.cpp
@@ -35,8 +35,9 @@ randomBucketName(std::string const& tmpDir)
BucketOutputIterator::BucketOutputIterator(std::string const& tmpDir,
bool keepDeadEntries,
BucketMetadata const& meta,
MergeCounters& mc)
MergeCounters& mc, bool doFsync)
: mFilename(randomBucketName(tmpDir))
, mOut(doFsync)
, mBuf(nullptr)
, mHasher(SHA256::create())
, mKeepDeadEntries(keepDeadEntries)
3 changes: 2 additions & 1 deletion src/bucket/BucketOutputIterator.h
@@ -44,7 +44,8 @@ class BucketOutputIterator
// the form of a METAENTRY; but that's not a thing the caller gets to decide
// (or forget to do), it's handled automatically.
BucketOutputIterator(std::string const& tmpDir, bool keepDeadEntries,
BucketMetadata const& meta, MergeCounters& mc);
BucketMetadata const& meta, MergeCounters& mc,
bool doFsync);

void put(BucketEntry const& e);

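Note the design choice in the two BucketOutputIterator changes above: the doFsync flag is taken at construction and handed to the output stream (the mOut(doFsync) initializer), so the sync decision is made once and applied when the stream is closed. A minimal sketch of that shape, using assumed names rather than stellar-core's real stream class:

```cpp
#include <cstdio>
#include <stdexcept>
#include <string>
#include <unistd.h>

// Illustrative only: an output stream that remembers at construction whether
// to fsync the underlying file when it is closed.
class SyncingOutputStream
{
    std::FILE* mFile{nullptr};
    bool mDoFsync;

  public:
    explicit SyncingOutputStream(bool doFsync) : mDoFsync(doFsync)
    {
    }

    void
    open(std::string const& path)
    {
        mFile = std::fopen(path.c_str(), "wb");
        if (!mFile)
        {
            throw std::runtime_error("failed to open " + path);
        }
    }

    void
    write(char const* data, size_t size)
    {
        if (std::fwrite(data, 1, size, mFile) != size)
        {
            throw std::runtime_error("short write");
        }
    }

    void
    close()
    {
        std::fflush(mFile);           // flush stdio buffers to the kernel
        if (mDoFsync)
        {
            ::fsync(::fileno(mFile)); // flush kernel buffers to disk
        }
        std::fclose(mFile);
        mFile = nullptr;
    }
};
```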
4 changes: 2 additions & 2 deletions src/bucket/FutureBucket.cpp
@@ -314,7 +314,6 @@ FutureBucket::startMerge(Application& app, uint32_t maxProtocolVersion,
checkState();
return;
}

using task_t = std::packaged_task<std::shared_ptr<Bucket>()>;
std::shared_ptr<task_t> task = std::make_shared<task_t>(
[curr, snap, &bm, shadows, maxProtocolVersion, countMergeEvents, level,
@@ -328,7 +327,8 @@
{
auto res = Bucket::merge(
bm, maxProtocolVersion, curr, snap, shadows,
BucketList::keepDeadEntries(level), countMergeEvents);
BucketList::keepDeadEntries(level), countMergeEvents,
!app.getConfig().DISABLE_XDR_FSYNC);

CLOG(TRACE, "Bucket")
<< "Worker finished merging curr="
6 changes: 4 additions & 2 deletions src/bucket/test/BucketListTests.cpp
@@ -269,11 +269,13 @@ TEST_CASE("bucket tombstones expire at bottom level",
level.setCurr(Bucket::fresh(
bm, getAppLedgerVersion(app), {},
LedgerTestUtils::generateValidLedgerEntries(8), deadGen(8),
/*countMergeEvents=*/true));
/*countMergeEvents=*/true,
/*doFsync=*/true));
level.setSnap(Bucket::fresh(
bm, getAppLedgerVersion(app), {},
LedgerTestUtils::generateValidLedgerEntries(8), deadGen(8),
/*countMergeEvents=*/true));
/*countMergeEvents=*/true,
/*doFsync=*/true));
}

for (uint32_t i = 0; i < BucketList::kNumLevels; ++i)
18 changes: 9 additions & 9 deletions src/bucket/test/BucketManagerTests.cpp
@@ -281,20 +281,20 @@ TEST_CASE("bucketmanager ownership", "[bucket][bucketmanager]")
std::shared_ptr<Bucket> b1;

{
std::shared_ptr<Bucket> b2 =
Bucket::fresh(app->getBucketManager(), getAppLedgerVersion(app),
{}, live, dead, /*countMergeEvents=*/true);
std::shared_ptr<Bucket> b2 = Bucket::fresh(
app->getBucketManager(), getAppLedgerVersion(app), {}, live,
dead, /*countMergeEvents=*/true, /*doFsync=*/true);
b1 = b2;

// Bucket is referenced by b1, b2 and the BucketManager.
CHECK(b1.use_count() == 3);

std::shared_ptr<Bucket> b3 =
Bucket::fresh(app->getBucketManager(), getAppLedgerVersion(app),
{}, live, dead, /*countMergeEvents=*/true);
std::shared_ptr<Bucket> b4 =
Bucket::fresh(app->getBucketManager(), getAppLedgerVersion(app),
{}, live, dead, /*countMergeEvents=*/true);
std::shared_ptr<Bucket> b3 = Bucket::fresh(
app->getBucketManager(), getAppLedgerVersion(app), {}, live,
dead, /*countMergeEvents=*/true, /*doFsync=*/true);
std::shared_ptr<Bucket> b4 = Bucket::fresh(
app->getBucketManager(), getAppLedgerVersion(app), {}, live,
dead, /*countMergeEvents=*/true, /*doFsync=*/true);
// Bucket is referenced by b1, b2, b3, b4 and the BucketManager.
CHECK(b1.use_count() == 5);
}