Skip to content

Commit

Permalink
SERVER-29418 Create a storage-engine agnostic checkpointing thread
Browse files Browse the repository at this point in the history
  • Loading branch information
GWlodarek authored and Evergreen Agent committed Sep 10, 2020
1 parent b68df4e commit f10e0ad
Show file tree
Hide file tree
Showing 21 changed files with 458 additions and 308 deletions.
3 changes: 3 additions & 0 deletions src/mongo/db/mongod_options.cpp
Expand Up @@ -404,6 +404,9 @@ Status storeMongodOptions(const moe::Environment& params) {

if (params.count("storage.syncPeriodSecs")) {
storageGlobalParams.syncdelay = params["storage.syncPeriodSecs"].as<double>();
storageGlobalParams.checkpointDelaySecs =
static_cast<size_t>(params["storage.syncPeriodSecs"].as<double>());

if (storageGlobalParams.syncdelay < 0 ||
storageGlobalParams.syncdelay > StorageGlobalParams::kMaxSyncdelaySecs) {
return Status(ErrorCodes::BadValue,
Expand Down
14 changes: 13 additions & 1 deletion src/mongo/db/repl/storage_interface_impl.cpp
Expand Up @@ -74,6 +74,7 @@
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/rollback_gen.h"
#include "mongo/db/service_context.h"
#include "mongo/db/storage/checkpointer.h"
#include "mongo/db/storage/control/journal_flusher.h"
#include "mongo/db/storage/control/storage_control.h"
#include "mongo/db/storage/durable_catalog.h"
Expand Down Expand Up @@ -1271,7 +1272,18 @@ void StorageInterfaceImpl::setStableTimestamp(ServiceContext* serviceCtx, Timest
"holdStableTimestamp"_attr = holdStableTimestamp);
}
});
serviceCtx->getStorageEngine()->setStableTimestamp(newStableTimestamp);

StorageEngine* storageEngine = serviceCtx->getStorageEngine();
Timestamp prevStableTimestamp = storageEngine->getStableTimestamp();

storageEngine->setStableTimestamp(newStableTimestamp);

Checkpointer* checkpointer = Checkpointer::get(serviceCtx);
if (checkpointer && !checkpointer->hasTriggeredFirstStableCheckpoint()) {
checkpointer->triggerFirstStableCheckpoint(prevStableTimestamp,
storageEngine->getInitialDataTimestamp(),
storageEngine->getStableTimestamp());
}
}

void StorageInterfaceImpl::setInitialDataTimestamp(ServiceContext* serviceCtx,
Expand Down
15 changes: 15 additions & 0 deletions src/mongo/db/storage/SConscript
Expand Up @@ -121,11 +121,13 @@ env.Library(
'control/storage_control.cpp',
],
LIBDEPS=[
'checkpointer',
'journal_flusher',
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/service_context',
'storage_options',
],
)

Expand Down Expand Up @@ -512,6 +514,19 @@ env.Library(
],
)

# Library target for the storage-engine agnostic checkpoint thread implemented in
# checkpointer.cpp.
env.Library(
target='checkpointer',
source=[
'checkpointer.cpp',
],
# All dependencies are declared private to this library.
# NOTE(review): checkpointer.h includes mongo/util/background.h and derives from BackgroundJob;
# confirm that 'background_job' does not need to be a public LIBDEPS entry instead.
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/util/background_job',
'storage_options',
],
)

