Skip to content

Commit

Permalink
remove unrelated topo listener
Browse files Browse the repository at this point in the history
  • Loading branch information
critical27 committed Nov 24, 2022
1 parent 06dee4f commit bf36c33
Show file tree
Hide file tree
Showing 15 changed files with 14 additions and 585 deletions.
20 changes: 0 additions & 20 deletions src/common/utils/NebulaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,24 +323,4 @@ std::string NebulaKeyUtils::dataVersionValue() {
return "3.0";
}

std::string NebulaKeyUtils::updatePartIdTagKey(PartitionID newPartId, const std::string& rawKey) {
int32_t item = (newPartId << kPartitionOffset) | static_cast<uint32_t>(NebulaKeyType::kTag_);

std::string key;
key.reserve(rawKey.size());
key.append(reinterpret_cast<const char*>(&item), sizeof(int32_t))
.append(rawKey.data() + sizeof(int32_t), rawKey.size() - sizeof(int32_t));
return key;
}

std::string NebulaKeyUtils::updatePartIdEdgeKey(PartitionID newPartId, const std::string& rawKey) {
int32_t item = (newPartId << kPartitionOffset) | static_cast<uint32_t>(NebulaKeyType::kEdge);

std::string key;
key.reserve(rawKey.size());
key.append(reinterpret_cast<const char*>(&item), sizeof(int32_t))
.append(rawKey.data() + sizeof(int32_t), rawKey.size() - sizeof(int32_t));
return key;
}

} // namespace nebula
4 changes: 0 additions & 4 deletions src/common/utils/NebulaKeyUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,6 @@ class NebulaKeyUtils final {
return rawKey.subpiece(0, rawKey.size() - sizeof(EdgeVerPlaceHolder));
}

static std::string updatePartIdTagKey(PartitionID newPartId, const std::string& rawKey);

static std::string updatePartIdEdgeKey(PartitionID newPartId, const std::string& rawKey);

