Skip to content

Commit

Permalink
Merge pull request #2211 from marta-lokhova/avoid_gzip_race_on_publish
Browse files Browse the repository at this point in the history
Refactor publish-related work to avoid races

Reviewed-by: MonsieurNicolas
  • Loading branch information
latobarita committed Aug 15, 2019
2 parents c1937d0 + c056dbf commit 5e26a68
Show file tree
Hide file tree
Showing 31 changed files with 469 additions and 233 deletions.
4 changes: 2 additions & 2 deletions Builds/VisualStudio/stellar-core.vcxproj
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ exit /b 0
<ClCompile Include="..\..\src\historywork\GetRemoteFileWork.cpp" />
<ClCompile Include="..\..\src\historywork\GunzipFileWork.cpp" />
<ClCompile Include="..\..\src\historywork\GzipFileWork.cpp" />
<ClCompile Include="..\..\src\historywork\GzipAndPutFilesWork.cpp" />
<ClCompile Include="..\..\src\historywork\PutFilesWork.cpp" />
<ClCompile Include="..\..\src\historywork\MakeRemoteDirWork.cpp" />
<ClCompile Include="..\..\src\historywork\Progress.cpp" />
<ClCompile Include="..\..\src\historywork\PublishWork.cpp" />
Expand Down Expand Up @@ -626,7 +626,7 @@ exit /b 0
<ClInclude Include="..\..\src\historywork\GetRemoteFileWork.h" />
<ClInclude Include="..\..\src\historywork\GunzipFileWork.h" />
<ClInclude Include="..\..\src\historywork\GzipFileWork.h" />
<ClInclude Include="..\..\src\historywork\GzipAndPutFilesWork.h" />
<ClInclude Include="..\..\src\historywork\PutFilesWork.h" />
<ClInclude Include="..\..\src\historywork\MakeRemoteDirWork.h" />
<ClInclude Include="..\..\src\historywork\Progress.h" />
<ClInclude Include="..\..\src\historywork\PublishWork.h" />
Expand Down
4 changes: 2 additions & 2 deletions Builds/VisualStudio/stellar-core.vcxproj.filters
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@
<ClCompile Include="..\..\src\historywork\GunzipFileWork.cpp">
<Filter>historyWork</Filter>
</ClCompile>
<ClCompile Include="..\..\src\historywork\GzipAndPutFilesWork.cpp">
<ClCompile Include="..\..\src\historywork\PutFilesWork.cpp">
<Filter>historyWork</Filter>
</ClCompile>
<ClCompile Include="..\..\src\historywork\GzipFileWork.cpp">
Expand Down Expand Up @@ -1256,7 +1256,7 @@
<ClInclude Include="..\..\src\historywork\GzipFileWork.h">
<Filter>historyWork</Filter>
</ClInclude>
<ClInclude Include="..\..\src\historywork\GzipAndPutFilesWork.h">
<ClInclude Include="..\..\src\historywork\PutFilesWork.h">
<Filter>historyWork</Filter>
</ClInclude>
<ClInclude Include="..\..\src\historywork\MakeRemoteDirWork.h">
Expand Down
3 changes: 2 additions & 1 deletion src/bucket/test/BucketManagerTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,8 @@ TEST_CASE("bucketmanager reattach HAS from publish queue to finished merge",
auto& lm = app->getLedgerManager();
hm.setPublicationEnabled(false);
app->start();
app->getHistoryArchiveManager().initializeHistoryArchive("test");
app->getHistoryArchiveManager().initializeHistoryArchive(
tcfg.getArchiveDirName());
while (hm.getPublishQueueCount() < 5)
{
CLOG(INFO, "Bucket")
Expand Down
37 changes: 19 additions & 18 deletions src/catchup/CatchupWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ CatchupWork::doReset()
LedgerNumHashPair(lcl.header.ledgerSeq, make_optional<Hash>(lcl.hash));
mCatchupSeq.reset();
mGetBucketStateWork.reset();
mRemoteState = {};
mApplyBucketsRemoteState = {};
mLastApplied = mApp.getLedgerManager().getLastClosedLedgerHeader();
}

Expand All @@ -73,7 +71,8 @@ CatchupWork::hasAnyLedgersToCatchupTo() const
assert(mGetHistoryArchiveStateWork);
assert(mGetHistoryArchiveStateWork->getState() == State::WORK_SUCCESS);

return mLastClosedLedgerHashPair.first <= mRemoteState.currentLedger;
return mLastClosedLedgerHashPair.first <=
mGetHistoryArchiveStateWork->getHistoryArchiveState().currentLedger;
}

