Skip to content

Commit

Permalink
Storage support stop slow query (#2534)
Browse files Browse the repository at this point in the history
* add RequestCommon in GraphStorageService

* meta client load session

* meta client add checkIsPlanKilled

* add check kill

* stash kill query test

* add unittest

* improve
1. change E_PLAN_IS_KILLED error code
2. Replace RWLock with RCU
3. Replace std::set with folly::F14FastSet
4. check for every N rows

* rebase vesoft/master

* 1. format
2. change mod to bitand

* format

* fix memory leak

* address yixinglu's comment

* graphd passes the sessionid and planid parameters to storaged

Co-authored-by: Doodle <13706157+critical27@users.noreply.github.com>
  • Loading branch information
cangfengzhs and critical27 committed Sep 6, 2021
1 parent b5b6d89 commit 5905860
Show file tree
Hide file tree
Showing 36 changed files with 613 additions and 95 deletions.
82 changes: 69 additions & 13 deletions src/clients/meta/MetaClient.cpp
Expand Up @@ -30,17 +30,21 @@ DEFINE_int32(meta_client_retry_times, 3, "meta client retry times, 0 means no re
DEFINE_int32(meta_client_retry_interval_secs, 1, "meta client sleep interval between retry");
DEFINE_int32(meta_client_timeout_ms, 60 * 1000, "meta client timeout");
DEFINE_string(cluster_id_path, "cluster.id", "file path saved clusterId");

DEFINE_int32(check_plan_killed_frequency, 8, "check plan killed every 1<<n times");
namespace nebula {
namespace meta {

MetaClient::MetaClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool,
std::vector<HostAddr> addrs,
const MetaClientOptions& options)
: ioThreadPool_(ioThreadPool), addrs_(std::move(addrs)), options_(options) {
: ioThreadPool_(ioThreadPool),
addrs_(std::move(addrs)),
options_(options),
sessionMap_(new SessionMap{}),
killedPlans_(new folly::F14FastSet<std::pair<SessionID, ExecutionPlanID>>{}) {
CHECK(ioThreadPool_ != nullptr) << "IOThreadPool is required";
CHECK(!addrs_.empty()) << "No meta server address is specified or can be "
"solved. Meta server is required";
CHECK(!addrs_.empty())
<< "No meta server address is specified or can be solved. Meta server is required";
clientsMan_ = std::make_shared<thrift::ThriftClientManager<cpp2::MetaServiceAsyncClient>>();
updateActive();
updateLeader();
Expand All @@ -50,6 +54,8 @@ MetaClient::MetaClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool

// Destructor: calls stop() first, then frees the currently published session
// map and killed-plan set that are held as raw pointers inside the atomics.
MetaClient::~MetaClient() {
// NOTE(review): presumably stop() halts the background work that refreshes the
// caches, which is why it runs before the deletes below -- confirm.
stop();
// Only the snapshots currently stored in the atomics are deleted here; older
// snapshots are handed to folly::rcu_retire() in loadSessions().
delete sessionMap_.load();
delete killedPlans_.load();
VLOG(3) << "~MetaClient";
}

Expand Down Expand Up @@ -182,6 +188,11 @@ bool MetaClient::loadData() {
return false;
}

if (!loadSessions()) {
LOG(ERROR) << "Load sessions Failed";
return false;
}

auto ret = listSpaces().get();
if (!ret.ok()) {
LOG(ERROR) << "List space failed, status:" << ret.status();
Expand Down Expand Up @@ -997,8 +1008,7 @@ void MetaClient::loadRemoteListeners() {
}
}

/// ================================== public methods
/// =================================
/// ================================== public methods =================================

PartitionID MetaClient::partId(int32_t numParts, const VertexID id) const {
// If the length of the id is 8, we will treat it as int64_t to be compatible
Expand Down Expand Up @@ -2859,17 +2869,15 @@ bool MetaClient::loadCfg() {
// only load current module's config is enough
auto ret = listConfigs(gflagsModule_).get();
if (ret.ok()) {
// if we load config from meta server successfully, update gflags and set
// configReady_
// if we load config from meta server successfully, update gflags and set configReady_
auto items = ret.value();
MetaConfigMap metaConfigMap;
for (auto& item : items) {
std::pair<cpp2::ConfigModule, std::string> key = {item.get_module(), item.get_name()};
metaConfigMap.emplace(std::move(key), std::move(item));
}
{
// For any configurations that is in meta, update in cache to replace
// previous value
// For any configuration that is in meta, update the cache to replace the previous value
folly::RWSpinLock::WriteHolder holder(configCacheLock_);
for (const auto& entry : metaConfigMap) {
auto& key = entry.first;
Expand Down Expand Up @@ -2964,9 +2972,8 @@ void MetaClient::loadLeader(const std::vector<cpp2::HostItem>& hostItems,
<< item.get_leader_parts().size() << " space";
}
{
// todo(doodle): in worst case, storage and meta isolated, so graph may get
// a outdate leader info. The problem could be solved if leader term are
// cached as well.
// todo(doodle): in the worst case, storage and meta are isolated, so graph may get outdated
// leader info. The problem could be solved if the leader term is cached as well.
LOG(INFO) << "Load leader ok";
folly::RWSpinLock::WriteHolder wh(leadersLock_);
leadersInfo_ = std::move(leaderInfo);
Expand Down Expand Up @@ -3486,5 +3493,54 @@ folly::Future<StatusOr<bool>> MetaClient::ingest(GraphSpaceID spaceId) {
return folly::async(func);
}

bool MetaClient::loadSessions() {
auto session_list = listSessions().get();
if (!session_list.ok()) {
LOG(ERROR) << "List sessions failed, status:" << session_list.status();
return false;
}
SessionMap* oldSessionMap = sessionMap_.load();
SessionMap* newSessionMap = new SessionMap(*oldSessionMap);
auto oldKilledPlan = killedPlans_.load();
auto newKilledPlan = new folly::F14FastSet<std::pair<SessionID, ExecutionPlanID>>(*oldKilledPlan);
for (auto& session : session_list.value().get_sessions()) {
(*newSessionMap)[session.get_session_id()] = session;
for (auto& query : session.get_queries()) {
if (query.second.get_status() == cpp2::QueryStatus::KILLING) {
newKilledPlan->insert({session.get_session_id(), query.first});
}
}
}
sessionMap_.store(newSessionMap);
killedPlans_.store(newKilledPlan);
folly::rcu_retire(oldKilledPlan);
folly::rcu_retire(oldSessionMap);
return true;
}

/**
 * Looks up a session in the locally cached session map.
 *
 * @param session_id id of the session to look up.
 * @return a copy of the cached session; Status::Error("Not ready!") when the
 *         client is not ready yet; Status::SessionNotFound() when the id is
 *         not present in the cache.
 */
StatusOr<cpp2::Session> MetaClient::getSessionFromCache(const nebula::SessionID& session_id) {
  if (!ready_) {
    return Status::Error("Not ready!");
  }

  // The reader guard covers both the lookup and the copy made on return.
  folly::rcu_reader readGuard;
  auto* snapshot = sessionMap_.load();
  auto found = snapshot->find(session_id);
  if (found == snapshot->end()) {
    return Status::SessionNotFound();
  }
  return found->second;
}

/**
 * Sampled check of whether the given execution plan has been marked as killed.
 *
 * Only one call out of every 2^FLAGS_check_plan_killed_frequency calls made on
 * the current thread consults the killed-plan set; all other calls return
 * false immediately.
 *
 * @param sessionId id of the session that owns the plan.
 * @param planId    id of the execution plan.
 * @return true only when this call performed the lookup and the
 *         (sessionId, planId) pair is present in the killed-plan set.
 */
bool MetaClient::checkIsPlanKilled(SessionID sessionId, ExecutionPlanID planId) {
  // The counter is per thread and shared by every plan checked on that thread,
  // so a kill may be noticed up to 2^n calls late. That imprecision is accepted.
  static thread_local uint32_t check_counter = 0;

  // Clamp the configured exponent: shifting by a negative amount or by 31 or
  // more bits is undefined behaviour for a 32-bit value.
  constexpr int32_t kMaxShift = 30;
  const int32_t rawShift = FLAGS_check_plan_killed_frequency;
  const int32_t shift = rawShift < 0 ? 0 : (rawShift > kMaxShift ? kMaxShift : rawShift);
  const uint32_t mask = (uint32_t{1} << shift) - 1;

  check_counter = (check_counter + 1) & mask;
  if (check_counter != 0) {
    return false;
  }

  folly::rcu_reader guard;
  return killedPlans_.load()->count({sessionId, planId}) != 0;
}

} // namespace meta
} // namespace nebula
19 changes: 17 additions & 2 deletions src/clients/meta/MetaClient.h
Expand Up @@ -8,9 +8,14 @@
#define CLIENTS_META_METACLIENT_H_

#include <folly/RWSpinLock.h>
#include <folly/container/F14Map.h>
#include <folly/container/F14Set.h>
#include <folly/executors/IOThreadPoolExecutor.h>
#include <folly/synchronization/Rcu.h>
#include <gtest/gtest_prod.h>

#include <atomic>

#include "common/base/Base.h"
#include "common/base/Status.h"
#include "common/base/StatusOr.h"
Expand All @@ -20,6 +25,7 @@
#include "common/thread/GenericWorker.h"
#include "common/thrift/ThriftClientManager.h"
#include "interface/gen-cpp2/MetaServiceAsyncClient.h"
#include "interface/gen-cpp2/common_types.h"
#include "interface/gen-cpp2/meta_types.h"

DECLARE_int32(meta_client_retry_times);
Expand Down Expand Up @@ -55,8 +61,7 @@ using NameIndexMap = std::unordered_map<std::pair<GraphSpaceID, std::string>, In
// Get Index Structure by indexID
using Indexes = std::unordered_map<IndexID, std::shared_ptr<cpp2::IndexItem>>;

// Listeners is a map of ListenerHost => <PartId + type>, used to add/remove
// listener on local host
// Listeners is a map of ListenerHost => <PartId + type>, used to add/remove listener on local host
using Listeners =
std::unordered_map<HostAddr, std::vector<std::pair<PartitionID, cpp2::ListenerType>>>;

Expand Down Expand Up @@ -115,6 +120,7 @@ using FulltextClientsList = std::vector<cpp2::FTClient>;

using FTIndexMap = std::unordered_map<std::string, cpp2::FTIndex>;

using SessionMap = std::unordered_map<SessionID, cpp2::Session>;
class MetaChangedListener {
public:
virtual ~MetaChangedListener() = default;
Expand Down Expand Up @@ -175,6 +181,7 @@ class MetaClient {
FRIEND_TEST(MetaClientTest, RetryOnceTest);
FRIEND_TEST(MetaClientTest, RetryUntilLimitTest);
FRIEND_TEST(MetaClientTest, RocksdbOptionsTest);
friend class KillQueryMetaWrapper;

public:
MetaClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool,
Expand Down Expand Up @@ -551,6 +558,10 @@ class MetaClient {

StatusOr<std::vector<HostAddr>> getStorageHosts() const;

StatusOr<cpp2::Session> getSessionFromCache(const nebula::SessionID& session_id);

bool checkIsPlanKilled(SessionID session_id, ExecutionPlanID plan_id);

StatusOr<HostAddr> getStorageLeaderFromCache(GraphSpaceID spaceId, PartitionID partId);

void updateStorageLeader(GraphSpaceID spaceId, PartitionID partId, const HostAddr& leader);
Expand Down Expand Up @@ -634,6 +645,8 @@ class MetaClient {

bool loadFulltextIndexes();

bool loadSessions();

void loadLeader(const std::vector<cpp2::HostItem>& hostItems,
const SpaceNameIdMap& spaceIndexByName);

Expand Down Expand Up @@ -746,6 +759,8 @@ class MetaClient {
MetaClientOptions options_;
std::vector<HostAddr> storageHosts_;
int64_t heartbeatTime_;
std::atomic<SessionMap*> sessionMap_;
std::atomic<folly::F14FastSet<std::pair<SessionID, ExecutionPlanID>>*> killedPlans_;
};

} // namespace meta
Expand Down

0 comments on commit 5905860

Please sign in to comment.