diff --git a/src/kvstore/Listener.cpp b/src/kvstore/Listener.cpp
index b07499e0b58..bde1a192261 100644
--- a/src/kvstore/Listener.cpp
+++ b/src/kvstore/Listener.cpp
@@ -10,7 +10,6 @@
 #include "kvstore/LogEncoder.h"
 
 DEFINE_int32(listener_commit_interval_secs, 1, "Listener commit interval");
-DEFINE_int32(listener_commit_batch_size, 1000, "Max batch size when listener commit");
 DEFINE_uint32(ft_request_retry_times, 3, "Retry times if fulltext request failed");
 DEFINE_int32(ft_bulk_batch_size, 100, "Max batch size when bulk insert");
 DEFINE_int32(listener_pursue_leader_threshold, 1000, "Catch up with the leader's threshold");
@@ -161,123 +160,7 @@ void Listener::doApply() {
       });
 }
 
-void Listener::processLogs() {
-  std::unique_ptr<LogIterator> iter;
-  {
-    std::lock_guard<std::mutex> guard(raftLock_);
-    if (lastApplyLogId_ >= committedLogId_) {
-      return;
-    }
-    iter = wal_->iterator(lastApplyLogId_ + 1, committedLogId_);
-  }
-
-  LogID lastApplyId = -1;
-  // the kv pairs which can be synced to the remote safely
-  std::vector<KV> data;
-  while (iter->valid()) {
-    lastApplyId = iter->logId();
-
-    auto log = iter->logMsg();
-    if (log.empty()) {
-      // skip the heartbeat
-      ++(*iter);
-      continue;
-    }
-
-    DCHECK_GE(log.size(), sizeof(int64_t) + 1 + sizeof(uint32_t));
-    switch (log[sizeof(int64_t)]) {
-      case OP_PUT: {
-        auto pieces = decodeMultiValues(log);
-        DCHECK_EQ(2, pieces.size());
-        data.emplace_back(pieces[0], pieces[1]);
-        break;
-      }
-      case OP_MULTI_PUT: {
-        auto kvs = decodeMultiValues(log);
-        DCHECK_EQ(0, kvs.size() % 2);
-        for (size_t i = 0; i < kvs.size(); i += 2) {
-          data.emplace_back(kvs[i], kvs[i + 1]);
-        }
-        break;
-      }
-      case OP_REMOVE:
-      case OP_REMOVE_RANGE:
-      case OP_MULTI_REMOVE: {
-        break;
-      }
-      case OP_BATCH_WRITE: {
-        auto batch = decodeBatchValue(log);
-        for (auto& op : batch) {
-          // OP_BATCH_REMOVE and OP_BATCH_REMOVE_RANGE are ignored
-          if (op.first == BatchLogType::OP_BATCH_PUT) {
-            data.emplace_back(op.second.first, op.second.second);
-          }
-        }
-        break;
-      }
-      case OP_TRANS_LEADER:
-      case OP_ADD_LEARNER:
-      case OP_ADD_PEER:
-      case OP_REMOVE_PEER: {
-        break;
-      }
-      default: {
-        VLOG(2) << idStr_ << "Unknown operation: " << static_cast<int32_t>(log[0]);
-      }
-    }
-    if (static_cast<int32_t>(data.size()) > FLAGS_listener_commit_batch_size) {
-      break;
-    }
-    ++(*iter);
-  }
-
-  // apply to state machine
-  if (lastApplyId != -1 && apply(data)) {
-    std::lock_guard<std::mutex> guard(raftLock_);
-    lastApplyLogId_ = lastApplyId;
-    persist(committedLogId_, term_, lastApplyLogId_);
-    VLOG(2) << idStr_ << "Listener succeeded apply log to " << lastApplyLogId_;
-    lastApplyTime_ = time::WallClock::fastNowInMilliSec();
-  }
-}
-
-std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> Listener::commitSnapshot(
-    const std::vector<std::string>& rows,
-    LogID committedLogId,
-    TermID committedLogTerm,
-    bool finished) {
-  VLOG(2) << idStr_ << "Listener is committing snapshot.";
-  int64_t count = 0;
-  int64_t size = 0;
-  std::vector<KV> data;
-  data.reserve(rows.size());
-  for (const auto& row : rows) {
-    count++;
-    size += row.size();
-    auto kv = decodeKV(row);
-    data.emplace_back(kv.first, kv.second);
-  }
-  if (!apply(data)) {
-    LOG(INFO) << idStr_ << "Failed to apply data while committing snapshot.";
-    return {
-        nebula::cpp2::ErrorCode::E_RAFT_PERSIST_SNAPSHOT_FAILED, kNoSnapshotCount, kNoSnapshotSize};
-  }
-  if (finished) {
-    CHECK(!raftLock_.try_lock());
-    leaderCommitId_ = committedLogId;
-    lastApplyLogId_ = committedLogId;
-    persist(committedLogId, committedLogTerm, lastApplyLogId_);
-    lastApplyTime_ = time::WallClock::fastNowInMilliSec();
-    LOG(INFO) << folly::sformat(
-        "Commit snapshot to : committedLogId={},"
-        "committedLogTerm={}, lastApplyLogId_={}",
-        committedLogId,
-        committedLogTerm,
-        lastApplyLogId_);
-  }
-  return {nebula::cpp2::ErrorCode::SUCCEEDED, count, size};
-}
 
 void Listener::resetListener() {
   std::lock_guard<std::mutex> g(raftLock_);
diff --git a/src/kvstore/Listener.h b/src/kvstore/Listener.h
index 6f3f994543c..dfdb6899366 100644
--- a/src/kvstore/Listener.h
+++ b/src/kvstore/Listener.h
@@ -9,6 +9,7 @@
 #include "common/base/Base.h"
 #include "common/meta/SchemaManager.h"
 #include "kvstore/Common.h"
+#include "kvstore/LogEncoder.h"
 #include "kvstore/raftex/Host.h"
 #include "kvstore/raftex/RaftPart.h"
 #include "kvstore/wal/FileBasedWal.h"
@@ -180,14 +181,6 @@ class Listener : public raftex::RaftPart {
    */
  virtual LogID lastApplyLogId() = 0;
 
-  /**
-   * @brief Apply data into listener's state machine
-   *
-   * @param data Key/value to apply
-   * @return True if succeed. False if failed.
-   */
-  virtual bool apply(const std::vector<KV>& data) = 0;
-
  /**
   * @brief Persist commitLogId commitLogTerm and lastApplyLogId
   */
@@ -272,31 +265,13 @@ class Listener : public raftex::RaftPart {
                  ClusterID clusterId,
                  folly::StringPiece log) override;
 
-  /**
-   * @brief If the listener falls too far behind the leader, the leader will send all its data
-   * in snapshot by batch, the listener needs to implement this method to apply the batch to state
-   * machine. The return value is a pair of <count, size> of this batch.
-   *
-   * @param data Data to apply
-   * @param committedLogId Commit log id of snapshot
-   * @param committedLogTerm Commit log term of snapshot
-   * @param finished Whether snapshot is finished
-   * @return std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> Return {ok, count, size} if
-   * succeed, else return {errorcode, -1, -1}
-   */
-  std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> commitSnapshot(
-      const std::vector<std::string>& data,
-      LogID committedLogId,
-      TermID committedLogTerm,
-      bool finished) override;
-
  /**
   * @brief Background job thread will trigger doApply to apply data into state machine periodically
   */
  void doApply();
 
  // Process logs and then call apply to execute
-  virtual void processLogs();
+  virtual void processLogs() = 0;
 
 protected:
  LogID leaderCommitId_ = 0;
diff --git a/src/kvstore/plugins/elasticsearch/ESListener.cpp b/src/kvstore/plugins/elasticsearch/ESListener.cpp
index a4cacbc2196..05121ea196b 100644
--- a/src/kvstore/plugins/elasticsearch/ESListener.cpp
+++ b/src/kvstore/plugins/elasticsearch/ESListener.cpp
@@ -10,6 +10,7 @@
 
 DECLARE_uint32(ft_request_retry_times);
 DECLARE_int32(ft_bulk_batch_size);
+DEFINE_int32(listener_commit_batch_size, 1000, "Max batch size when listener commit");
 
 namespace nebula {
 namespace kvstore {
@@ -244,5 +245,122 @@ bool ESListener::writeDatum(const std::vector& items) c
   return true;
 }
 
+void ESListener::processLogs() {
+  std::unique_ptr<LogIterator> iter;
+  {
+    std::lock_guard<std::mutex> guard(raftLock_);
+    if (lastApplyLogId_ >= committedLogId_) {
+      return;
+    }
+    iter = wal_->iterator(lastApplyLogId_ + 1, committedLogId_);
+  }
+
+  LogID lastApplyId = -1;
+  // the kv pairs which can be synced to the remote safely
+  std::vector<KV> data;
+  while (iter->valid()) {
+    lastApplyId = iter->logId();
+
+    auto log = iter->logMsg();
+    if (log.empty()) {
+      // skip the heartbeat
+      ++(*iter);
+      continue;
+    }
+
+    DCHECK_GE(log.size(), sizeof(int64_t) + 1 + sizeof(uint32_t));
+    switch (log[sizeof(int64_t)]) {
+      case OP_PUT: {
+        auto pieces = decodeMultiValues(log);
+        DCHECK_EQ(2, pieces.size());
+        data.emplace_back(pieces[0], pieces[1]);
+        break;
+      }
+      case OP_MULTI_PUT: {
+        auto kvs = decodeMultiValues(log);
+        DCHECK_EQ(0, kvs.size() % 2);
+        for (size_t i = 0; i < kvs.size(); i += 2) {
+          data.emplace_back(kvs[i], kvs[i + 1]);
+        }
+        break;
+      }
+      case OP_REMOVE:
+      case OP_REMOVE_RANGE:
+      case OP_MULTI_REMOVE: {
+        break;
+      }
+      case OP_BATCH_WRITE: {
+        auto batch = decodeBatchValue(log);
+        for (auto& op : batch) {
+          // OP_BATCH_REMOVE and OP_BATCH_REMOVE_RANGE are ignored
+          if (op.first == BatchLogType::OP_BATCH_PUT) {
+            data.emplace_back(op.second.first, op.second.second);
+          }
+        }
+        break;
+      }
+      case OP_TRANS_LEADER:
+      case OP_ADD_LEARNER:
+      case OP_ADD_PEER:
+      case OP_REMOVE_PEER: {
+        break;
+      }
+      default: {
+        VLOG(2) << idStr_ << "Unknown operation: " << static_cast<int32_t>(log[0]);
+      }
+    }
+
+    if (static_cast<int32_t>(data.size()) > FLAGS_listener_commit_batch_size) {
+      break;
+    }
+    ++(*iter);
+  }
+  // apply to state machine
+  if (lastApplyId != -1 && apply(data)) {
+    std::lock_guard<std::mutex> guard(raftLock_);
+    lastApplyLogId_ = lastApplyId;
+    persist(committedLogId_, term_, lastApplyLogId_);
+    VLOG(2) << idStr_ << "Listener succeeded apply log to " << lastApplyLogId_;
+    lastApplyTime_ = time::WallClock::fastNowInMilliSec();
+  }
+}
+
+std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> ESListener::commitSnapshot(
+    const std::vector<std::string>& rows,
+    LogID committedLogId,
+    TermID committedLogTerm,
+    bool finished) {
+  VLOG(2) << idStr_ << "Listener is committing snapshot.";
+  int64_t count = 0;
+  int64_t size = 0;
+  std::vector<KV> data;
+  data.reserve(rows.size());
+  for (const auto& row : rows) {
+    count++;
+    size += row.size();
+    auto kv = decodeKV(row);
+    data.emplace_back(kv.first, kv.second);
+  }
+  if (!apply(data)) {
+    LOG(INFO) << idStr_ << "Failed to apply data while committing snapshot.";
+    return {
+        nebula::cpp2::ErrorCode::E_RAFT_PERSIST_SNAPSHOT_FAILED, kNoSnapshotCount, kNoSnapshotSize};
+  }
+  if (finished) {
+    CHECK(!raftLock_.try_lock());
+    leaderCommitId_ = committedLogId;
+    lastApplyLogId_ = committedLogId;
+    persist(committedLogId, committedLogTerm, lastApplyLogId_);
+    lastApplyTime_ = time::WallClock::fastNowInMilliSec();
+    LOG(INFO) << folly::sformat(
+        "Commit snapshot to : committedLogId={},"
+        "committedLogTerm={}, lastApplyLogId_={}",
+        committedLogId,
+        committedLogTerm,
+        lastApplyLogId_);
+  }
+  return {nebula::cpp2::ErrorCode::SUCCEEDED, count, size};
+}
+
 }  // namespace kvstore
 }  // namespace nebula
diff --git a/src/kvstore/plugins/elasticsearch/ESListener.h b/src/kvstore/plugins/elasticsearch/ESListener.h
index 77fa8803d0f..0b0819af695 100644
--- a/src/kvstore/plugins/elasticsearch/ESListener.h
+++ b/src/kvstore/plugins/elasticsearch/ESListener.h
@@ -71,7 +71,7 @@ class ESListener : public Listener {
    * @param data Key/value to apply
    * @return True if succeed. False if failed.
    */
-  bool apply(const std::vector<KV>& data) override;
+  bool apply(const std::vector<KV>& data);
 
   /**
    * @brief Persist commitLogId commitLogTerm and lastApplyLogId
@@ -92,6 +92,14 @@ class ESListener : public Listener {
    */
   LogID lastApplyLogId() override;
 
+  void processLogs() override;
+
+  std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> commitSnapshot(
+      const std::vector<std::string>& data,
+      LogID committedLogId,
+      TermID committedLogTerm,
+      bool finished) override;
+
 private:
   /**
    * @brief Write last commit id, last commit term, last apply id to a file
diff --git a/src/kvstore/test/NebulaListenerTest.cpp b/src/kvstore/test/NebulaListenerTest.cpp
index 54d8ab7e452..7da68d8f467 100644
--- a/src/kvstore/test/NebulaListenerTest.cpp
+++ b/src/kvstore/test/NebulaListenerTest.cpp
@@ -58,12 +58,45 @@ class DummyListener : public Listener {
     return data_;
   }
 
-  std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> commitSnapshot(const std::vector<std::string>& data,
+  std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> commitSnapshot(const std::vector<std::string>& rows,
                                                                        LogID committedLogId,
                                                                        TermID committedLogTerm,
                                                                        bool finished) override {
     bool unl = raftLock_.try_lock();
-    auto result = Listener::commitSnapshot(data, committedLogId, committedLogTerm, finished);
+    VLOG(2) << idStr_ << "Listener is committing snapshot.";
+    int64_t count = 0;
+    int64_t size = 0;
+    std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> result{
+        nebula::cpp2::ErrorCode::SUCCEEDED, count, size};
+    std::vector<KV> data;
+    data.reserve(rows.size());
+    for (const auto& row : rows) {
+      count++;
+      size += row.size();
+      auto kv = decodeKV(row);
+      data.emplace_back(kv.first, kv.second);
+    }
+    if (!apply(data)) {
+      LOG(INFO) << idStr_ << "Failed to apply data while committing snapshot.";
+      result = {nebula::cpp2::ErrorCode::E_RAFT_PERSIST_SNAPSHOT_FAILED,
+                kNoSnapshotCount,
+                kNoSnapshotSize};
+    } else {
+      if (finished) {
+        CHECK(!raftLock_.try_lock());
+        leaderCommitId_ = committedLogId;
+        lastApplyLogId_ = committedLogId;
+        persist(committedLogId, committedLogTerm, lastApplyLogId_);
+        lastApplyTime_ = time::WallClock::fastNowInMilliSec();
+        LOG(INFO) << folly::sformat(
+            "Commit snapshot to : committedLogId={},"
+            "committedLogTerm={}, lastApplyLogId_={}",
+            committedLogId,
+            committedLogTerm,
+            lastApplyLogId_);
+      }
+      result = {nebula::cpp2::ErrorCode::SUCCEEDED, count, size};
+    }
     if (unl) {
       raftLock_.unlock();
     }
@@ -85,10 +118,86 @@ class DummyListener : public Listener {
     return snapshotBatchCount_;
   }
 
+  void processLogs() override {
+    std::unique_ptr<LogIterator> iter;
+    {
+      std::lock_guard<std::mutex> guard(raftLock_);
+      if (lastApplyLogId_ >= committedLogId_) {
+        return;
+      }
+      iter = wal_->iterator(lastApplyLogId_ + 1, committedLogId_);
+    }
+
+    LogID lastApplyId = -1;
+    // the kv pairs which can be synced to the remote safely
+    std::vector<KV> data;
+    while (iter->valid()) {
+      lastApplyId = iter->logId();
+
+      auto log = iter->logMsg();
+      if (log.empty()) {
+        // skip the heartbeat
+        ++(*iter);
+        continue;
+      }
+
+      DCHECK_GE(log.size(), sizeof(int64_t) + 1 + sizeof(uint32_t));
+      switch (log[sizeof(int64_t)]) {
+        case OP_PUT: {
+          auto pieces = decodeMultiValues(log);
+          DCHECK_EQ(2, pieces.size());
+          data.emplace_back(pieces[0], pieces[1]);
+          break;
+        }
+        case OP_MULTI_PUT: {
+          auto kvs = decodeMultiValues(log);
+          DCHECK_EQ(0, kvs.size() % 2);
+          for (size_t i = 0; i < kvs.size(); i += 2) {
+            data.emplace_back(kvs[i], kvs[i + 1]);
+          }
+          break;
+        }
+        case OP_REMOVE:
+        case OP_REMOVE_RANGE:
+        case OP_MULTI_REMOVE: {
+          break;
+        }
+        case OP_BATCH_WRITE: {
+          auto batch = decodeBatchValue(log);
+          for (auto& op : batch) {
+            // OP_BATCH_REMOVE and OP_BATCH_REMOVE_RANGE are ignored
+            if (op.first == BatchLogType::OP_BATCH_PUT) {
+              data.emplace_back(op.second.first, op.second.second);
+            }
+          }
+          break;
+        }
+        case OP_TRANS_LEADER:
+        case OP_ADD_LEARNER:
+        case OP_ADD_PEER:
+        case OP_REMOVE_PEER: {
+          break;
+        }
+        default: {
+          VLOG(2) << idStr_ << "Unknown operation: " << static_cast<int32_t>(log[0]);
+        }
+      }
+      ++(*iter);
+    }
+    // apply to state machine
+    if (lastApplyId != -1 && apply(data)) {
+      std::lock_guard<std::mutex> guard(raftLock_);
+      lastApplyLogId_ = lastApplyId;
+      persist(committedLogId_, term_, lastApplyLogId_);
+      VLOG(2) << idStr_ << "Listener succeeded apply log to " << lastApplyLogId_;
+      lastApplyTime_ = time::WallClock::fastNowInMilliSec();
+    }
+  }
+
 protected:
   void init() override {}
 
-  bool apply(const std::vector<KV>& kvs) override {
+  bool apply(const std::vector<KV>& kvs) {
     for (const auto& kv : kvs) {
       data_.emplace_back(kv);
     }