void
Expand All @@ -99,19 +98,19 @@ CatchupWork::downloadVerifyLedgerChain(CatchupRange const& catchupRange,
bool
CatchupWork::alreadyHaveBucketsHistoryArchiveState(uint32_t atCheckpoint) const
{
return atCheckpoint == mRemoteState.currentLedger;
return atCheckpoint ==
mGetHistoryArchiveStateWork->getHistoryArchiveState().currentLedger;
}

WorkSeqPtr
CatchupWork::downloadApplyBuckets()
{
std::vector<std::string> hashes =
mApplyBucketsRemoteState.differingBuckets(mLocalState);
auto const& has = mGetBucketStateWork->getHistoryArchiveState();
std::vector<std::string> hashes = has.differingBuckets(mLocalState);
auto getBuckets = std::make_shared<DownloadBucketsWork>(
mApp, mBuckets, hashes, *mDownloadDir);
auto applyBuckets = std::make_shared<ApplyBucketsWork>(
mApp, mBuckets, mApplyBucketsRemoteState,
mVerifiedLedgerRangeStart.header.ledgerVersion);
mApp, mBuckets, has, mVerifiedLedgerRangeStart.header.ledgerVersion);

std::vector<std::shared_ptr<BasicWork>> seq{getBuckets, applyBuckets};
return std::make_shared<WorkSequence>(mApp, "download-verify-apply-buckets",
Expand All @@ -121,11 +120,12 @@ CatchupWork::downloadApplyBuckets()
void
CatchupWork::assertBucketState()
{
// Consistency check: mRemoteState and mVerifiedLedgerRangeStart should
auto const& has = mGetBucketStateWork->getHistoryArchiveState();

// Consistency check: remote state and mVerifiedLedgerRangeStart should
// point to the same ledger and the same BucketList.
assert(mApplyBucketsRemoteState.currentLedger ==
mVerifiedLedgerRangeStart.header.ledgerSeq);
assert(mApplyBucketsRemoteState.getBucketListHash() ==
assert(has.currentLedger == mVerifiedLedgerRangeStart.header.ledgerSeq);
assert(has.getBucketListHash() ==
mVerifiedLedgerRangeStart.header.bucketListHash);

// Consistency check: LCL should be in the _past_ from
Expand Down Expand Up @@ -181,20 +181,21 @@ CatchupWork::doWork()
mCatchupConfiguration.toLedger() + 1) -
1;
mGetHistoryArchiveStateWork =
addWork<GetHistoryArchiveStateWork>(mRemoteState, toCheckpoint);
addWork<GetHistoryArchiveStateWork>(toCheckpoint);
return State::WORK_RUNNING;
}
else if (mGetHistoryArchiveStateWork->getState() != State::WORK_SUCCESS)
{
return mGetHistoryArchiveStateWork->getState();
}

auto const& has = mGetHistoryArchiveStateWork->getHistoryArchiveState();
// Step 2: Compare local and remote states
if (!hasAnyLedgersToCatchupTo())
{
CLOG(INFO, "History") << "*";
CLOG(INFO, "History")
<< "* Target ledger " << mRemoteState.currentLedger
<< "* Target ledger " << has.currentLedger
<< " is not newer than last closed ledger "
<< mLastClosedLedgerHashPair.first << " - nothing to do";

Expand All @@ -218,7 +219,7 @@ CatchupWork::doWork()
}

auto resolvedConfiguration =
mCatchupConfiguration.resolve(mRemoteState.currentLedger);
mCatchupConfiguration.resolve(has.currentLedger);
auto catchupRange =
CatchupRange{mLastClosedLedgerHashPair.first, resolvedConfiguration,
mApp.getHistoryManager()};
Expand All @@ -231,8 +232,8 @@ CatchupWork::doWork()
{
if (!mGetBucketStateWork)
{
mGetBucketStateWork = addWork<GetHistoryArchiveStateWork>(
mApplyBucketsRemoteState, applyBucketsAt);
mGetBucketStateWork =
addWork<GetHistoryArchiveStateWork>(applyBucketsAt);
}
if (mGetBucketStateWork->getState() != State::WORK_SUCCESS)
{
Expand All @@ -241,7 +242,7 @@ CatchupWork::doWork()
}
else
{
mApplyBucketsRemoteState = mRemoteState;
mGetBucketStateWork = mGetHistoryArchiveStateWork;
}
}

Expand Down
7 changes: 3 additions & 4 deletions src/catchup/CatchupWork.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "catchup/CatchupConfiguration.h"
#include "catchup/VerifyLedgerChainWork.h"
#include "history/HistoryArchive.h"
#include "historywork/GetHistoryArchiveStateWork.h"
#include "ledger/LedgerRange.h"
#include "work/Work.h"
#include "work/WorkSequence.h"
Expand Down Expand Up @@ -136,17 +137,15 @@ class CatchupWork : public Work
std::string getStatus() const override;

private:
HistoryArchiveState mRemoteState;
HistoryArchiveState mApplyBucketsRemoteState;
LedgerNumHashPair mLastClosedLedgerHashPair;
CatchupConfiguration const mCatchupConfiguration;
LedgerHeaderHistoryEntry mVerifiedLedgerRangeStart;
LedgerHeaderHistoryEntry mLastApplied;
ProgressHandler mProgressHandler;
bool mBucketsAppliedEmitted{false};

std::shared_ptr<BasicWork> mGetHistoryArchiveStateWork;
std::shared_ptr<BasicWork> mGetBucketStateWork;
std::shared_ptr<GetHistoryArchiveStateWork> mGetHistoryArchiveStateWork;
std::shared_ptr<GetHistoryArchiveStateWork> mGetBucketStateWork;

WorkSeqPtr mDownloadVerifyLedgersSeq;
std::shared_ptr<VerifyLedgerChainWork> mVerifyLedgers;
Expand Down
2 changes: 1 addition & 1 deletion src/history/HistoryArchive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ HistoryArchiveState::localName(Application& app, std::string const& archiveName)
}

Hash
HistoryArchiveState::getBucketListHash()
HistoryArchiveState::getBucketListHash() const
{
// NB: This hash algorithm has to match "what the BucketList does" to
// calculate its BucketList hash exactly. It's not a particularly complex
Expand Down
2 changes: 1 addition & 1 deletion src/history/HistoryArchive.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ struct HistoryArchiveState
std::string const& archiveName);

// Return cumulative hash of the bucketlist for this archive state.
Hash getBucketListHash();
Hash getBucketListHash() const;

// Return vector of buckets to fetch/apply to turn 'other' into 'this'.
// Vector is sorted from largest/highest-numbered bucket to smallest/lowest,
Expand Down
4 changes: 1 addition & 3 deletions src/history/HistoryArchiveManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,9 @@ HistoryArchiveManager::initializeHistoryArchive(std::string const& arch) const
auto& ws = mApp.getWorkScheduler();

// First check that there's no existing HAS in the archive
HistoryArchiveState existing;
CLOG(INFO, "History") << "Probing history archive '" << arch
<< "' for existing state";
auto getHas =
ws.executeWork<GetHistoryArchiveStateWork>(existing, 0, archive, 0);
auto getHas = ws.executeWork<GetHistoryArchiveStateWork>(0, archive, 0);
if (getHas->getState() == BasicWork::State::WORK_SUCCESS)
{
CLOG(ERROR, "History")
Expand Down
26 changes: 26 additions & 0 deletions src/history/StateSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,30 @@ StateSnapshot::writeHistoryBlocks() const

return true;
}

std::vector<std::shared_ptr<FileTransferInfo>>
StateSnapshot::differingHASFiles(HistoryArchiveState const& other)
{
std::vector<std::shared_ptr<FileTransferInfo>> files{};
auto addIfExists = [&](std::shared_ptr<FileTransferInfo> const& f) {
if (f && fs::exists(f->localPath_nogz()))
{
files.push_back(f);
}
};

addIfExists(mLedgerSnapFile);
addIfExists(mTransactionSnapFile);
addIfExists(mTransactionResultSnapFile);
addIfExists(mSCPHistorySnapFile);

for (auto const& hash : mLocalState.differingBuckets(other))
{
auto b = mApp.getBucketManager().getBucketByHash(hexToBin256(hash));
assert(b);
addIfExists(std::make_shared<FileTransferInfo>(*b));
}

return files;
}
}
2 changes: 2 additions & 0 deletions src/history/StateSnapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,7 @@ struct StateSnapshot : public std::enable_shared_from_this<StateSnapshot>
StateSnapshot(Application& app, HistoryArchiveState const& state);
void makeLive();
bool writeHistoryBlocks() const;
std::vector<std::shared_ptr<FileTransferInfo>>
differingHASFiles(HistoryArchiveState const& other);
};
}
56 changes: 44 additions & 12 deletions src/history/test/HistoryTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ TEST_CASE("HistoryArchiveState get_put", "[history]")

