Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Standardize work waiting pattern #2854

Merged
merged 1 commit into from Jan 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 1 addition & 18 deletions src/historywork/ResolveSnapshotWork.cpp
Expand Up @@ -15,7 +15,6 @@ ResolveSnapshotWork::ResolveSnapshotWork(
Application& app, std::shared_ptr<StateSnapshot> snapshot)
: BasicWork(app, "prepare-snapshot", BasicWork::RETRY_NEVER)
, mSnapshot(snapshot)
, mTimer(std::make_unique<VirtualTimer>(app.getClock()))
{
if (!mSnapshot)
{
Expand All @@ -27,11 +26,6 @@ BasicWork::State
ResolveSnapshotWork::onRun()
{
ZoneScoped;
if (mEc)
{
return State::WORK_FAILURE;
}

mSnapshot->mLocalState.prepareForPublish(mApp);
mSnapshot->mLocalState.resolveAnyReadyFutures();
if ((mApp.getLedgerManager().getLastClosedLedgerNum() >
Expand All @@ -43,18 +37,7 @@ ResolveSnapshotWork::onRun()
}
else
{
std::weak_ptr<ResolveSnapshotWork> weak(
std::static_pointer_cast<ResolveSnapshotWork>(shared_from_this()));
auto handler = [weak](asio::error_code const& ec) {
auto self = weak.lock();
if (self)
{
self->mEc = ec;
self->wakeUp();
}
};
mTimer->expires_from_now(std::chrono::seconds(1));
mTimer->async_wait(handler);
setupWaitingCallback(std::chrono::seconds(1));
return State::WORK_WAITING;
}
}
Expand Down
2 changes: 0 additions & 2 deletions src/historywork/ResolveSnapshotWork.h
Expand Up @@ -14,8 +14,6 @@ struct StateSnapshot;
class ResolveSnapshotWork : public BasicWork
{
std::shared_ptr<StateSnapshot> mSnapshot;
std::unique_ptr<VirtualTimer> mTimer;
asio::error_code mEc{asio::error_code()};

public:
ResolveSnapshotWork(Application& app,
Expand Down
4 changes: 1 addition & 3 deletions src/transactions/simulation/TxSimGenerateBucketsWork.cpp
Expand Up @@ -29,7 +29,6 @@ TxSimGenerateBucketsWork::TxSimGenerateBucketsWork(
, mMultiplier(multiplier)
, mLevel(0)
, mIsCurr(true)
, mTimer(std::make_unique<VirtualTimer>(app.getClock()))
{
}

Expand Down Expand Up @@ -121,8 +120,7 @@ TxSimGenerateBucketsWork::onRun()
// to finish
if (!checkOrStartMerges())
{
mTimer->expires_from_now(std::chrono::milliseconds(100));
mTimer->async_wait(wakeSelfUpCallback(), &VirtualTimer::onFailureNoop);
setupWaitingCallback(std::chrono::milliseconds(100));
return BasicWork::State::WORK_WAITING;
}

Expand Down
1 change: 0 additions & 1 deletion src/transactions/simulation/TxSimGenerateBucketsWork.h
Expand Up @@ -43,7 +43,6 @@ class TxSimGenerateBucketsWork : public BasicWork
std::list<std::shared_ptr<Bucket>> mIntermediateBuckets;
std::vector<FutureBucket> mMergesInProgress;
bool mIsCurr;
std::unique_ptr<VirtualTimer> mTimer;

void setFutureBucket(std::shared_ptr<Bucket> const& curr);
void startBucketGeneration(std::shared_ptr<Bucket> const& oldBucket);
Expand Down
39 changes: 39 additions & 0 deletions src/work/BasicWork.cpp
Expand Up @@ -3,6 +3,7 @@
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

#include "work/BasicWork.h"
#include "util/GlobalChecks.h"
#include "util/Logging.h"
#include "util/Math.h"
#include <Tracy.hpp>
Expand Down Expand Up @@ -46,12 +47,25 @@ BasicWork::~BasicWork()
assert(isDone() || mState == InternalState::PENDING);
}

void
BasicWork::resetWaitingTimer()
{
if (mWaitingTimer)
{
mWaitingTimer->cancel();
mWaitingTimer.reset();
}
}

void
BasicWork::shutdown()
{
CLOG_TRACE(Work, "Shutting down: {}", getName());
if (!isDone())
{
// We're transitioning into "ABORTING" state, so cancel
// the waiting timer
resetWaitingTimer();
setState(InternalState::ABORTING);
}
}
Expand Down Expand Up @@ -299,6 +313,9 @@ BasicWork::wakeUp(std::function<void()> innerCallback)
CLOG_TRACE(Work, "Waking up: {}", getName());
setState(InternalState::RUNNING);

// If we woke up because of the waiting timer firing, reset it
resetWaitingTimer();

if (innerCallback)
{
CLOG_TRACE(Work, "{} woke up and is executing its callback", getName());
Expand Down Expand Up @@ -327,6 +344,28 @@ BasicWork::wakeSelfUpCallback(std::function<void()> innerCallback)
return callback;
}

void
BasicWork::setupWaitingCallback(std::chrono::milliseconds wakeUpIn)
{
// Work must be running to schedule a timer
releaseAssert(mState == BasicWork::InternalState::RUNNING);

// No-op if timer is already set
if (mWaitingTimer)
{
CLOG_WARNING(Work, "{}: waiting timer is already set (no-op)",
getName());
return;
}

// Otherwise, setup the timer that no-ops on failure (if work is shutdown or
// destroyed, for example)
mWaitingTimer = std::make_unique<VirtualTimer>(mApp.getClock());
mWaitingTimer->expires_from_now(wakeUpIn);
mWaitingTimer->async_wait(wakeSelfUpCallback(),
&VirtualTimer::onFailureNoop);
}

void
BasicWork::crankWork()
{
Expand Down
9 changes: 8 additions & 1 deletion src/work/BasicWork.h
Expand Up @@ -188,12 +188,17 @@ class BasicWork : public std::enable_shared_from_this<BasicWork>,
// propagate the notification up to the scheduler. An example use of
// this would be RunCommandWork: a timer is used to async_wait for a
// process to exit, with a call to `wakeUp` upon completion.
void wakeUp(std::function<void()> innerCallback = nullptr);
virtual void wakeUp(std::function<void()> innerCallback = nullptr);

// Default wakeUp callback that implementers can use
std::function<void()>
wakeSelfUpCallback(std::function<void()> innerCallback = nullptr);

// To yield, setup a timer. Upon expiration, work will wake itself up and
// continue execution. Use this function before transitioning into
// WORK_WAITING state. Note: this function is idempotent
void setupWaitingCallback(std::chrono::milliseconds wakeUpIn);

Application& mApp;

private:
Expand Down Expand Up @@ -229,10 +234,12 @@ class BasicWork : public std::enable_shared_from_this<BasicWork>,
void assertValidTransition(Transition const& t) const;
static std::string stateName(InternalState st);
uint64_t getRetryETA() const;
void resetWaitingTimer();

std::function<void()> mNotifyCallback;
std::string const mName;
std::unique_ptr<VirtualTimer> mRetryTimer;
std::unique_ptr<VirtualTimer> mWaitingTimer;

InternalState mState{InternalState::PENDING};
size_t mRetries{0};
Expand Down
14 changes: 1 addition & 13 deletions src/work/ConditionalWork.cpp
Expand Up @@ -18,7 +18,6 @@ ConditionalWork::ConditionalWork(Application& app, std::string name,
, mCondition(std::move(condition))
, mConditionedWork(std::move(conditionedWork))
, mSleepDelay(sleepTime)
, mSleepTimer(std::make_unique<VirtualTimer>(app.getClock()))
{
if (!mConditionedWork)
{
Expand Down Expand Up @@ -46,20 +45,9 @@ ConditionalWork::onRun()
// Work is not started, so check the condition
if (!mCondition())
{
std::weak_ptr<ConditionalWork> weak(
std::static_pointer_cast<ConditionalWork>(shared_from_this()));
auto handler = [weak](asio::error_code const& ec) {
auto self = weak.lock();
if (self)
{
self->wakeUp();
}
};

CLOG_TRACE(Work, "Condition for {} is not satisfied: sleeping {} ms",
getName(), mSleepDelay.count());
mSleepTimer->expires_from_now(mSleepDelay);
mSleepTimer->async_wait(handler);
setupWaitingCallback(mSleepDelay);
return State::WORK_WAITING;
}
else
Expand Down
1 change: 0 additions & 1 deletion src/work/ConditionalWork.h
Expand Up @@ -59,7 +59,6 @@ class ConditionalWork : public BasicWork
ConditionFn mCondition;
std::shared_ptr<BasicWork> mConditionedWork;
std::chrono::milliseconds const mSleepDelay;
std::unique_ptr<VirtualTimer> mSleepTimer;
bool mWorkStarted{false};

public:
Expand Down
44 changes: 30 additions & 14 deletions src/work/test/WorkTests.cpp
Expand Up @@ -112,26 +112,20 @@ class TestWaitingWork : public TestBasicWork
++mRunningCount;
if (--mCount > 0)
{
std::weak_ptr<TestWaitingWork> weak(
std::static_pointer_cast<TestWaitingWork>(shared_from_this()));
auto handler = [weak](asio::error_code const& ec) {
auto self = weak.lock();
if (self)
{
++(self->mWakeUpCount);
self->wakeUp();
}
};

mTimer.expires_from_now(std::chrono::milliseconds(1));
mTimer.async_wait(handler);

setupWaitingCallback(std::chrono::milliseconds(1));
++mWaitingCount;
return BasicWork::State::WORK_WAITING;
}

return BasicWork::State::WORK_SUCCESS;
}

void
wakeUp(std::function<void()> innerCallback) override
{
++mWakeUpCount;
TestBasicWork::wakeUp(innerCallback);
}
};

TEST_CASE("BasicWork test", "[work][basicwork]")
Expand Down Expand Up @@ -181,6 +175,28 @@ TEST_CASE("BasicWork test", "[work][basicwork]")
REQUIRE(w->mWaitingCount == w->mWakeUpCount);
REQUIRE(w->mWaitingCount == w->mNumSteps - 1);
}
SECTION("work waiting - shutdown")
{
auto w = wm.scheduleWork<TestWaitingWork>("waiting-test-work");
// Start work
while (w->getState() != TestBasicWork::State::WORK_WAITING)
{
clock.crank();
}

// Work started waiting, no wake up callback fired
REQUIRE(w->mWaitingCount == 1);
REQUIRE(w->mWakeUpCount == 0);
wm.shutdown();
while (wm.getState() != TestBasicWork::State::WORK_ABORTED)
{
clock.crank();
}

// Ensure aborted work did not fire wake up callback
REQUIRE(w->mWaitingCount == 1);
REQUIRE(w->mWakeUpCount == 0);
}
SECTION("new work added midway")
{
auto w1 = wm.scheduleWork<TestBasicWork>("test-work1");
Expand Down