Skip to content

Commit

Permalink
Merge pull request #2194 from marta-lokhova/ledger_replay_conditional…
Browse files Browse the repository at this point in the history
…_work

Use ConditionalWork for history replay in catchup

Reviewed-by: MonsieurNicolas
  • Loading branch information
latobarita committed Aug 16, 2019
2 parents 5e26a68 + ad674d6 commit c284942
Show file tree
Hide file tree
Showing 15 changed files with 482 additions and 79 deletions.
8 changes: 6 additions & 2 deletions Builds/VisualStudio/stellar-core.vcxproj
Original file line number Diff line number Diff line change
Expand Up @@ -267,13 +267,14 @@ exit /b 0
<ClCompile Include="..\..\src\bucket\test\BucketMergeMapTests.cpp" />
<ClCompile Include="..\..\src\bucket\test\BucketTests.cpp" />
<ClCompile Include="..\..\src\catchup\ApplyBucketsWork.cpp" />
<ClCompile Include="..\..\src\catchup\ApplyLedgerChainWork.cpp" />
<ClCompile Include="..\..\src\catchup\ApplyCheckpointWork.cpp" />
<ClCompile Include="..\..\src\catchup\CatchupConfiguration.cpp" />
<ClCompile Include="..\..\src\catchup\CatchupManagerImpl.cpp" />
<ClCompile Include="..\..\src\catchup\CatchupWork.cpp" />
<ClCompile Include="..\..\src\catchup\simulation\ApplyTransactionsWork.cpp" />
<ClCompile Include="..\..\src\catchup\simulation\HistoryArchiveStream.cpp" />
<ClCompile Include="..\..\src\catchup\test\CatchupWorkTests.cpp" />
<ClCompile Include="..\..\src\catchup\DownloadApplyTxsWork.cpp" />
<ClCompile Include="..\..\src\catchup\VerifyLedgerChainWork.cpp" />
<ClCompile Include="..\..\src\crypto\ByteSliceHasher.cpp" />
<ClCompile Include="..\..\src\crypto\ECDH.cpp" />
Expand Down Expand Up @@ -556,6 +557,7 @@ exit /b 0
<ClCompile Include="..\..\src\util\Logging.cpp" />
<ClCompile Include="..\..\src\work\test\WorkTests.cpp" />
<ClCompile Include="..\..\src\work\BasicWork.cpp" />
<ClCompile Include="..\..\src\work\ConditionalWork.cpp" />
<ClCompile Include="..\..\src\work\Work.cpp" />
<ClCompile Include="..\..\src\work\WorkScheduler.cpp" />
<ClCompile Include="..\..\src\work\WorkSequence.cpp" />
Expand All @@ -576,11 +578,12 @@ exit /b 0
<ClInclude Include="..\..\src\bucket\MergeKey.h" />
<ClInclude Include="..\..\src\bucket\PublishQueueBuckets.h" />
<ClInclude Include="..\..\src\catchup\ApplyBucketsWork.h" />
<ClInclude Include="..\..\src\catchup\ApplyLedgerChainWork.h" />
<ClInclude Include="..\..\src\catchup\ApplyCheckpointWork.h" />
<ClInclude Include="..\..\src\catchup\CatchupConfiguration.h" />
<ClInclude Include="..\..\src\catchup\CatchupManager.h" />
<ClInclude Include="..\..\src\catchup\CatchupManagerImpl.h" />
<ClInclude Include="..\..\src\catchup\CatchupWork.h" />
<ClInclude Include="..\..\src\catchup\DownloadApplyTxsWork.h" />
<ClInclude Include="..\..\src\catchup\simulation\ApplyTransactionsWork.h" />
<ClInclude Include="..\..\src\catchup\simulation\HistoryArchiveStream.h" />
<ClInclude Include="..\..\src\catchup\test\CatchupWorkTests.h" />
Expand Down Expand Up @@ -789,6 +792,7 @@ exit /b 0
<ClInclude Include="..\..\src\util\XDRStream.h" />
<ClInclude Include="..\..\src\util\RandomEvictionCache.h" />
<ClInclude Include="..\..\src\work\BasicWork.h" />
<ClInclude Include="..\..\src\work\ConditionalWork.h" />
<ClInclude Include="..\..\src\work\Work.h" />
<ClInclude Include="..\..\src\work\WorkScheduler.h" />
<ClInclude Include="..\..\src\work\WorkSequence.h" />
Expand Down
16 changes: 14 additions & 2 deletions Builds/VisualStudio/stellar-core.vcxproj.filters
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,9 @@
<ClCompile Include="..\..\src\work\BasicWork.cpp">
<Filter>work</Filter>
</ClCompile>
<ClCompile Include="..\..\src\work\ConditionalWork.cpp">
<Filter>work</Filter>
</ClCompile>
<ClCompile Include="..\..\src\work\Work.cpp">
<Filter>work</Filter>
</ClCompile>
Expand Down Expand Up @@ -345,7 +348,7 @@
<ClCompile Include="..\..\src\catchup\ApplyBucketsWork.cpp">
<Filter>catchup</Filter>
</ClCompile>
<ClCompile Include="..\..\src\catchup\ApplyLedgerChainWork.cpp">
<ClCompile Include="..\..\src\catchup\ApplyCheckpointWork.cpp">
<Filter>catchup</Filter>
</ClCompile>
<ClCompile Include="..\..\src\catchup\CatchupConfiguration.cpp">
Expand All @@ -357,6 +360,9 @@
<ClCompile Include="..\..\src\catchup\CatchupWork.cpp">
<Filter>catchup</Filter>
</ClCompile>
<ClCompile Include="..\..\src\catchup\DownloadApplyTxsWork.cpp">
<Filter>catchup</Filter>
</ClCompile>
<ClCompile Include="..\..\src\catchup\VerifyLedgerChainWork.cpp">
<Filter>catchup</Filter>
</ClCompile>
Expand Down Expand Up @@ -1133,6 +1139,9 @@
<ClInclude Include="..\..\src\work\BasicWork.h">
<Filter>work</Filter>
</ClInclude>
<ClInclude Include="..\..\src\work\ConditionalWork.h">
<Filter>work</Filter>
</ClInclude>
<ClInclude Include="..\..\src\work\Work.h">
<Filter>work</Filter>
</ClInclude>
Expand Down Expand Up @@ -1211,7 +1220,7 @@
<ClInclude Include="..\..\src\catchup\ApplyBucketsWork.h">
<Filter>catchup</Filter>
</ClInclude>
<ClInclude Include="..\..\src\catchup\ApplyLedgerChainWork.h">
<ClInclude Include="..\..\src\catchup\ApplyCheckpointWork.h">
<Filter>catchup</Filter>
</ClInclude>
<ClInclude Include="..\..\src\catchup\CatchupConfiguration.h">
Expand All @@ -1226,6 +1235,9 @@
<ClInclude Include="..\..\src\catchup\CatchupWork.h">
<Filter>catchup</Filter>
</ClInclude>
<ClInclude Include="..\..\src\catchup\DownloadApplyTxsWork.h">
<Filter>catchup</Filter>
</ClInclude>
<ClInclude Include="..\..\src\catchup\VerifyLedgerChainWork.h">
<Filter>catchup</Filter>
</ClInclude>
Expand Down
1 change: 0 additions & 1 deletion src/catchup/ApplyBucketsWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ ApplyBucketsWork::onRun()
mBucketApplySuccess.Mark();
}