auto archive =
catchupSimulation.getApp().getHistoryArchiveManager().getHistoryArchive(
"test");
catchupSimulation.getHistoryConfigurator().getArchiveDirName());
REQUIRE(archive);

has.resolveAllFutures();
Expand All @@ -92,9 +92,9 @@ TEST_CASE("HistoryArchiveState get_put", "[history]")
auto put = wm.executeWork<PutHistoryArchiveStateWork>(has, archive);
REQUIRE(put->getState() == BasicWork::State::WORK_SUCCESS);

HistoryArchiveState has2;
auto get = wm.executeWork<GetHistoryArchiveStateWork>(has2, 0, archive);
auto get = wm.executeWork<GetHistoryArchiveStateWork>(0, archive);
REQUIRE(get->getState() == BasicWork::State::WORK_SUCCESS);
HistoryArchiveState has2 = get->getHistoryArchiveState();
REQUIRE(has2.currentLedger == 0x1234);
}

Expand All @@ -108,10 +108,12 @@ TEST_CASE("History bucket verification", "[history][catchup]")
auto cg = std::make_shared<TmpDirHistoryConfigurator>();
cg->configure(cfg, true);
Application::pointer app = createTestApplication(clock, cfg);
REQUIRE(app->getHistoryArchiveManager().initializeHistoryArchive("test"));
REQUIRE(app->getHistoryArchiveManager().initializeHistoryArchive(
cg->getArchiveDirName()));

