Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor listener related interface and data structure #4927

Merged
merged 2 commits into from
Nov 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 67 additions & 57 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1063,86 +1063,96 @@ void MetaClient::listenerDiff(const LocalCache& oldCache, const LocalCache& newC
return;
}

VLOG(1) << "Let's check if any listeners parts added for " << options_.localHost_;
for (auto& spaceEntry : newMap) {
auto spaceId = spaceEntry.first;
VLOG(1) << "Let's check if any listeners is updated for " << options_.localHost_;
for (auto& [spaceId, typeMap] : newMap) {
auto oldSpaceIter = oldMap.find(spaceId);
if (oldSpaceIter == oldMap.end()) {
// new space is added
VLOG(1) << "[Listener] SpaceId " << spaceId << " was added!";
listener_->onSpaceAdded(spaceId, true);
for (const auto& partEntry : spaceEntry.second) {
auto partId = partEntry.first;
for (const auto& info : partEntry.second) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << partId << " was added!";
listener_->onListenerAdded(spaceId, partId, info);
// create all type of listener when new space listener added
for (const auto& [type, listenerParts] : typeMap) {
VLOG(1) << "[Listener] SpaceId " << spaceId << " was added, type is "
<< apache::thrift::util::enumNameSafe(type);
listener_->onListenerSpaceAdded(spaceId, type);
for (const auto& newListener : listenerParts) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << newListener.partId_
<< " was added, type is " << apache::thrift::util::enumNameSafe(type);
listener_->onListenerPartAdded(spaceId, newListener.partId_, type, newListener.peers_);
}
}
} else {
// check if new part listener is added
for (auto& partEntry : spaceEntry.second) {
auto partId = partEntry.first;
auto oldPartIter = oldSpaceIter->second.find(partId);
if (oldPartIter == oldSpaceIter->second.end()) {
for (const auto& info : partEntry.second) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << partId << " was added!";
listener_->onListenerAdded(spaceId, partId, info);
for (auto& [type, listenerParts] : typeMap) {
auto oldTypeIter = oldSpaceIter->second.find(type);
// create missing type of listener when new type of listener added
if (oldTypeIter == oldSpaceIter->second.end()) {
VLOG(1) << "[Listener] SpaceId " << spaceId << " was added, type is "
<< apache::thrift::util::enumNameSafe(type);
listener_->onListenerSpaceAdded(spaceId, type);
for (const auto& newListener : listenerParts) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << newListener.partId_
<< " was added, type is " << apache::thrift::util::enumNameSafe(type);
listener_->onListenerPartAdded(spaceId, newListener.partId_, type, newListener.peers_);
}
} else {
std::sort(partEntry.second.begin(), partEntry.second.end());
std::sort(oldPartIter->second.begin(), oldPartIter->second.end());
// create missing part of listener of specified type
std::sort(listenerParts.begin(), listenerParts.end());
std::sort(oldTypeIter->second.begin(), oldTypeIter->second.end());
std::vector<ListenerHosts> diff;
std::set_difference(partEntry.second.begin(),
partEntry.second.end(),
oldPartIter->second.begin(),
oldPartIter->second.end(),
std::set_difference(listenerParts.begin(),
listenerParts.end(),
oldTypeIter->second.begin(),
oldTypeIter->second.end(),
std::back_inserter(diff));
for (const auto& info : diff) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << partId << " was added!";
listener_->onListenerAdded(spaceId, partId, info);
for (const auto& newListener : diff) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << newListener.partId_
<< " was added, type is " << apache::thrift::util::enumNameSafe(type);
listener_->onListenerPartAdded(spaceId, newListener.partId_, type, newListener.peers_);
}
}
}
}
}

VLOG(1) << "Let's check if any old listeners removed....";
for (auto& spaceEntry : oldMap) {
auto spaceId = spaceEntry.first;
VLOG(1) << "Let's check if any listeners is removed from " << options_.localHost_;
for (auto& [spaceId, typeMap] : oldMap) {
auto newSpaceIter = newMap.find(spaceId);
if (newSpaceIter == newMap.end()) {
// remove old space
for (const auto& partEntry : spaceEntry.second) {
auto partId = partEntry.first;
for (const auto& info : partEntry.second) {
VLOG(1) << "SpaceId " << spaceId << ", partId " << partId << " was removed!";
listener_->onListenerRemoved(spaceId, partId, info.type_);
// remove all type of listener when space listener removed
for (const auto& [type, listenerParts] : typeMap) {
for (const auto& outdateListener : listenerParts) {
VLOG(1) << "SpaceId " << spaceId << ", partId " << outdateListener.partId_
<< " was removed, type is " << apache::thrift::util::enumNameSafe(type);
listener_->onListenerPartRemoved(spaceId, outdateListener.partId_, type);
}
listener_->onListenerSpaceRemoved(spaceId, type);
VLOG(1) << "[Listener] SpaceId " << spaceId << " was removed, type is "
<< apache::thrift::util::enumNameSafe(type);
}
listener_->onSpaceRemoved(spaceId, true);
VLOG(1) << "[Listener] SpaceId " << spaceId << " was removed!";
} else {
// check if part listener is removed
for (auto& partEntry : spaceEntry.second) {
auto partId = partEntry.first;
auto newPartIter = newSpaceIter->second.find(partId);
if (newPartIter == newSpaceIter->second.end()) {
for (const auto& info : partEntry.second) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << partId << " was removed!";
listener_->onListenerRemoved(spaceId, partId, info.type_);
for (auto& [type, listenerParts] : typeMap) {
auto newTypeIter = newSpaceIter->second.find(type);
// remove specified type of listener
if (newTypeIter == newSpaceIter->second.end()) {
for (const auto& outdateListener : listenerParts) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << outdateListener.partId_
<< " was removed!";
listener_->onListenerPartRemoved(spaceId, outdateListener.partId_, type);
}
listener_->onListenerSpaceRemoved(spaceId, type);
VLOG(1) << "[Listener] SpaceId " << spaceId << " was removed, type is "
<< apache::thrift::util::enumNameSafe(type);
} else {
std::sort(partEntry.second.begin(), partEntry.second.end());
std::sort(newPartIter->second.begin(), newPartIter->second.end());
// remove outdate part of listener of specified type
std::sort(listenerParts.begin(), listenerParts.end());
std::sort(newTypeIter->second.begin(), newTypeIter->second.end());
std::vector<ListenerHosts> diff;
std::set_difference(partEntry.second.begin(),
partEntry.second.end(),
newPartIter->second.begin(),
newPartIter->second.end(),
std::set_difference(listenerParts.begin(),
listenerParts.end(),
newTypeIter->second.begin(),
newTypeIter->second.end(),
std::back_inserter(diff));
for (const auto& info : diff) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << partId << " was removed!";
listener_->onListenerRemoved(spaceId, partId, info.type_);
for (const auto& outdateListener : diff) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << outdateListener.partId_
<< " was removed!";
listener_->onListenerPartRemoved(spaceId, outdateListener.partId_, type);
}
}
}
Expand Down Expand Up @@ -2929,7 +2939,7 @@ ListenersMap MetaClient::doGetListenersMap(const HostAddr& host, const LocalCach
auto partIter = space.second->partsAlloc_.find(partId);
if (partIter != space.second->partsAlloc_.end()) {
auto peers = partIter->second;
listenersMap[spaceId][partId].emplace_back(std::move(type), std::move(peers));
listenersMap[spaceId][type].emplace_back(partId, std::move(peers));
} else {
FLOG_WARN("%s has listener of [%d, %d], but can't find part peers",
host.toString().c_str(),
Expand Down
19 changes: 11 additions & 8 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ class MetaChangedListener {
public:
virtual ~MetaChangedListener() = default;

virtual void onSpaceAdded(GraphSpaceID spaceId, bool isListener = false) = 0;
virtual void onSpaceRemoved(GraphSpaceID spaceId, bool isListener = false) = 0;
virtual void onSpaceAdded(GraphSpaceID spaceId) = 0;
virtual void onSpaceRemoved(GraphSpaceID spaceId) = 0;
virtual void onSpaceOptionUpdated(
GraphSpaceID spaceId, const std::unordered_map<std::string, std::string>& options) = 0;
virtual void onPartAdded(const PartHosts& partHosts) = 0;
Expand All @@ -169,12 +169,15 @@ class MetaChangedListener {
virtual void fetchLeaderInfo(
std::unordered_map<GraphSpaceID, std::vector<cpp2::LeaderInfo>>& leaders) = 0;
virtual void fetchDiskParts(kvstore::SpaceDiskPartsMap& diskParts) = 0;
virtual void onListenerAdded(GraphSpaceID spaceId,
PartitionID partId,
const ListenerHosts& listenerHosts) = 0;
virtual void onListenerRemoved(GraphSpaceID spaceId,
PartitionID partId,
cpp2::ListenerType type) = 0;
virtual void onListenerSpaceAdded(GraphSpaceID spaceId, cpp2::ListenerType type) = 0;
virtual void onListenerSpaceRemoved(GraphSpaceID spaceId, cpp2::ListenerType type) = 0;
virtual void onListenerPartAdded(GraphSpaceID spaceId,
PartitionID partId,
cpp2::ListenerType type,
const std::vector<HostAddr>& peers) = 0;
virtual void onListenerPartRemoved(GraphSpaceID spaceId,
PartitionID partId,
cpp2::ListenerType type) = 0;
virtual void onCheckRemoteListeners(GraphSpaceID spaceId,
PartitionID partId,
const std::vector<HostAddr>& remoteListeners) = 0;
Expand Down
15 changes: 8 additions & 7 deletions src/common/meta/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,29 +31,30 @@ struct PartHosts {

// ListenerHosts saves the listener type and the peers of the data replica
struct ListenerHosts {
ListenerHosts(cpp2::ListenerType type, std::vector<HostAddr> peers)
: type_(std::move(type)), peers_(std::move(peers)) {}
ListenerHosts(PartitionID partId, std::vector<HostAddr> peers)
: partId_(partId), peers_(std::move(peers)) {}

bool operator==(const ListenerHosts& rhs) const {
return this->type_ == rhs.type_ && this->peers_ == rhs.peers_;
return this->partId_ == rhs.partId_ && this->peers_ == rhs.peers_;
}

bool operator<(const ListenerHosts& rhs) const {
if (this->type_ == rhs.type_) {
if (this->partId_ == rhs.partId_) {
return this->peers_ < rhs.peers_;
}
return this->type_ < rhs.type_;
return this->partId_ < rhs.partId_;
}

cpp2::ListenerType type_;
PartitionID partId_;
// peers is the part peers which would send logs to the listener
std::vector<HostAddr> peers_;
};

using PartsMap = std::unordered_map<GraphSpaceID, std::unordered_map<PartitionID, PartHosts>>;
// ListenersMap is used for listener replica to get its peers of data replica
using ListenersMap =
std::unordered_map<GraphSpaceID, std::unordered_map<PartitionID, std::vector<ListenerHosts>>>;
std::unordered_map<GraphSpaceID,
std::unordered_map<cpp2::ListenerType, std::vector<ListenerHosts>>>;
critical27 marked this conversation as resolved.
Show resolved Hide resolved
// RemoteListenerInfo is pair of <listener host, listener type>
using RemoteListenerInfo = std::pair<HostAddr, cpp2::ListenerType>;
// RemoteListeners is used for data replica to check if some part has remote
Expand Down
6 changes: 2 additions & 4 deletions src/kvstore/Listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ Listener::Listener(GraphSpaceID spaceId,
std::shared_ptr<folly::Executor> handlers,
std::shared_ptr<raftex::SnapshotManager> snapshotMan,
std::shared_ptr<RaftClient> clientMan,
std::shared_ptr<DiskManager> diskMan,
meta::SchemaManager* schemaMan)
std::shared_ptr<DiskManager> diskMan)
: RaftPart(FLAGS_cluster_id,
spaceId,
partId,
Expand All @@ -38,8 +37,7 @@ Listener::Listener(GraphSpaceID spaceId,
handlers,
snapshotMan,
clientMan,
diskMan),
schemaMan_(schemaMan) {}
diskMan) {}

void Listener::start(std::vector<HostAddr>&& peers, bool) {
std::lock_guard<std::mutex> g(raftLock_);
Expand Down
4 changes: 1 addition & 3 deletions src/kvstore/Listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ class Listener : public raftex::RaftPart {
std::shared_ptr<folly::Executor> handlers,
std::shared_ptr<raftex::SnapshotManager> snapshotMan,
std::shared_ptr<RaftClient> clientMan,
std::shared_ptr<DiskManager> diskMan,
meta::SchemaManager* schemaMan);
std::shared_ptr<DiskManager> diskMan);

/**
* @brief Initialize listener, all Listener must call this method
Expand Down Expand Up @@ -278,7 +277,6 @@ class Listener : public raftex::RaftPart {
LogID lastApplyLogId_ = 0;
int64_t lastApplyTime_ = 0;
std::set<HostAddr> peers_;
meta::SchemaManager* schemaMan_{nullptr};
};

} // namespace kvstore
Expand Down
39 changes: 0 additions & 39 deletions src/kvstore/ListenerFactory.h

This file was deleted.

Loading