mApp.getCatchupManager().logAndUpdateCatchupStatus(true);
if (mLevel != 0)
{
--mLevel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// under the Apache License, Version 2.0. See the COPYING file at the root
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

#include "catchup/ApplyLedgerChainWork.h"
#include "catchup/ApplyCheckpointWork.h"
#include "bucket/BucketManager.h"
#include "herder/LedgerCloseData.h"
#include "history/FileTransferInfo.h"
Expand All @@ -23,57 +23,65 @@
namespace stellar
{

ApplyLedgerChainWork::ApplyLedgerChainWork(
Application& app, TmpDir const& downloadDir, LedgerRange range,
LedgerHeaderHistoryEntry& lastApplied)
: BasicWork(app, "apply-ledger-chain", BasicWork::RETRY_NEVER)
ApplyCheckpointWork::ApplyCheckpointWork(Application& app,
TmpDir const& downloadDir,
LedgerRange const& range)
: BasicWork(app,
"apply-ledgers-" +
fmt::format("{}-{}", range.mFirst, range.mLast),
BasicWork::RETRY_NEVER)
, mDownloadDir(downloadDir)
, mRange(range)
, mCurrSeq(0)
, mLastApplied(lastApplied)
, mCheckpointRange(range)
, mCheckpoint(
app.getHistoryManager().checkpointContainingLedger(range.mFirst))
, mApplyLedgerSuccess(app.getMetrics().NewMeter(
{"history", "apply-ledger-chain", "success"}, "event"))
, mApplyLedgerFailure(app.getMetrics().NewMeter(
{"history", "apply-ledger-chain", "failure"}, "event"))
{
// Ledger range check to enforce application of a single checkpoint
auto const& hm = mApp.getHistoryManager();
auto low = std::max(LedgerManager::GENESIS_LEDGER_SEQ,
hm.prevCheckpointLedger(mCheckpoint));
if (mCheckpointRange.mFirst != low)
{
throw std::runtime_error(
"Ledger range start must be aligned with checkpoint start");
}
if (mCheckpointRange.mLast > mCheckpoint)
{
throw std::runtime_error(
"Ledger range must span at most 1 checkpoint worth of ledgers");
}
}

std::string
ApplyLedgerChainWork::getStatus() const
ApplyCheckpointWork::getStatus() const
{
if (getState() == State::WORK_RUNNING)
{
std::string task = "applying checkpoint";
return fmtProgress(mApp, task, mRange.mFirst, mRange.mLast, mCurrSeq);
auto lcl = mApp.getLedgerManager().getLastClosedLedgerNum();
return fmt::format("Last applied ledger: {}", lcl);
}
return BasicWork::getStatus();
}

void
ApplyLedgerChainWork::onReset()
ApplyCheckpointWork::onReset()
{
auto& lm = mApp.getLedgerManager();
auto& hm = mApp.getHistoryManager();

CLOG(INFO, "History") << fmt::format(
"Applying transactions for ledgers {}, LCL is {}", mRange.toString(),
LedgerManager::ledgerAbbrev(lm.getLastClosedLedgerHeader()));

mLastApplied = lm.getLastClosedLedgerHeader();

mCurrSeq = hm.checkpointContainingLedger(mRange.mFirst);
mHdrIn.close();
mTxIn.close();
mFilesOpen = false;
}

void
ApplyLedgerChainWork::openCurrentInputFiles()
ApplyCheckpointWork::openInputFiles()
{
mHdrIn.close();
mTxIn.close();
FileTransferInfo hi(mDownloadDir, HISTORY_FILE_TYPE_LEDGER, mCurrSeq);
FileTransferInfo ti(mDownloadDir, HISTORY_FILE_TYPE_TRANSACTIONS, mCurrSeq);
FileTransferInfo hi(mDownloadDir, HISTORY_FILE_TYPE_LEDGER, mCheckpoint);
FileTransferInfo ti(mDownloadDir, HISTORY_FILE_TYPE_TRANSACTIONS,
mCheckpoint);
CLOG(DEBUG, "History") << "Replaying ledger headers from "
<< hi.localPath_nogz();
CLOG(DEBUG, "History") << "Replaying transactions from "
Expand All @@ -85,7 +93,7 @@ ApplyLedgerChainWork::openCurrentInputFiles()
}

TxSetFramePtr
ApplyLedgerChainWork::getCurrentTxSet()
ApplyCheckpointWork::getCurrentTxSet()
{
auto& lm = mApp.getLedgerManager();
auto seq = lm.getLastClosedLedgerNum() + 1;
Expand Down Expand Up @@ -119,7 +127,7 @@ ApplyLedgerChainWork::getCurrentTxSet()
}

bool
ApplyLedgerChainWork::applyHistoryOfSingleLedger()
ApplyCheckpointWork::applyHistoryOfSingleLedger()
{
LedgerHeaderHistoryEntry hHeader;
LedgerHeader& header = hHeader.header;
Expand Down Expand Up @@ -244,35 +252,29 @@ ApplyLedgerChainWork::applyHistoryOfSingleLedger()
}

mApplyLedgerSuccess.Mark();
mLastApplied = hHeader;
return true;
}

BasicWork::State
ApplyLedgerChainWork::onRun()
ApplyCheckpointWork::onRun()
{
try
{
if (!mFilesOpen)
{
openCurrentInputFiles();
openInputFiles();
}

if (!applyHistoryOfSingleLedger())
{
mCurrSeq += mApp.getHistoryManager().getCheckpointFrequency();
mFilesOpen = false;
}

mApp.getCatchupManager().logAndUpdateCatchupStatus(true);
auto result = applyHistoryOfSingleLedger();
auto const& lm = mApp.getLedgerManager();
auto done = lm.getLastClosedLedgerNum() == mCheckpointRange.mLast;

auto& lm = mApp.getLedgerManager();
auto const& lclHeader = lm.getLastClosedLedgerHeader();
if (lclHeader.header.ledgerSeq == mRange.mLast)
if (done)
{
return State::WORK_SUCCESS;
}
return State::WORK_RUNNING;

return result ? State::WORK_RUNNING : State::WORK_FAILURE;
}
catch (InvariantDoesNotHold&)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,35 +38,33 @@ struct LedgerHeaderHistoryEntry;
*
* Contructor of this class takes some important parameters:
* * downloadDir - directory containing ledger and transaction files
* * range - range of ledgers to apply (low boundary can overlap with local
* history)
* * lastApplied - reference to last applied ledger header (which is LCL)
* * range - LedgerRange to apply, must be checkpoint-aligned,
* and cover at most one checkpoint.
*/

class ApplyLedgerChainWork : public BasicWork
class ApplyCheckpointWork : public BasicWork
{
TmpDir const& mDownloadDir;
LedgerRange const mRange;
uint32_t mCurrSeq;
LedgerRange const mCheckpointRange;
uint32_t const mCheckpoint;

XDRInputFileStream mHdrIn;
XDRInputFileStream mTxIn;
TransactionHistoryEntry mTxHistoryEntry;
LedgerHeaderHistoryEntry& mLastApplied;

medida::Meter& mApplyLedgerSuccess;
medida::Meter& mApplyLedgerFailure;

bool mFilesOpen{false};

TxSetFramePtr getCurrentTxSet();
void openCurrentInputFiles();
void openInputFiles();
bool applyHistoryOfSingleLedger();

public:
ApplyLedgerChainWork(Application& app, TmpDir const& downloadDir,
LedgerRange range,
LedgerHeaderHistoryEntry& lastApplied);
~ApplyLedgerChainWork() = default;
ApplyCheckpointWork(Application& app, TmpDir const& downloadDir,
LedgerRange const& range);
~ApplyCheckpointWork() = default;
std::string getStatus() const override;

protected:
Expand Down
29 changes: 15 additions & 14 deletions src/catchup/CatchupWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

#include "catchup/CatchupWork.h"
#include "catchup/ApplyBucketsWork.h"
#include "catchup/ApplyLedgerChainWork.h"
#include "catchup/ApplyCheckpointWork.h"
#include "catchup/CatchupConfiguration.h"
#include "catchup/DownloadApplyTxsWork.h"
#include "catchup/VerifyLedgerChainWork.h"
#include "history/FileTransferInfo.h"
#include "history/HistoryManager.h"
Expand Down Expand Up @@ -45,7 +46,7 @@ CatchupWork::getStatus() const
{
return mCatchupSeq->getStatus();
}
return BasicWork::getStatus();
return Work::getStatus();
}

void
Expand Down Expand Up @@ -142,24 +143,17 @@ CatchupWork::assertBucketState()
}
}

