Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use ConditionalWork for history replay in catchup #2194

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions Builds/VisualStudio/stellar-core.vcxproj
Original file line number Diff line number Diff line change
Expand Up @@ -267,13 +267,14 @@ exit /b 0
<ClCompile Include="..\..\src\bucket\test\BucketMergeMapTests.cpp" />
<ClCompile Include="..\..\src\bucket\test\BucketTests.cpp" />
<ClCompile Include="..\..\src\catchup\ApplyBucketsWork.cpp" />
<ClCompile Include="..\..\src\catchup\ApplyLedgerChainWork.cpp" />
<ClCompile Include="..\..\src\catchup\ApplyCheckpointWork.cpp" />
<ClCompile Include="..\..\src\catchup\CatchupConfiguration.cpp" />
<ClCompile Include="..\..\src\catchup\CatchupManagerImpl.cpp" />
<ClCompile Include="..\..\src\catchup\CatchupWork.cpp" />
<ClCompile Include="..\..\src\catchup\simulation\ApplyTransactionsWork.cpp" />
<ClCompile Include="..\..\src\catchup\simulation\HistoryArchiveStream.cpp" />
<ClCompile Include="..\..\src\catchup\test\CatchupWorkTests.cpp" />
<ClCompile Include="..\..\src\catchup\DownloadApplyTxsWork.cpp" />
<ClCompile Include="..\..\src\catchup\VerifyLedgerChainWork.cpp" />
<ClCompile Include="..\..\src\crypto\ByteSliceHasher.cpp" />
<ClCompile Include="..\..\src\crypto\ECDH.cpp" />
Expand Down Expand Up @@ -556,6 +557,7 @@ exit /b 0
<ClCompile Include="..\..\src\util\Logging.cpp" />
<ClCompile Include="..\..\src\work\test\WorkTests.cpp" />
<ClCompile Include="..\..\src\work\BasicWork.cpp" />
<ClCompile Include="..\..\src\work\ConditionalWork.cpp" />
<ClCompile Include="..\..\src\work\Work.cpp" />
<ClCompile Include="..\..\src\work\WorkScheduler.cpp" />
<ClCompile Include="..\..\src\work\WorkSequence.cpp" />
Expand All @@ -576,11 +578,12 @@ exit /b 0
<ClInclude Include="..\..\src\bucket\MergeKey.h" />
<ClInclude Include="..\..\src\bucket\PublishQueueBuckets.h" />
<ClInclude Include="..\..\src\catchup\ApplyBucketsWork.h" />
<ClInclude Include="..\..\src\catchup\ApplyLedgerChainWork.h" />
<ClInclude Include="..\..\src\catchup\ApplyCheckpointWork.h" />
<ClInclude Include="..\..\src\catchup\CatchupConfiguration.h" />
<ClInclude Include="..\..\src\catchup\CatchupManager.h" />
<ClInclude Include="..\..\src\catchup\CatchupManagerImpl.h" />
<ClInclude Include="..\..\src\catchup\CatchupWork.h" />
<ClInclude Include="..\..\src\catchup\DownloadApplyTxsWork.h" />
<ClInclude Include="..\..\src\catchup\simulation\ApplyTransactionsWork.h" />
<ClInclude Include="..\..\src\catchup\simulation\HistoryArchiveStream.h" />
<ClInclude Include="..\..\src\catchup\test\CatchupWorkTests.h" />
Expand Down Expand Up @@ -789,6 +792,7 @@ exit /b 0
<ClInclude Include="..\..\src\util\XDRStream.h" />
<ClInclude Include="..\..\src\util\RandomEvictionCache.h" />
<ClInclude Include="..\..\src\work\BasicWork.h" />
<ClInclude Include="..\..\src\work\ConditionalWork.h" />
<ClInclude Include="..\..\src\work\Work.h" />
<ClInclude Include="..\..\src\work\WorkScheduler.h" />
<ClInclude Include="..\..\src\work\WorkSequence.h" />
Expand Down
16 changes: 14 additions & 2 deletions Builds/VisualStudio/stellar-core.vcxproj.filters
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,9 @@
<ClCompile Include="..\..\src\work\BasicWork.cpp">
<Filter>work</Filter>
</ClCompile>
<ClCompile Include="..\..\src\work\ConditionalWork.cpp">
<Filter>work</Filter>
</ClCompile>
<ClCompile Include="..\..\src\work\Work.cpp">
<Filter>work</Filter>
</ClCompile>
Expand Down Expand Up @@ -345,7 +348,7 @@
<ClCompile Include="..\..\src\catchup\ApplyBucketsWork.cpp">
<Filter>catchup</Filter>
</ClCompile>
<ClCompile Include="..\..\src\catchup\ApplyLedgerChainWork.cpp">
<ClCompile Include="..\..\src\catchup\ApplyCheckpointWork.cpp">
<Filter>catchup</Filter>
</ClCompile>
<ClCompile Include="..\..\src\catchup\CatchupConfiguration.cpp">
Expand All @@ -357,6 +360,9 @@
<ClCompile Include="..\..\src\catchup\CatchupWork.cpp">
<Filter>catchup</Filter>
</ClCompile>
<ClCompile Include="..\..\src\catchup\DownloadApplyTxsWork.cpp">
<Filter>catchup</Filter>
</ClCompile>
<ClCompile Include="..\..\src\catchup\VerifyLedgerChainWork.cpp">
<Filter>catchup</Filter>
</ClCompile>
Expand Down Expand Up @@ -1133,6 +1139,9 @@
<ClInclude Include="..\..\src\work\BasicWork.h">
<Filter>work</Filter>
</ClInclude>
<ClInclude Include="..\..\src\work\ConditionalWork.h">
<Filter>work</Filter>
</ClInclude>
<ClInclude Include="..\..\src\work\Work.h">
<Filter>work</Filter>
</ClInclude>
Expand Down Expand Up @@ -1211,7 +1220,7 @@
<ClInclude Include="..\..\src\catchup\ApplyBucketsWork.h">
<Filter>catchup</Filter>
</ClInclude>
<ClInclude Include="..\..\src\catchup\ApplyLedgerChainWork.h">
<ClInclude Include="..\..\src\catchup\ApplyCheckpointWork.h">
<Filter>catchup</Filter>
</ClInclude>
<ClInclude Include="..\..\src\catchup\CatchupConfiguration.h">
Expand All @@ -1226,6 +1235,9 @@
<ClInclude Include="..\..\src\catchup\CatchupWork.h">
<Filter>catchup</Filter>
</ClInclude>
<ClInclude Include="..\..\src\catchup\DownloadApplyTxsWork.h">
<Filter>catchup</Filter>
</ClInclude>
<ClInclude Include="..\..\src\catchup\VerifyLedgerChainWork.h">
<Filter>catchup</Filter>
</ClInclude>
Expand Down
1 change: 0 additions & 1 deletion src/catchup/ApplyBucketsWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ ApplyBucketsWork::onRun()
mBucketApplySuccess.Mark();
}

