Use the same rocksdb snapshot when sending snapshot #3846

Merged (2 commits) on Mar 10, 2022
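Note: the gist of this change is that one RocksDB snapshot is taken per partition and then reused for every read that feeds the outgoing Raft snapshot, so the commit log id/term and the row data sent to the peer describe the same point in time. A minimal sketch of that flow against the KVStore interface touched below (the function name sendConsistentSnapshot is illustrative only, and error handling is elided):

// Sketch only, not the literal implementation; see NebulaSnapshotManager.cpp below.
void sendConsistentSnapshot(nebula::kvstore::KVStore* store,
                            GraphSpaceID spaceId,
                            PartitionID partId) {
  // Take exactly one rocksdb snapshot for this partition.
  const void* snapshot = store->GetSnapshot(spaceId, partId);
  // Read the commit point through that snapshot ...
  std::string val;
  auto rc = store->get(spaceId, partId,
                       NebulaKeyUtils::systemCommitKey(partId), &val,
                       /*canReadFromFollower=*/false, snapshot);
  // ... then iterate every data prefix through the same snapshot and stream
  // the batches to the peer (omitted here), so the data matches the commit point.
  // Always release the snapshot once sending finishes or fails.
  store->ReleaseSnapshot(spaceId, partId, snapshot);
  (void)rc;
}
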
1 change: 0 additions & 1 deletion resources/gflags.json
@@ -4,7 +4,6 @@
"v",
"heartbeat_interval_secs",
"meta_client_retry_times",
"slow_op_threshold_ms",
"clean_wal_interval_secs",
"wal_ttl",
"clean_wal_interval_secs",
1 change: 0 additions & 1 deletion src/common/base/CMakeLists.txt
@@ -13,7 +13,6 @@ nebula_add_library(
Status.cpp
SanitizerOptions.cpp
SignalHandler.cpp
SlowOpTracker.cpp
${gdb_debug_script}
)

11 changes: 0 additions & 11 deletions src/common/base/SlowOpTracker.cpp

This file was deleted.

40 changes: 0 additions & 40 deletions src/common/base/SlowOpTracker.h

This file was deleted.

7 changes: 0 additions & 7 deletions src/common/base/test/CMakeLists.txt
@@ -79,13 +79,6 @@ nebula_add_test(
LIBRARIES gtest gtest_main
)

nebula_add_test(
NAME slow_op_tracker_test
SOURCES SlowOpTrackerTest.cpp
OBJECTS $<TARGET_OBJECTS:base_obj> $<TARGET_OBJECTS:time_obj>
LIBRARIES gtest gtest_main
)

nebula_add_test(
NAME lru_test
SOURCES ConcurrentLRUCacheTest.cpp
28 changes: 0 additions & 28 deletions src/common/base/test/SlowOpTrackerTest.cpp

This file was deleted.

1 change: 0 additions & 1 deletion src/common/meta/GflagsManager.cpp
@@ -55,7 +55,6 @@ std::unordered_map<std::string, std::pair<cpp2::ConfigMode, bool>> GflagsManager
{"heartbeat_interval_secs", {cpp2::ConfigMode::MUTABLE, false}},
{"agent_heartbeat_interval_secs", {cpp2::ConfigMode::MUTABLE, false}},
{"meta_client_retry_times", {cpp2::ConfigMode::MUTABLE, false}},
{"slow_op_threshold_ms", {cpp2::ConfigMode::MUTABLE, false}},
{"wal_ttl", {cpp2::ConfigMode::MUTABLE, false}},
{"clean_wal_interval_secs", {cpp2::ConfigMode::MUTABLE, false}},
{"custom_filter_interval_secs", {cpp2::ConfigMode::MUTABLE, false}},
4 changes: 3 additions & 1 deletion src/kvstore/KVEngine.h
@@ -117,7 +117,9 @@ class KVEngine {
* @param value Pointer of value
* @return nebula::cpp2::ErrorCode
*/
virtual nebula::cpp2::ErrorCode get(const std::string& key, std::string* value) = 0;
virtual nebula::cpp2::ErrorCode get(const std::string& key,
std::string* value,
const void* snapshot = nullptr) = 0;

/**
* @brief Read a list of keys
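Note: the new snapshot argument defaults to nullptr, so existing callers of KVEngine::get are unaffected. For a RocksDB-backed engine the natural way to honor it is to pass the handle through rocksdb::ReadOptions::snapshot; the sketch below assumes a rocksdb::DB* member named db_ and a simplified error mapping, neither of which is taken from this diff:

// Sketch of how a RocksDB-backed engine could honor the snapshot argument.
nebula::cpp2::ErrorCode RocksEngine::get(const std::string& key,
                                         std::string* value,
                                         const void* snapshot) {
  rocksdb::ReadOptions options;
  if (snapshot != nullptr) {
    // Read from the pinned snapshot instead of the current live state.
    options.snapshot = reinterpret_cast<const rocksdb::Snapshot*>(snapshot);
  }
  auto status = db_->Get(options, rocksdb::Slice(key), value);
  if (status.ok()) {
    return nebula::cpp2::ErrorCode::SUCCEEDED;
  }
  return status.IsNotFound() ? nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND
                             : nebula::cpp2::ErrorCode::E_UNKNOWN;
}
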
4 changes: 3 additions & 1 deletion src/kvstore/KVStore.h
@@ -108,6 +108,7 @@
virtual const void* GetSnapshot(GraphSpaceID spaceId,
PartitionID partID,
bool canReadFromFollower = false) = 0;

/**
* @brief Release snapshot.
*
@@ -131,7 +132,8 @@
PartitionID partId,
const std::string& key,
std::string* value,
bool canReadFromFollower = false) = 0;
bool canReadFromFollower = false,
const void* snapshot = nullptr) = 0;

/**
* @brief Read a list of keys
64 changes: 50 additions & 14 deletions src/kvstore/NebulaSnapshotManager.cpp
@@ -29,40 +29,69 @@ NebulaSnapshotManager::NebulaSnapshotManager(NebulaStore* kv) : store_(kv) {
void NebulaSnapshotManager::accessAllRowsInSnapshot(GraphSpaceID spaceId,
PartitionID partId,
raftex::SnapshotCallback cb) {
auto rateLimiter = std::make_unique<kvstore::RateLimiter>();
CHECK_NOTNULL(store_);
auto tables = NebulaKeyUtils::snapshotPrefix(partId);
static constexpr LogID kInvalidLogId = -1;
static constexpr TermID kInvalidLogTerm = -1;
std::vector<std::string> data;
int64_t totalSize = 0;
int64_t totalCount = 0;
LOG(INFO) << folly::sformat(
"Space {} Part {} start send snapshot, rate limited to {}, batch size is {}",
spaceId,
partId,
FLAGS_snapshot_part_rate_limit,
FLAGS_snapshot_batch_size);

const void* snapshot = store_->GetSnapshot(spaceId, partId);
CHECK_NOTNULL(store_);
auto partRet = store_->part(spaceId, partId);
if (!ok(partRet)) {
LOG(INFO) << folly::sformat("Failed to find space {} part {}", spaceId, partId);
cb(kInvalidLogId, kInvalidLogTerm, data, totalCount, totalSize, raftex::SnapshotStatus::FAILED);
return;
}
// Create a rocksdb snapshot
auto snapshot = store_->GetSnapshot(spaceId, partId);
SCOPE_EXIT {
if (snapshot != nullptr) {
store_->ReleaseSnapshot(spaceId, partId, snapshot);
}
};
auto part = nebula::value(partRet);
// Get the commit log id and commit log term of specified partition
std::string val;
auto commitRet = part->engine()->get(NebulaKeyUtils::systemCommitKey(partId), &val, snapshot);
if (commitRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << folly::sformat(
"Cannot fetch the commit log id and term of space {} part {}", spaceId, partId);
cb(kInvalidLogId, kInvalidLogTerm, data, totalCount, totalSize, raftex::SnapshotStatus::FAILED);
return;
}
CHECK_EQ(val.size(), sizeof(LogID) + sizeof(TermID));
LogID commitLogId;
TermID commitLogTerm;
memcpy(reinterpret_cast<void*>(&commitLogId), val.data(), sizeof(LogID));
memcpy(reinterpret_cast<void*>(&commitLogTerm), val.data() + sizeof(LogID), sizeof(TermID));

LOG(INFO) << folly::sformat(
"Space {} Part {} start send snapshot of commitLogId {} commitLogTerm {}, rate limited to "
"{}, batch size is {}",
spaceId,
partId,
commitLogId,
commitLogTerm,
FLAGS_snapshot_part_rate_limit,
FLAGS_snapshot_batch_size);

auto rateLimiter = std::make_unique<kvstore::RateLimiter>();
auto tables = NebulaKeyUtils::snapshotPrefix(partId);
for (const auto& prefix : tables) {
if (!accessTable(spaceId,
partId,
snapshot,
prefix,
cb,
commitLogId,
commitLogTerm,
data,
totalCount,
totalSize,
rateLimiter.get())) {
return;
}
}
cb(data, totalCount, totalSize, raftex::SnapshotStatus::DONE);
cb(commitLogId, commitLogTerm, data, totalCount, totalSize, raftex::SnapshotStatus::DONE);
}

// Promise is set in callback. Access part of the data, and try to send to
@@ -72,6 +101,8 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId,
const void* snapshot,
const std::string& prefix,
raftex::SnapshotCallback& cb,
LogID commitLogId,
TermID commitLogTerm,
std::vector<std::string>& data,
int64_t& totalCount,
int64_t& totalSize,
@@ -81,7 +112,7 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId,
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
VLOG(2) << "[spaceId:" << spaceId << ", partId:" << partId << "] access prefix failed"
<< ", error code:" << static_cast<int32_t>(ret);
cb(data, totalCount, totalSize, raftex::SnapshotStatus::FAILED);
cb(commitLogId, commitLogTerm, data, totalCount, totalSize, raftex::SnapshotStatus::FAILED);
return false;
}
data.reserve(kReserveNum);
@@ -91,7 +122,12 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId,
rateLimiter->consume(static_cast<double>(batchSize), // toConsume
static_cast<double>(FLAGS_snapshot_part_rate_limit), // rate
static_cast<double>(FLAGS_snapshot_part_rate_limit)); // burstSize
if (cb(data, totalCount, totalSize, raftex::SnapshotStatus::IN_PROGRESS)) {
if (cb(commitLogId,
commitLogTerm,
data,
totalCount,
totalSize,
raftex::SnapshotStatus::IN_PROGRESS)) {
data.clear();
batchSize = 0;
} else {
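Note: every cb(...) call in this file now passes the commit log id and term as the first two arguments, which implies raftex::SnapshotCallback was widened to match. The alias lives in the raftex SnapshotManager header rather than in this diff; reconstructed from the call sites, and assuming it remains a folly::Function, it would look roughly like:

// Reconstructed shape of the callback, matching how it is invoked above.
using SnapshotCallback = folly::Function<bool(LogID commitLogId,
                                              TermID commitLogTerm,
                                              const std::vector<std::string>& rows,
                                              int64_t totalCount,
                                              int64_t totalSize,
                                              SnapshotStatus status)>;
// A true return lets accessTable continue with the next batch, as in the
// IN_PROGRESS branch above.
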
2 changes: 2 additions & 0 deletions src/kvstore/NebulaSnapshotManager.h
@@ -50,6 +50,8 @@ class NebulaSnapshotManager : public raftex::SnapshotManager {
const void* snapshot,
const std::string& prefix,
raftex::SnapshotCallback& cb,
LogID commitLogId,
TermID commitLogTerm,
std::vector<std::string>& data,
int64_t& totalCount,
int64_t& totalSize,
5 changes: 3 additions & 2 deletions src/kvstore/NebulaStore.cpp
@@ -578,7 +578,8 @@ nebula::cpp2::ErrorCode NebulaStore::get(GraphSpaceID spaceId,
PartitionID partId,
const std::string& key,
std::string* value,
bool canReadFromFollower) {
bool canReadFromFollower,
const void* snapshot) {
auto ret = part(spaceId, partId);
if (!ok(ret)) {
return error(ret);
@@ -588,7 +589,7 @@ nebula::cpp2::ErrorCode NebulaStore::get(GraphSpaceID spaceId,
return part->isLeader() ? nebula::cpp2::ErrorCode::E_LEADER_LEASE_FAILED
: nebula::cpp2::ErrorCode::E_LEADER_CHANGED;
}
return part->engine()->get(key, value);
return part->engine()->get(key, value, snapshot);
}

const void* NebulaStore::GetSnapshot(GraphSpaceID spaceId,
3 changes: 2 additions & 1 deletion src/kvstore/NebulaStore.h
@@ -220,7 +220,8 @@ class NebulaStore : public KVStore, public Handler {
PartitionID partId,
const std::string& key,
std::string* value,
bool canReadFromFollower = false) override;
bool canReadFromFollower = false,
const void* snapshot = nullptr) override;

/**
* @brief Read a list of keys
59 changes: 10 additions & 49 deletions src/kvstore/Part.cpp
@@ -308,26 +308,12 @@ std::tuple<nebula::cpp2::ErrorCode, LogID, TermID> Part::commitLogs(
}
case OP_TRANS_LEADER: {
auto newLeader = decodeHost(OP_TRANS_LEADER, log);
auto ts = getTimestamp(log);
if (ts > startTimeMs_) {
commitTransLeader(newLeader);
} else {
VLOG(1) << idStr_ << "Skip commit stale transfer leader " << newLeader
<< ", the part is opened at " << startTimeMs_ << ", but the log timestamp is "
<< ts;
}
commitTransLeader(newLeader);
break;
}
case OP_REMOVE_PEER: {
auto peer = decodeHost(OP_REMOVE_PEER, log);
auto ts = getTimestamp(log);
if (ts > startTimeMs_) {
commitRemovePeer(peer);
} else {
VLOG(1) << idStr_ << "Skip commit stale remove peer " << peer
<< ", the part is opened at " << startTimeMs_ << ", but the log timestamp is "
<< ts;
}
commitRemovePeer(peer);
break;
}
default: {
@@ -405,51 +391,26 @@ bool Part::preProcessLog(LogID logId, TermID termId, ClusterID clusterId, const
switch (log[sizeof(int64_t)]) {
case OP_ADD_LEARNER: {
auto learner = decodeHost(OP_ADD_LEARNER, log);
auto ts = getTimestamp(log);
if (ts > startTimeMs_) {
VLOG(1) << idStr_ << "preprocess add learner " << learner;
addLearner(learner);
} else {
VLOG(1) << idStr_ << "Skip stale add learner " << learner << ", the part is opened at "
<< startTimeMs_ << ", but the log timestamp is " << ts;
}
LOG(INFO) << idStr_ << "preprocess add learner " << learner;
addLearner(learner);
break;
}
case OP_TRANS_LEADER: {
auto newLeader = decodeHost(OP_TRANS_LEADER, log);
auto ts = getTimestamp(log);
if (ts > startTimeMs_) {
VLOG(1) << idStr_ << "preprocess trans leader " << newLeader;
preProcessTransLeader(newLeader);
} else {
VLOG(1) << idStr_ << "Skip stale transfer leader " << newLeader
<< ", the part is opened at " << startTimeMs_ << ", but the log timestamp is "
<< ts;
}
LOG(INFO) << idStr_ << "preprocess trans leader " << newLeader;
preProcessTransLeader(newLeader);
break;
}
case OP_ADD_PEER: {
auto peer = decodeHost(OP_ADD_PEER, log);
auto ts = getTimestamp(log);
if (ts > startTimeMs_) {
VLOG(1) << idStr_ << "preprocess add peer " << peer;
addPeer(peer);
} else {
VLOG(1) << idStr_ << "Skip stale add peer " << peer << ", the part is opened at "
<< startTimeMs_ << ", but the log timestamp is " << ts;
}
LOG(INFO) << idStr_ << "preprocess add peer " << peer;
addPeer(peer);
break;
}
case OP_REMOVE_PEER: {
auto peer = decodeHost(OP_REMOVE_PEER, log);
auto ts = getTimestamp(log);
if (ts > startTimeMs_) {
VLOG(1) << idStr_ << "preprocess remove peer " << peer;
preProcessRemovePeer(peer);
} else {
VLOG(1) << idStr_ << "Skip stale remove peer " << peer << ", the part is opened at "
<< startTimeMs_ << ", but the log timestamp is " << ts;
}
LOG(INFO) << idStr_ << "preprocess remove peer " << peer;
preProcessRemovePeer(peer);
break;
}
default: {