WorkSeqPtr
void
CatchupWork::downloadApplyTransactions(CatchupRange const& catchupRange)
{
auto range =
LedgerRange{catchupRange.mLedgers.mFirst, catchupRange.getLast()};
auto checkpointRange = CheckpointRange{range, mApp.getHistoryManager()};
auto getTxs = std::make_shared<BatchDownloadWork>(
mApp, checkpointRange, HISTORY_FILE_TYPE_TRANSACTIONS, *mDownloadDir);
auto applyLedgers = std::make_shared<ApplyLedgerChainWork>(
mTransactionsVerifyApplySeq = std::make_shared<DownloadApplyTxsWork>(
mApp, *mDownloadDir, range, mLastApplied);

std::vector<std::shared_ptr<BasicWork>> seq{getTxs, applyLedgers};
return std::make_shared<WorkSequence>(mApp, "download-apply-transactions",
seq, RETRY_NEVER);
}

BasicWork::State
CatchupWork::doWork()
CatchupWork::runCatchupStep()
{
// Step 1: Get history archive state
if (!mGetHistoryArchiveStateWork)
Expand Down Expand Up @@ -296,8 +290,7 @@ CatchupWork::doWork()
if (catchupRange.applyLedgers())
{
// Step 4.3: Download and apply ledger chain
mTransactionsVerifyApplySeq =
downloadApplyTransactions(catchupRange);
downloadApplyTransactions(catchupRange);
seq.push_back(mTransactionsVerifyApplySeq);
}

Expand All @@ -316,6 +309,14 @@ CatchupWork::doWork()
return State::WORK_RUNNING;
}

BasicWork::State
CatchupWork::doWork()
{
auto nextState = runCatchupStep();
mApp.getCatchupManager().logAndUpdateCatchupStatus(true);
return nextState;
}

void
CatchupWork::onFailureRaise()
{
Expand Down
Loading

0 comments on commit c284942

Please sign in to comment.