Update listener (#4925)
* remove Listener::apply

* remove Listener::apply

* Revert "remove Listener::apply"

This reverts commit 7898a20.
cangfengzhs committed Nov 23, 2022
1 parent 2c447a9 commit b251e2f
Showing 5 changed files with 241 additions and 148 deletions.
117 changes: 0 additions & 117 deletions src/kvstore/Listener.cpp
@@ -10,7 +10,6 @@
#include "kvstore/LogEncoder.h"

DEFINE_int32(listener_commit_interval_secs, 1, "Listener commit interval");
DEFINE_int32(listener_commit_batch_size, 1000, "Max batch size when listener commit");
DEFINE_uint32(ft_request_retry_times, 3, "Retry times if fulltext request failed");
DEFINE_int32(ft_bulk_batch_size, 100, "Max batch size when bulk insert");
DEFINE_int32(listener_pursue_leader_threshold, 1000, "Catch up with the leader's threshold");
@@ -161,123 +160,7 @@ void Listener::doApply() {
});
}

void Listener::processLogs() {
std::unique_ptr<LogIterator> iter;
{
std::lock_guard<std::mutex> guard(raftLock_);
if (lastApplyLogId_ >= committedLogId_) {
return;
}
iter = wal_->iterator(lastApplyLogId_ + 1, committedLogId_);
}

LogID lastApplyId = -1;
// the kv pairs that can be safely synced to the remote
std::vector<KV> data;
while (iter->valid()) {
lastApplyId = iter->logId();

auto log = iter->logMsg();
if (log.empty()) {
// skip the heartbeat
++(*iter);
continue;
}

DCHECK_GE(log.size(), sizeof(int64_t) + 1 + sizeof(uint32_t));
switch (log[sizeof(int64_t)]) {
case OP_PUT: {
auto pieces = decodeMultiValues(log);
DCHECK_EQ(2, pieces.size());
data.emplace_back(pieces[0], pieces[1]);
break;
}
case OP_MULTI_PUT: {
auto kvs = decodeMultiValues(log);
DCHECK_EQ(0, kvs.size() % 2);
for (size_t i = 0; i < kvs.size(); i += 2) {
data.emplace_back(kvs[i], kvs[i + 1]);
}
break;
}
case OP_REMOVE:
case OP_REMOVE_RANGE:
case OP_MULTI_REMOVE: {
break;
}
case OP_BATCH_WRITE: {
auto batch = decodeBatchValue(log);
for (auto& op : batch) {
// OP_BATCH_REMOVE and OP_BATCH_REMOVE_RANGE are ignored
if (op.first == BatchLogType::OP_BATCH_PUT) {
data.emplace_back(op.second.first, op.second.second);
}
}
break;
}
case OP_TRANS_LEADER:
case OP_ADD_LEARNER:
case OP_ADD_PEER:
case OP_REMOVE_PEER: {
break;
}
default: {
VLOG(2) << idStr_ << "Unknown operation: " << static_cast<int32_t>(log[0]);
}
}

if (static_cast<int32_t>(data.size()) > FLAGS_listener_commit_batch_size) {
break;
}
++(*iter);
}

// apply to state machine
if (lastApplyId != -1 && apply(data)) {
std::lock_guard<std::mutex> guard(raftLock_);
lastApplyLogId_ = lastApplyId;
persist(committedLogId_, term_, lastApplyLogId_);
VLOG(2) << idStr_ << "Listener successfully applied logs up to " << lastApplyLogId_;
lastApplyTime_ = time::WallClock::fastNowInMilliSec();
}
}

std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> Listener::commitSnapshot(
const std::vector<std::string>& rows,
LogID committedLogId,
TermID committedLogTerm,
bool finished) {
VLOG(2) << idStr_ << "Listener is committing snapshot.";
int64_t count = 0;
int64_t size = 0;
std::vector<KV> data;
data.reserve(rows.size());
for (const auto& row : rows) {
count++;
size += row.size();
auto kv = decodeKV(row);
data.emplace_back(kv.first, kv.second);
}
if (!apply(data)) {
LOG(INFO) << idStr_ << "Failed to apply data while committing snapshot.";
return {
nebula::cpp2::ErrorCode::E_RAFT_PERSIST_SNAPSHOT_FAILED, kNoSnapshotCount, kNoSnapshotSize};
}
if (finished) {
CHECK(!raftLock_.try_lock());
leaderCommitId_ = committedLogId;
lastApplyLogId_ = committedLogId;
persist(committedLogId, committedLogTerm, lastApplyLogId_);
lastApplyTime_ = time::WallClock::fastNowInMilliSec();
LOG(INFO) << folly::sformat(
"Commit snapshot: committedLogId={}, "
"committedLogTerm={}, lastApplyLogId_={}",
committedLogId,
committedLogTerm,
lastApplyLogId_);
}
return {nebula::cpp2::ErrorCode::SUCCEEDED, count, size};
}

void Listener::resetListener() {
std::lock_guard<std::mutex> g(raftLock_);
29 changes: 2 additions & 27 deletions src/kvstore/Listener.h
@@ -9,6 +9,7 @@
#include "common/base/Base.h"
#include "common/meta/SchemaManager.h"
#include "kvstore/Common.h"
#include "kvstore/LogEncoder.h"
#include "kvstore/raftex/Host.h"
#include "kvstore/raftex/RaftPart.h"
#include "kvstore/wal/FileBasedWal.h"
@@ -180,14 +181,6 @@ class Listener : public raftex::RaftPart {
*/
virtual LogID lastApplyLogId() = 0;

/**
* @brief Apply data into listener's state machine
*
* @param data Key/value to apply
* @return True on success, false on failure.
*/
virtual bool apply(const std::vector<KV>& data) = 0;

/**
* @brief Persist commitLogId commitLogTerm and lastApplyLogId
*/
@@ -272,31 +265,13 @@ class Listener : public raftex::RaftPart {
ClusterID clusterId,
folly::StringPiece log) override;

/**
* @brief If the listener falls too far behind the leader, the leader will send all of its data
* in a snapshot, batch by batch; the listener needs to implement this method to apply each batch
* to the state machine. The return value is a pair of <logs count, logs size> for this batch.
*
* @param data Data to apply
* @param committedLogId Commit log id of snapshot
* @param committedLogTerm Commit log term of snapshot
* @param finished Whether the snapshot is finished
* @return std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> Return {ok, count, size} if
* succeed, else return {errorcode, -1, -1}
*/
std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> commitSnapshot(
const std::vector<std::string>& data,
LogID committedLogId,
TermID committedLogTerm,
bool finished) override;

/**
* @brief The background job thread triggers doApply periodically to apply data into the state machine
*/
void doApply();

// Process logs and then call apply to execute
virtual void processLogs();
virtual void processLogs() = 0;

protected:
LogID leaderCommitId_ = 0;
118 changes: 118 additions & 0 deletions src/kvstore/plugins/elasticsearch/ESListener.cpp
@@ -10,6 +10,7 @@

DECLARE_uint32(ft_request_retry_times);
DECLARE_int32(ft_bulk_batch_size);
DEFINE_int32(listener_commit_batch_size, 1000, "Max batch size when listener commit");

namespace nebula {
namespace kvstore {
@@ -244,5 +245,122 @@ bool ESListener::writeDatum(const std::vector<nebula::plugin::DocItem>& items) c
return true;
}

void ESListener::processLogs() {
std::unique_ptr<LogIterator> iter;
{
std::lock_guard<std::mutex> guard(raftLock_);
if (lastApplyLogId_ >= committedLogId_) {
return;
}
iter = wal_->iterator(lastApplyLogId_ + 1, committedLogId_);
}

LogID lastApplyId = -1;
// the kv pairs that can be safely synced to the remote
std::vector<KV> data;
while (iter->valid()) {
lastApplyId = iter->logId();

auto log = iter->logMsg();
if (log.empty()) {
// skip the heartbeat
++(*iter);
continue;
}

DCHECK_GE(log.size(), sizeof(int64_t) + 1 + sizeof(uint32_t));
switch (log[sizeof(int64_t)]) {
case OP_PUT: {
auto pieces = decodeMultiValues(log);
DCHECK_EQ(2, pieces.size());
data.emplace_back(pieces[0], pieces[1]);
break;
}
case OP_MULTI_PUT: {
auto kvs = decodeMultiValues(log);
DCHECK_EQ(0, kvs.size() % 2);
for (size_t i = 0; i < kvs.size(); i += 2) {
data.emplace_back(kvs[i], kvs[i + 1]);
}
break;
}
case OP_REMOVE:
case OP_REMOVE_RANGE:
case OP_MULTI_REMOVE: {
break;
}
case OP_BATCH_WRITE: {
auto batch = decodeBatchValue(log);
for (auto& op : batch) {
// OP_BATCH_REMOVE and OP_BATCH_REMOVE_RANGE are ignored
if (op.first == BatchLogType::OP_BATCH_PUT) {
data.emplace_back(op.second.first, op.second.second);
}
}
break;
}
case OP_TRANS_LEADER:
case OP_ADD_LEARNER:
case OP_ADD_PEER:
case OP_REMOVE_PEER: {
break;
}
default: {
VLOG(2) << idStr_ << "Unknown operation: " << static_cast<int32_t>(log[0]);
}
}

if (static_cast<int32_t>(data.size()) > FLAGS_listener_commit_batch_size) {
break;
}
++(*iter);
}
// apply to state machine
if (lastApplyId != -1 && apply(data)) {
std::lock_guard<std::mutex> guard(raftLock_);
lastApplyLogId_ = lastApplyId;
persist(committedLogId_, term_, lastApplyLogId_);
VLOG(2) << idStr_ << "Listener successfully applied logs up to " << lastApplyLogId_;
lastApplyTime_ = time::WallClock::fastNowInMilliSec();
}
}

std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> ESListener::commitSnapshot(
const std::vector<std::string>& rows,
LogID committedLogId,
TermID committedLogTerm,
bool finished) {
VLOG(2) << idStr_ << "Listener is committing snapshot.";
int64_t count = 0;
int64_t size = 0;
std::vector<KV> data;
data.reserve(rows.size());
for (const auto& row : rows) {
count++;
size += row.size();
auto kv = decodeKV(row);
data.emplace_back(kv.first, kv.second);
}
if (!apply(data)) {
LOG(INFO) << idStr_ << "Failed to apply data while committing snapshot.";
return {
nebula::cpp2::ErrorCode::E_RAFT_PERSIST_SNAPSHOT_FAILED, kNoSnapshotCount, kNoSnapshotSize};
}
if (finished) {
CHECK(!raftLock_.try_lock());
leaderCommitId_ = committedLogId;
lastApplyLogId_ = committedLogId;
persist(committedLogId, committedLogTerm, lastApplyLogId_);
lastApplyTime_ = time::WallClock::fastNowInMilliSec();
LOG(INFO) << folly::sformat(
"Commit snapshot: committedLogId={}, "
"committedLogTerm={}, lastApplyLogId_={}",
committedLogId,
committedLogTerm,
lastApplyLogId_);
}
return {nebula::cpp2::ErrorCode::SUCCEEDED, count, size};
}

} // namespace kvstore
} // namespace nebula
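
For readers skimming the diff: the core of the new ESListener::processLogs() above is a bounded batching loop over the WAL: skip heartbeat entries, collect the key/value payloads of put-style operations, stop once the batch exceeds FLAGS_listener_commit_batch_size, then apply the batch and persist lastApplyLogId_. The standalone C++ sketch below illustrates only that loop shape; FakeWalIterator, walEntries and kBatchSize are simplified stand-ins invented for illustration, not NebulaGraph APIs, and the real code additionally decodes the operation type (OP_PUT, OP_MULTI_PUT, OP_BATCH_WRITE, ...) from each log payload and skips remove-style and membership-change entries.

// Standalone sketch of the batching pattern in ESListener::processLogs() above.
// FakeWalIterator, walEntries and kBatchSize are simplified stand-ins, not NebulaGraph APIs.
#include <cstdint>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

using KV = std::pair<std::string, std::string>;

// Stand-in for the WAL iterator: yields (logId, payload) entries; an empty payload
// plays the role of a heartbeat entry with nothing to apply.
struct FakeWalIterator {
  std::vector<std::pair<int64_t, KV>> logs;
  size_t pos = 0;
  bool valid() const { return pos < logs.size(); }
  void next() { ++pos; }
  int64_t logId() const { return logs[pos].first; }
  const KV& logMsg() const { return logs[pos].second; }
};

int main() {
  const size_t kBatchSize = 2;  // stands in for FLAGS_listener_commit_batch_size
  std::vector<std::pair<int64_t, KV>> walEntries = {
      {1, {"k1", "v1"}},
      {2, {"", ""}},  // "heartbeat": empty payload, skipped below
      {3, {"k3", "v3"}},
      {4, {"k4", "v4"}},
      {5, {"k5", "v5"}}};
  FakeWalIterator iter{walEntries};

  int64_t lastApplyId = -1;
  std::vector<KV> data;  // the batch that would be handed to apply()
  while (iter.valid()) {
    lastApplyId = iter.logId();
    if (iter.logMsg().first.empty()) {
      // Skip heartbeats, as processLogs() does for empty log messages.
      iter.next();
      continue;
    }
    data.emplace_back(iter.logMsg());
    if (data.size() > kBatchSize) {
      // Cap the batch; remaining logs are picked up on the next doApply() round.
      break;
    }
    iter.next();
  }
  if (lastApplyId != -1) {
    // In the real listener this is where apply(data) runs and lastApplyLogId_ is persisted.
    std::cout << "would apply " << data.size() << " kv pairs, lastApplyId=" << lastApplyId << "\n";
  }
  return 0;
}
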
10 changes: 9 additions & 1 deletion src/kvstore/plugins/elasticsearch/ESListener.h
@@ -71,7 +71,7 @@ class ESListener : public Listener {
* @param data Key/value to apply
* @return True if succeed. False if failed.
*/
bool apply(const std::vector<KV>& data) override;
bool apply(const std::vector<KV>& data);

/**
* @brief Persist commitLogId commitLogTerm and lastApplyLogId
@@ -92,6 +92,14 @@
*/
LogID lastApplyLogId() override;

void processLogs() override;

std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> commitSnapshot(
const std::vector<std::string>& data,
LogID committedLogId,
TermID committedLogTerm,
bool finished) override;

private:
/**
* @brief Write last commit id, last commit term, last apply id to a file
