Skip to content

Commit

Permalink
feat(fs): metaserver support asynchronous snapshot
Browse files Browse the repository at this point in the history
close: #1617

Signed-off-by: NaturalSelect <2145973003@qq.com>
  • Loading branch information
NaturalSelect committed Sep 8, 2023
1 parent e0d7fbf commit 802a34d
Show file tree
Hide file tree
Showing 44 changed files with 1,914 additions and 1,720 deletions.
2 changes: 1 addition & 1 deletion .bazelrc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ build --verbose_failures
build --define=with_glog=true --define=libunwind=true
build --copt -DHAVE_ZLIB=1 --copt -DGFLAGS_NS=google --copt -DUSE_BTHREAD_MUTEX
build --cxxopt -Wno-error=format-security
build:gcc7-later --cxxopt -faligned-new --cxxopt -Wno-error=implicit-fallthrough
build:gcc7-later --cxxopt -faligned-new
build --incompatible_blacklisted_protos_requires_proto_info=false
build --copt=-fdiagnostics-color=always

Expand Down
6 changes: 5 additions & 1 deletion curvefs/proto/metaserver.proto
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,11 @@ message PrepareRenameTxRequest {
}

message TransactionRequest {
required uint32 type = 1;
enum TransactionType {
None = 0;
Rename = 1;
}
required TransactionType type = 1;
required string rawPayload = 2;
}

Expand Down
56 changes: 48 additions & 8 deletions curvefs/src/metaserver/copyset/copyset_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ CopysetNode::CopysetNode(PoolId poolId, CopysetId copysetId,
confChangeMtx_(),
ongoingConfChange_(),
metric_(absl::make_unique<OperatorMetric>(poolId_, copysetId_)),
isLoading_(false) {}
isLoading_(false),
snapshotTask_() {}

CopysetNode::~CopysetNode() {
Stop();
Expand Down Expand Up @@ -188,6 +189,8 @@ void CopysetNode::Stop() {
LOG_IF(ERROR, metaStore_->Destroy() != true)
<< "Failed to clear metastore, copyset: " << name_;
}
// wait for snapshot
WaitSnapshotDone();
}

int CopysetNode::LoadConfEpoch(const std::string& file) {
Expand Down Expand Up @@ -362,6 +365,41 @@ class OnSnapshotSaveDoneClosureImpl : public OnSnapshotSaveDoneClosure {

} // namespace

void CopysetNode::DoSnapshot(OnSnapshotSaveDoneClosure* done) {
// NOTE: save metadata cannot be asynchronous
// we need maintain the consistency with
// raft snapshot metadata
std::vector<std::string> files;
brpc::ClosureGuard doneGuard(done);
auto *writer = done->GetSnapshotWriter();
if (!metaStore_->SaveMeta(writer->get_path(), &files)) {
done->SetError(MetaStatusCode::SAVE_META_FAIL);
LOG(ERROR) << "Save meta store metadata failed";
return;
}
// asynchronous save data
{
std::lock_guard<std::mutex> lock(snapshotLock_);
snapshotTask_ = std::async(std::launch::async,[files, done, this]() mutable {
brpc::ClosureGuard doneGuard(done);
auto *writer = done->GetSnapshotWriter();
// save data files
if (!metaStore_->SaveData(writer->get_path(), &files)) {
done->SetError(MetaStatusCode::SAVE_META_FAIL);
LOG(ERROR) << "Save meta store data failed";
return;
}
// add files to snapshot writer
// file is a relative path under the given directory
for (const auto &f : files) {
writer->add_file(f);
}
done->SetSuccess();
});
}
doneGuard.release();
}

void CopysetNode::on_snapshot_save(braft::SnapshotWriter* writer,
braft::Closure* done) {
LOG(INFO) << "Copyset " << name_ << " saving snapshot to '"
Expand Down Expand Up @@ -390,19 +428,21 @@ void CopysetNode::on_snapshot_save(braft::SnapshotWriter* writer,

writer->add_file(kConfEpochFilename);

// TODO(wuhanqing): MetaStore::Save will start a thread and do task
// asynchronously, after task completed it will call
// OnSnapshotSaveDoneImpl::Run
// BUT, this manner is not so clear, maybe it better to make thing
// asynchronous directly in here
metaStore_->Save(writer->get_path(), new OnSnapshotSaveDoneClosureImpl(
DoSnapshot(new OnSnapshotSaveDoneClosureImpl(
this, writer, done, metricCtx));
doneGuard.release();

// `Cancel` only available for rvalue
std::move(cleanMetricIfFailed).Cancel();
}

void CopysetNode::WaitSnapshotDone() {
std::lock_guard<std::mutex> lock(snapshotLock_);
if (snapshotTask_.valid()) {
snapshotTask_.wait();
}
}

namespace {

class CopysetLoadingGuard {
Expand Down Expand Up @@ -463,7 +503,7 @@ int CopysetNode::on_snapshot_load(braft::SnapshotReader* reader) {

void CopysetNode::on_leader_start(int64_t term) {
/*
* Invoke order in on_leader_start:
* Invoke order in on_leader_start:
* 1. flush concurrent apply queue.
* 2. set term in states machine.
*
Expand Down
21 changes: 21 additions & 0 deletions curvefs/src/metaserver/copyset/copyset_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <string>
#include <vector>
#include <map>
#include <condition_variable>

#include "curvefs/src/metaserver/common/types.h"
#include "curvefs/src/metaserver/copyset/concurrent_apply_queue.h"
Expand Down Expand Up @@ -220,6 +221,13 @@ class CopysetNode : public braft::StateMachine {

FRIEND_TEST(CopysetNodeBlockGroupTest, Test_AggregateBlockStatInfo);

private:
// for snapshot

void WaitSnapshotDone();

void DoSnapshot(OnSnapshotSaveDoneClosure* done);

private:
const PoolId poolId_;
const CopysetId copysetId_;
Expand Down Expand Up @@ -267,6 +275,19 @@ class CopysetNode : public braft::StateMachine {
std::unique_ptr<OperatorMetric> metric_;

std::atomic<bool> isLoading_;

// for asynchronous snapshot
// std::atomic_bool enableSnapshot_;

// std::condition_variable snapshotCv_;

mutable std::mutex snapshotLock_;

// NOTE: maintain a queue is unnecessary
// we only need one item
// std::function<void()> snapshotTask_;

std::future<void> snapshotTask_;
};

inline void CopysetNode::Propose(const braft::Task& task) {
Expand Down
Loading

0 comments on commit 802a34d

Please sign in to comment.