env.Library(
target='two_phase_index_build_knobs_idl',
source=[
Expand Down
168 changes: 168 additions & 0 deletions src/mongo/db/storage/checkpointer.cpp
@@ -0,0 +1,168 @@
/**
* Copyright (C) 2020-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/

#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage

#include "mongo/platform/basic.h"

#include "mongo/db/storage/checkpointer.h"

#include "mongo/db/operation_context.h"
#include "mongo/db/service_context.h"
#include "mongo/db/storage/kv/kv_engine.h"
#include "mongo/logv2/log.h"
#include "mongo/util/concurrency/idle_thread_block.h"
#include "mongo/util/fail_point.h"

namespace mongo {

namespace {

// ServiceContext decoration that holds the Checkpointer installed through Checkpointer::set().
// Checkpointer::get() hands out a raw, non-owning pointer to whatever is stored here.
const auto getCheckpointer = ServiceContext::declareDecoration<std::unique_ptr<Checkpointer>>();

// Fail point consulted by Checkpointer::run() immediately before each checkpoint is taken; the
// checkpoint thread pauses there while the fail point is set.
MONGO_FAIL_POINT_DEFINE(pauseCheckpointThread);

} // namespace

/**
 * Returns a non-owning pointer to the Checkpointer stored on 'serviceCtx'. The result is null
 * when no Checkpointer has been installed, so callers must check it before use.
 */
Checkpointer* Checkpointer::get(ServiceContext* serviceCtx) {
    const auto& holder = getCheckpointer(serviceCtx);
    return holder.get();
}

/**
 * Convenience overload: looks up the Checkpointer through the ServiceContext that owns 'opCtx'.
 */
Checkpointer* Checkpointer::get(OperationContext* opCtx) {
    auto* const serviceCtx = opCtx->getServiceContext();
    return get(serviceCtx);
}

/**
 * Installs 'newCheckpointer' on 'serviceCtx', replacing (and destroying) any previous instance.
 *
 * A previously installed Checkpointer must no longer be running; this is enforced with an
 * invariant.
 */
void Checkpointer::set(ServiceContext* serviceCtx, std::unique_ptr<Checkpointer> newCheckpointer) {
    auto& checkpointer = getCheckpointer(serviceCtx);

    // The move-assignment below destroys the old instance, so its thread must already be stopped.
    if (checkpointer != nullptr) {
        invariant(!checkpointer->running(),
                  "Tried to reset the Checkpointer without shutting down the original instance.");
    }

    checkpointer = std::move(newCheckpointer);
}

void Checkpointer::run() {
ThreadClient tc(name(), getGlobalServiceContext());
LOGV2_DEBUG(22307, 1, "Starting thread", "threadName"_attr = name());

while (true) {
auto opCtx = tc->makeOperationContext();

{
stdx::unique_lock<Latch> lock(_mutex);
MONGO_IDLE_THREAD_BLOCK;

// Wait for 'storageGlobalParams.checkpointDelaySecs' seconds; or until either shutdown
// is signaled or a checkpoint is triggered.
_sleepCV.wait_for(lock,
stdx::chrono::seconds(static_cast<std::int64_t>(
storageGlobalParams.checkpointDelaySecs)),
[&] { return _shuttingDown || _triggerCheckpoint; });

// If the checkpointDelaySecs is set to 0, that means we should skip checkpointing.
// However, checkpointDelaySecs is adjustable by a runtime server parameter, so we
// need to wake up to check periodically. The wakeup to check period is arbitrary.
while (storageGlobalParams.checkpointDelaySecs == 0 && !_shuttingDown &&
!_triggerCheckpoint) {
_sleepCV.wait_for(lock, stdx::chrono::seconds(static_cast<std::int64_t>(3)), [&] {
return _shuttingDown || _triggerCheckpoint;
});
}

if (_shuttingDown) {
invariant(!_shutdownReason.isOK());
LOGV2_DEBUG(22309,
1,
"Stopping thread",
"threadName"_attr = name(),
"reason"_attr = _shutdownReason);
return;
}

// Clear the trigger so we do not immediately checkpoint again after this.
_triggerCheckpoint = false;
}

pauseCheckpointThread.pauseWhileSet();

const Date_t startTime = Date_t::now();

// TODO SERVER-50861: Access the storage engine via the ServiceContext.
_kvEngine->checkpoint();

const auto secondsElapsed = durationCount<Seconds>(Date_t::now() - startTime);
if (secondsElapsed >= 30) {
LOGV2_DEBUG(22308,
1,
"Checkpoint was slow to complete",
"secondsElapsed"_attr = secondsElapsed);
}
}
}

/**
 * Wakes the checkpoint thread for an immediate checkpoint when this stable timestamp update is
 * the one that reaches the initial data timestamp, i.e. 'prevStable' < 'initialData' <=
 * 'currStable'. Otherwise does nothing.
 *
 * Must not be called once the first stable checkpoint has been triggered (enforced with an
 * invariant).
 */
void Checkpointer::triggerFirstStableCheckpoint(Timestamp prevStable,
                                                Timestamp initialData,
                                                Timestamp currStable) {
    stdx::unique_lock<Latch> lock(_mutex);
    invariant(!_hasTriggeredFirstStableCheckpoint);

    const bool reachedInitialData = prevStable < initialData && currStable >= initialData;
    if (!reachedInitialData) {
        return;
    }

    LOGV2(22310,
          "Triggering the first stable checkpoint",
          "initialDataTimestamp"_attr = initialData,
          "prevStableTimestamp"_attr = prevStable,
          "currStableTimestamp"_attr = currStable);

    // Record that the one-time trigger has fired, then wake the checkpoint thread.
    _hasTriggeredFirstStableCheckpoint = true;
    _triggerCheckpoint = true;
    _sleepCV.notify_one();
}

bool Checkpointer::hasTriggeredFirstStableCheckpoint() {
stdx::unique_lock<Latch> lock(_mutex);
return _hasTriggeredFirstStableCheckpoint;
}

/**
 * Signals the checkpoint thread to stop, recording 'reason' as the shutdown status, and blocks
 * until the thread has exited. run() asserts that the recorded reason is a non-OK status.
 */
void Checkpointer::shutdown(const Status& reason) {
    LOGV2(22322, "Shutting down checkpoint thread");

    {
        stdx::unique_lock<Latch> lock(_mutex);
        _shutdownReason = reason;
        _shuttingDown = true;

        // Wake the checkpoint thread so it notices the shutdown request right away instead of
        // sleeping out the rest of its interval.
        // NOTE(review): run() returns as soon as it observes _shuttingDown, without taking a
        // final checkpoint first -- confirm that this is the intended behavior.
        _sleepCV.notify_one();
    }

    // Join the checkpoint thread outside of the mutex; run() needs the mutex to wake up.
    wait();
    LOGV2(22323, "Finished shutting down checkpoint thread");
}

} // namespace mongo
114 changes: 114 additions & 0 deletions src/mongo/db/storage/checkpointer.h
@@ -0,0 +1,114 @@
/**
* Copyright (C) 2020-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/

#pragma once

#include "mongo/platform/mutex.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/util/background.h"

namespace mongo {

class KVEngine;
class OperationContext;
class ServiceContext;
class Timestamp;

/**
 * Background job that takes storage engine checkpoints.
 *
 * The job's thread sleeps for storageGlobalParams.checkpointDelaySecs seconds between checkpoints
 * and can be woken early either to take the first stable checkpoint or to shut down. One instance
 * can be attached to a ServiceContext through set() and retrieved with get().
 */
class Checkpointer : public BackgroundJob {
public:
    /**
     * 'kvEngine' is the engine whose checkpoint() is invoked by the checkpoint thread. The pointer
     * is stored, not owned.
     *
     * The constructor is explicit so that a bare KVEngine* never converts to a Checkpointer
     * implicitly.
     */
    explicit Checkpointer(KVEngine* kvEngine)
        : BackgroundJob(false /* deleteSelf */), _kvEngine(kvEngine) {}

    /**
     * Accessors for the Checkpointer stored on a ServiceContext. get() returns nullptr when no
     * Checkpointer has been set.
     */
    static Checkpointer* get(ServiceContext* serviceCtx);
    static Checkpointer* get(OperationContext* opCtx);
    static void set(ServiceContext* serviceCtx, std::unique_ptr<Checkpointer> newCheckpointer);

    std::string name() const override {
        return "Checkpointer";
    }

    /**
     * Runs the checkpoint thread, which takes a checkpoint every
     * storageGlobalParams.checkpointDelaySecs seconds until shutdown() is called.
     */
    void run() override;

    /**
     * Triggers taking the first stable checkpoint if the stable timestamp has advanced past the
     * initial data timestamp, i.e. if prevStable < initialData <= currStable.
     *
     * The checkpoint thread runs automatically every storageGlobalParams.checkpointDelaySecs
     * seconds. This function avoids potentially waiting that full duration for a stable checkpoint,
     * initiating one immediately.
     *
     * Do not call this function if hasTriggeredFirstStableCheckpoint() returns true.
     */
    void triggerFirstStableCheckpoint(Timestamp prevStable,
                                      Timestamp initialData,
                                      Timestamp currStable);

    /**
     * Returns whether the first stable checkpoint has already been triggered.
     */
    bool hasTriggeredFirstStableCheckpoint();

    /**
     * Signals the checkpoint thread to stop and blocks until it has been fully shutdown. 'reason'
     * is recorded as the shutdown status and must not be OK.
     */
    void shutdown(const Status& reason);

private:
    // A pointer to the KVEngine is maintained only due to unit testing limitations that don't fully
    // setup the ServiceContext.
    // TODO SERVER-50861: Remove this pointer.
    KVEngine* const _kvEngine;

    // Protects the state below.
    Mutex _mutex = MONGO_MAKE_LATCH("Checkpointer::_mutex");

    // The checkpoint thread idles on this condition variable for a particular time duration between
    // taking checkpoints. It can be triggered early to expedite either: immediate checkpointing if
    // _triggerCheckpoint is set; or shutdown cleanup if _shuttingDown is set.
    stdx::condition_variable _sleepCV;

    // Set by shutdown(); makes the checkpoint thread exit the next time it wakes up.
    bool _shuttingDown = false;

    // The status passed to shutdown(); stays OK until shutdown is requested.
    Status _shutdownReason = Status::OK();

    // This flag ensures the first stable checkpoint is only triggered once.
    bool _hasTriggeredFirstStableCheckpoint = false;

    // This flag allows the checkpoint thread to wake up early when _sleepCV is signaled.
    bool _triggerCheckpoint = false;
};

} // namespace mongo
15 changes: 15 additions & 0 deletions src/mongo/db/storage/control/storage_control.cpp
Expand Up @@ -35,7 +35,9 @@