mApp.getCatchupManager().logAndUpdateCatchupStatus(true);
if (mLevel != 0)
{
--mLevel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// under the Apache License, Version 2.0. See the COPYING file at the root
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

#include "catchup/ApplyLedgerChainWork.h"
#include "catchup/ApplyCheckpointWork.h"
#include "bucket/BucketManager.h"
#include "herder/LedgerCloseData.h"
#include "history/FileTransferInfo.h"
Expand All @@ -23,57 +23,65 @@
namespace stellar
{

ApplyLedgerChainWork::ApplyLedgerChainWork(
Application& app, TmpDir const& downloadDir, LedgerRange range,
LedgerHeaderHistoryEntry& lastApplied)
: BasicWork(app, "apply-ledger-chain", BasicWork::RETRY_NEVER)
ApplyCheckpointWork::ApplyCheckpointWork(Application& app,
TmpDir const& downloadDir,
LedgerRange const& range)
: BasicWork(app,
"apply-ledgers-" +
fmt::format("{}-{}", range.mFirst, range.mLast),
BasicWork::RETRY_NEVER)
, mDownloadDir(downloadDir)
, mRange(range)
, mCurrSeq(0)
, mLastApplied(lastApplied)
, mCheckpointRange(range)
, mCheckpoint(
app.getHistoryManager().checkpointContainingLedger(range.mFirst))
, mApplyLedgerSuccess(app.getMetrics().NewMeter(
{"history", "apply-ledger-chain", "success"}, "event"))
, mApplyLedgerFailure(app.getMetrics().NewMeter(
{"history", "apply-ledger-chain", "failure"}, "event"))
{
// Ledger range check to enforce application of a single checkpoint
auto const& hm = mApp.getHistoryManager();
auto low = std::max(LedgerManager::GENESIS_LEDGER_SEQ,
hm.prevCheckpointLedger(mCheckpoint));
if (mCheckpointRange.mFirst != low)
{
throw std::runtime_error(
"Ledger range start must be aligned with checkpoint start");
}
if (mCheckpointRange.mLast > mCheckpoint)
{
throw std::runtime_error(
"Ledger range must span at most 1 checkpoint worth of ledgers");
}
}

std::string
ApplyLedgerChainWork::getStatus() const
ApplyCheckpointWork::getStatus() const
{
if (getState() == State::WORK_RUNNING)
{
std::string task = "applying checkpoint";
return fmtProgress(mApp, task, mRange.mFirst, mRange.mLast, mCurrSeq);
auto lcl = mApp.getLedgerManager().getLastClosedLedgerNum();
return fmt::format("Last applied ledger: {}", lcl);
}
return BasicWork::getStatus();
}

void
ApplyLedgerChainWork::onReset()
ApplyCheckpointWork::onReset()
{
auto& lm = mApp.getLedgerManager();
auto& hm = mApp.getHistoryManager();

CLOG(INFO, "History") << fmt::format(
"Applying transactions for ledgers {}, LCL is {}", mRange.toString(),
LedgerManager::ledgerAbbrev(lm.getLastClosedLedgerHeader()));

mLastApplied = lm.getLastClosedLedgerHeader();

mCurrSeq = hm.checkpointContainingLedger(mRange.mFirst);
mHdrIn.close();
mTxIn.close();
mFilesOpen = false;
}

void
ApplyLedgerChainWork::openCurrentInputFiles()
ApplyCheckpointWork::openInputFiles()
{
mHdrIn.close();
mTxIn.close();
FileTransferInfo hi(mDownloadDir, HISTORY_FILE_TYPE_LEDGER, mCurrSeq);
FileTransferInfo ti(mDownloadDir, HISTORY_FILE_TYPE_TRANSACTIONS, mCurrSeq);
FileTransferInfo hi(mDownloadDir, HISTORY_FILE_TYPE_LEDGER, mCheckpoint);
FileTransferInfo ti(mDownloadDir, HISTORY_FILE_TYPE_TRANSACTIONS,
mCheckpoint);
CLOG(DEBUG, "History") << "Replaying ledger headers from "
<< hi.localPath_nogz();
CLOG(DEBUG, "History") << "Replaying transactions from "
Expand All @@ -85,7 +93,7 @@ ApplyLedgerChainWork::openCurrentInputFiles()
}

TxSetFramePtr
ApplyLedgerChainWork::getCurrentTxSet()
ApplyCheckpointWork::getCurrentTxSet()
{
auto& lm = mApp.getLedgerManager();
auto seq = lm.getLastClosedLedgerNum() + 1;
Expand Down Expand Up @@ -119,7 +127,7 @@ ApplyLedgerChainWork::getCurrentTxSet()
}

bool
ApplyLedgerChainWork::applyHistoryOfSingleLedger()
ApplyCheckpointWork::applyHistoryOfSingleLedger()
{
LedgerHeaderHistoryEntry hHeader;
LedgerHeader& header = hHeader.header;
Expand Down Expand Up @@ -244,35 +252,29 @@ ApplyLedgerChainWork::applyHistoryOfSingleLedger()
}

mApplyLedgerSuccess.Mark();
mLastApplied = hHeader;
return true;
}

BasicWork::State
ApplyLedgerChainWork::onRun()
ApplyCheckpointWork::onRun()
{
try
{
if (!mFilesOpen)
{
openCurrentInputFiles();
openInputFiles();
}

if (!applyHistoryOfSingleLedger())
{
mCurrSeq += mApp.getHistoryManager().getCheckpointFrequency();
mFilesOpen = false;
}

mApp.getCatchupManager().logAndUpdateCatchupStatus(true);
auto result = applyHistoryOfSingleLedger();
auto const& lm = mApp.getLedgerManager();
auto done = lm.getLastClosedLedgerNum() == mCheckpointRange.mLast;

auto& lm = mApp.getLedgerManager();
auto const& lclHeader = lm.getLastClosedLedgerHeader();
if (lclHeader.header.ledgerSeq == mRange.mLast)
if (done)
{
return State::WORK_SUCCESS;
}
return State::WORK_RUNNING;

return result ? State::WORK_RUNNING : State::WORK_FAILURE;
}
catch (InvariantDoesNotHold&)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,35 +38,33 @@ struct LedgerHeaderHistoryEntry;
*
* Contructor of this class takes some important parameters:
* * downloadDir - directory containing ledger and transaction files
* * range - range of ledgers to apply (low boundary can overlap with local
* history)
* * lastApplied - reference to last applied ledger header (which is LCL)
* * range - LedgerRange to apply, must be checkpoint-aligned,
* and cover at most one checkpoint.
*/

class ApplyLedgerChainWork : public BasicWork
class ApplyCheckpointWork : public BasicWork
{
TmpDir const& mDownloadDir;
LedgerRange const mRange;
uint32_t mCurrSeq;
LedgerRange const mCheckpointRange;
uint32_t const mCheckpoint;

XDRInputFileStream mHdrIn;
XDRInputFileStream mTxIn;
TransactionHistoryEntry mTxHistoryEntry;
LedgerHeaderHistoryEntry& mLastApplied;

medida::Meter& mApplyLedgerSuccess;
medida::Meter& mApplyLedgerFailure;

bool mFilesOpen{false};

TxSetFramePtr getCurrentTxSet();
void openCurrentInputFiles();
void openInputFiles();
bool applyHistoryOfSingleLedger();

public:
ApplyLedgerChainWork(Application& app, TmpDir const& downloadDir,
LedgerRange range,
LedgerHeaderHistoryEntry& lastApplied);
~ApplyLedgerChainWork() = default;
ApplyCheckpointWork(Application& app, TmpDir const& downloadDir,
LedgerRange const& range);
~ApplyCheckpointWork() = default;
std::string getStatus() const override;

protected:
Expand Down
29 changes: 15 additions & 14 deletions src/catchup/CatchupWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

#include "catchup/CatchupWork.h"
#include "catchup/ApplyBucketsWork.h"
#include "catchup/ApplyLedgerChainWork.h"
#include "catchup/ApplyCheckpointWork.h"
#include "catchup/CatchupConfiguration.h"
#include "catchup/DownloadApplyTxsWork.h"
#include "catchup/VerifyLedgerChainWork.h"
#include "history/FileTransferInfo.h"
#include "history/HistoryManager.h"
Expand Down Expand Up @@ -45,7 +46,7 @@ CatchupWork::getStatus() const
{
return mCatchupSeq->getStatus();
}
return BasicWork::getStatus();
return Work::getStatus();
}

void
Expand Down Expand Up @@ -142,24 +143,17 @@ CatchupWork::assertBucketState()
}
}