auto bucketGenerator = TestBucketGenerator{
*app, app->getHistoryArchiveManager().getHistoryArchive("test")};
*app, app->getHistoryArchiveManager().getHistoryArchive(
cg->getArchiveDirName())};
std::vector<std::string> hashes;
auto& wm = app->getWorkScheduler();
std::map<std::string, std::shared_ptr<Bucket>> mBuckets;
Expand Down Expand Up @@ -171,7 +173,8 @@ TEST_CASE("Ledger chain verification", "[ledgerheaderverification]")
auto cg = std::make_shared<TmpDirHistoryConfigurator>();
cg->configure(cfg, true);
Application::pointer app = createTestApplication(clock, cfg);
REQUIRE(app->getHistoryArchiveManager().initializeHistoryArchive("test"));
REQUIRE(app->getHistoryArchiveManager().initializeHistoryArchive(
cg->getArchiveDirName()));

auto tmpDir = app->getTmpDirManager().tmpDir("tmp-chain-test");
auto& wm = app->getWorkScheduler();
Expand All @@ -185,7 +188,9 @@ TEST_CASE("Ledger chain verification", "[ledgerheaderverification]")
initLedger + app->getHistoryManager().getCheckpointFrequency() * 10};
CheckpointRange checkpointRange{ledgerRange, app->getHistoryManager()};
auto ledgerChainGenerator = TestLedgerChainGenerator{
*app, app->getHistoryArchiveManager().getHistoryArchive("test"),
*app,
app->getHistoryArchiveManager().getHistoryArchive(
cg->getArchiveDirName()),
checkpointRange, tmpDir};

auto checkExpectedBehavior = [&](Work::State expectedState,
Expand Down Expand Up @@ -289,6 +294,32 @@ TEST_CASE("History publish", "[history][publish]")
catchupSimulation.ensureOfflineCatchupPossible(checkpointLedger);
}

TEST_CASE("History publish to multiple archives", "[history]")
{
Config cfg(getTestConfig());
VirtualClock clock;
auto cg =
std::make_shared<MultiArchiveHistoryConfigurator>(/* numArchives */ 3);
CatchupSimulation catchupSimulation{VirtualClock::VIRTUAL_TIME, cg, false};

auto& app = catchupSimulation.getApp();
for (auto const& cfgtor : cg->getConfigurators())
{
CHECK(app.getHistoryArchiveManager().initializeHistoryArchive(
cfgtor->getArchiveDirName()));
}

app.start();
auto checkpointLedger = catchupSimulation.getLastCheckpointLedger(2);
catchupSimulation.ensureOfflineCatchupPossible(checkpointLedger);

auto catchupApp = catchupSimulation.createCatchupApplication(
64, Config::TESTDB_ON_DISK_SQLITE, "app");

// Actually perform catchup and make sure everything is correct
REQUIRE(catchupSimulation.catchupOffline(catchupApp, checkpointLedger));
}

static std::string
resumeModeName(uint32_t count)
{
Expand Down Expand Up @@ -791,7 +822,8 @@ TEST_CASE("persist publish queue", "[history][publish][acceptance]")
{
VirtualClock clock;
Application::pointer app1 = Application::create(clock, cfg, 0);
app1->getHistoryArchiveManager().initializeHistoryArchive("test");
app1->getHistoryArchiveManager().initializeHistoryArchive(
tcfg.getArchiveDirName());
for (size_t i = 0; i < 100; ++i)
clock.crank(false);
app1->start();
Expand Down Expand Up @@ -943,14 +975,14 @@ TEST_CASE("initialize existing history store fails", "[history]")
{
VirtualClock clock;
Application::pointer app = createTestApplication(clock, cfg);
REQUIRE(
app->getHistoryArchiveManager().initializeHistoryArchive("test"));
REQUIRE(app->getHistoryArchiveManager().initializeHistoryArchive(
tcfg.getArchiveDirName()));
}

{
VirtualClock clock;
Application::pointer app = createTestApplication(clock, cfg);
REQUIRE(
!app->getHistoryArchiveManager().initializeHistoryArchive("test"));
REQUIRE(!app->getHistoryArchiveManager().initializeHistoryArchive(
tcfg.getArchiveDirName()));
}
}
Loading

0 comments on commit 5e26a68

Please sign in to comment.