#include "mongo/db/operation_context.h"
#include "mongo/db/service_context.h"
#include "mongo/db/storage/checkpointer.h"
#include "mongo/db/storage/control/journal_flusher.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/logv2/log.h"

namespace mongo {
Expand Down Expand Up @@ -73,12 +75,25 @@ void startStorageControls(ServiceContext* serviceContext, bool forTestOnly) {
journalFlusher->go();
JournalFlusher::set(serviceContext, std::move(journalFlusher));

if (storageEngine->supportsCheckpoints() && !storageEngine->isEphemeral() &&
!storageGlobalParams.readOnly) {
std::unique_ptr<Checkpointer> checkpointer =
std::make_unique<Checkpointer>(storageEngine->getEngine());
checkpointer->go();
Checkpointer::set(serviceContext, std::move(checkpointer));
}

areControlsStarted = true;
}

/**
 * Shuts down the background storage threads that were started for 'serviceContext', passing
 * 'reason' along to each of them. Does nothing when the controls were never started.
 */
void stopStorageControls(ServiceContext* serviceContext, const Status& reason) {
    if (!areControlsStarted) {
        return;
    }

    JournalFlusher::get(serviceContext)->shutdown(reason);

    // A Checkpointer is only installed for some storage engine configurations, so it may be absent.
    if (auto* const checkpointer = Checkpointer::get(serviceContext)) {
        checkpointer->shutdown(reason);
    }
}

Expand Down

0 comments on commit f10e0ad

Please sign in to comment.