/**
* @brief gen edge key from lock, this will used at resume
* if enableMvcc ver of edge and lock will be same,
Expand Down
5 changes: 2 additions & 3 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -898,9 +898,8 @@ struct ListZonesResp {
}

enum ListenerType {
UNKNOWN = 0x00,
ELASTICSEARCH = 0x01,
GRAPH_TOPOLOGY = 0x02,
UNKNOWN = 0x00,
ELASTICSEARCH = 0x01,
} (cpp.enum_strict)

struct AddListenerReq {
Expand Down
1 change: 0 additions & 1 deletion src/kvstore/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ nebula_add_library(
NebulaSnapshotManager.cpp
RateLimiter.cpp
plugins/elasticsearch/ESListener.cpp
plugins/topology/TopoListener.cpp
)

nebula_add_library(
Expand Down
53 changes: 11 additions & 42 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include "kvstore/NebulaSnapshotManager.h"
#include "kvstore/RocksEngine.h"
#include "kvstore/plugins/elasticsearch/ESListener.h"
#include "kvstore/plugins/topology/TopoListener.h"

DEFINE_string(engine_type, "rocksdb", "rocksdb, memory...");
DEFINE_int32(custom_filter_interval_secs,
Expand Down Expand Up @@ -373,9 +372,8 @@ void NebulaStore::addSpace(GraphSpaceID spaceId) {
bool engineExist = false;
auto dataPath = folly::stringPrintf("%s/nebula/%d", path.c_str(), spaceId);
// Check if given data path contain a kv engine of specified spaceId
for (auto iter = spaces_[spaceId]->engines_.begin();
iter != spaces_[spaceId]->engines_.end();
iter++) {
for (auto iter = spaces_[spaceId]->engines_.begin(); iter != spaces_[spaceId]->engines_.end();
iter++) {
auto dPath = (*iter)->getDataRoot();
if (dataPath.compare(dPath) == 0) {
engineExist = true;
Expand Down Expand Up @@ -568,7 +566,7 @@ void NebulaStore::removePart(GraphSpaceID spaceId, PartitionID partId, bool need
}

void NebulaStore::addListenerSpace(GraphSpaceID spaceId, meta::cpp2::ListenerType type) {
// doodle: SpaceListenerInfo need to be a map of <ListenerType -> pair of <partId, peers>> as well
UNUSED(type);
folly::RWSpinLock::WriteHolder wh(&lock_);
// listener don't need engine for now
if (this->spaceListeners_.find(spaceId) != this->spaceListeners_.end()) {
Expand All @@ -577,17 +575,10 @@ void NebulaStore::addListenerSpace(GraphSpaceID spaceId, meta::cpp2::ListenerTyp
}
LOG(INFO) << "Create listener space " << spaceId;
this->spaceListeners_[spaceId] = std::make_unique<SpaceListenerInfo>();
// doodle: open a rocksdb here
if (type == meta::cpp2::ListenerType::GRAPH_TOPOLOGY) {
this->spaceListeners_[spaceId]->engine_ =
newEngine(spaceId, options_.listenerPath_, options_.walPath_);
}
}

void NebulaStore::removeListenerSpace(GraphSpaceID spaceId, meta::cpp2::ListenerType type) {
// doodle: SpaceListenerInfo need to be a map of <ListenerType -> pair of <partId, peers>> as well
UNUSED(type);

folly::RWSpinLock::WriteHolder wh(&lock_);
auto spaceIt = this->spaceListeners_.find(spaceId);
if (spaceIt != this->spaceListeners_.end()) {
Expand Down Expand Up @@ -636,17 +627,6 @@ std::shared_ptr<Listener> NebulaStore::newListener(GraphSpaceID spaceId,
if (type == meta::cpp2::ListenerType::ELASTICSEARCH) {
listener = std::make_shared<ESListener>(
spaceId, partId, raftAddr_, walPath, ioPool_, bgWorkers_, workers_, options_.schemaMan_);
} else if (type == meta::cpp2::ListenerType::GRAPH_TOPOLOGY) {
auto engine = spaceListeners_[spaceId]->engine_.get();
listener = std::make_shared<TopoListener>(spaceId,
partId,
raftAddr_,
walPath,
ioPool_,
bgWorkers_,
workers_,
engine,
getSpaceVidLen(spaceId));
} else {
LOG(FATAL) << "Should not reach here";
return nullptr;
Expand Down Expand Up @@ -825,26 +805,15 @@ nebula::cpp2::ErrorCode NebulaStore::prefix(GraphSpaceID spaceId,
std::unique_ptr<KVIterator>* iter,
bool canReadFromFollower,
const void* snapshot) {
if (!isListener()) {
auto ret = part(spaceId, partId);
if (!ok(ret)) {
return error(ret);
}
auto part = nebula::value(ret);
if (!checkLeader(part, canReadFromFollower)) {
return nebula::cpp2::ErrorCode::E_LEADER_CHANGED;
}
return part->engine()->prefix(prefix, iter, snapshot);
} else {
// doodle: for simplicity, we just use the engine in spaceListeners_, and we don't check whether
// part exists or type of listener is topo or not
auto spaceIt = spaceListeners_.find(spaceId);
if (spaceIt == spaceListeners_.end()) {
return nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND;
}
auto engine = spaceIt->second->engine_.get();
return engine->prefix(prefix, iter, snapshot);
auto ret = part(spaceId, partId);
if (!ok(ret)) {
return error(ret);
}
auto part = nebula::value(ret);
if (!checkLeader(part, canReadFromFollower)) {
return nebula::cpp2::ErrorCode::E_LEADER_CHANGED;
}
return part->engine()->prefix(prefix, iter, snapshot);
}

nebula::cpp2::ErrorCode NebulaStore::rangeWithPrefix(GraphSpaceID spaceId,
Expand Down
2 changes: 0 additions & 2 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ struct SpacePartInfo {

struct SpaceListenerInfo {
std::unordered_map<PartitionID, ListenerMap> listeners_;
std::unique_ptr<KVEngine> engine_;
};

/**
Expand All @@ -57,7 +56,6 @@ class NebulaStore : public KVStore, public Handler {
FRIEND_TEST(NebulaStoreTest, ThreeCopiesCheckpointTest);
FRIEND_TEST(NebulaStoreTest, RemoveInvalidSpaceTest);
friend class ListenerBasicTest;
friend class TopoListenerTest;

public:
/**
Expand Down
2 changes: 0 additions & 2 deletions src/kvstore/PartManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ class Handler {
*/
virtual void removePart(GraphSpaceID spaceId, PartitionID partId, bool needLock = true) = 0;


/**
* @brief Add a space to listener
*
Expand Down Expand Up @@ -242,7 +241,6 @@ class MemPartManager final : public PartManager {
FRIEND_TEST(NebulaStoreTest, RemoveInvalidSpaceTest);
FRIEND_TEST(NebulaStoreTest, BackupRestoreTest);
friend class ListenerBasicTest;
friend class TopoListenerTest;

public:
MemPartManager() = default;
Expand Down
82 changes: 0 additions & 82 deletions src/kvstore/plugins/topology/TopoListener.cpp

This file was deleted.

57 changes: 0 additions & 57 deletions src/kvstore/plugins/topology/TopoListener.h

This file was deleted.

15 changes: 0 additions & 15 deletions src/kvstore/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -137,21 +137,6 @@ nebula_add_test(
gtest
)

nebula_add_test(
NAME
topo_listener_test
SOURCES
TopoListenerTest.cpp
OBJECTS
${KVSTORE_TEST_LIBS}
LIBRARIES
${THRIFT_LIBRARIES}
${ROCKSDB_LIBRARIES}
${PROXYGEN_LIBRARIES}
wangle
gtest
)

nebula_add_test(
NAME
disk_man_test
Expand Down
Loading

0 comments on commit bf36c33

Please sign in to comment.