WorkSeqPtr
void
CatchupWork::downloadApplyTransactions(CatchupRange const& catchupRange)
{
auto range =
LedgerRange{catchupRange.mLedgers.mFirst, catchupRange.getLast()};
auto checkpointRange = CheckpointRange{range, mApp.getHistoryManager()};
auto getTxs = std::make_shared<BatchDownloadWork>(
mApp, checkpointRange, HISTORY_FILE_TYPE_TRANSACTIONS, *mDownloadDir);
auto applyLedgers = std::make_shared<ApplyLedgerChainWork>(
mTransactionsVerifyApplySeq = std::make_shared<DownloadApplyTxsWork>(
mApp, *mDownloadDir, range, mLastApplied);

std::vector<std::shared_ptr<BasicWork>> seq{getTxs, applyLedgers};
return std::make_shared<WorkSequence>(mApp, "download-apply-transactions",
seq, RETRY_NEVER);
}

BasicWork::State
CatchupWork::doWork()
CatchupWork::runCatchupStep()
{
// Step 1: Get history archive state
if (!mGetHistoryArchiveStateWork)
Expand Down Expand Up @@ -295,8 +289,7 @@ CatchupWork::doWork()
if (catchupRange.applyLedgers())
{
// Step 4.3: Download and apply ledger chain
mTransactionsVerifyApplySeq =
downloadApplyTransactions(catchupRange);
downloadApplyTransactions(catchupRange);
seq.push_back(mTransactionsVerifyApplySeq);
}

Expand All @@ -315,6 +308,14 @@ CatchupWork::doWork()
return State::WORK_RUNNING;
}

BasicWork::State
CatchupWork::doWork()
{
auto nextState = runCatchupStep();
mApp.getCatchupManager().logAndUpdateCatchupStatus(true);
return nextState;
}

void
CatchupWork::onFailureRaise()
{
Expand Down
Loading