Skip to content

Commit

Permalink
curve_ops_tool: fix concurrent check copysets on server
Browse files Browse the repository at this point in the history
  • Loading branch information
SeanHai committed Jul 8, 2021
1 parent 7800ea6 commit b784af2
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 17 deletions.
2 changes: 2 additions & 0 deletions conf/tools.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ mdsDummyPort=6667
rpcTimeout=500
# rpc重试次数
rpcRetryTimes=5
# the rpc concurrency to chunkserver
rpcConcurrentNum=10
# etcd地址
etcdAddr=127.0.0.1:2379
# snapshot clone server 地址
Expand Down
1 change: 1 addition & 0 deletions curve-ansible/roles/generate_config/defaults/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ s3_throttle_bpsWriteLimit: 1280
# 运维工具默认值
tool_rpc_timeout: 500
tool_rpc_retry_times: 5
tool_rpc_concurrent_num: 10

# snapshotclone_nginx配置
nginx_docker_internal_port: 80
Expand Down
2 changes: 2 additions & 0 deletions curve-ansible/roles/generate_config/templates/tools.conf.j2
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ mdsDummyPort={{ hostvars[groups.mds[0]].mds_dummy_port }}
rpcTimeout={{ tool_rpc_timeout }}
# rpc重试次数
rpcRetryTimes={{ tool_rpc_retry_times }}
# the rpc concurrency to chunkserver
rpcConcurrentNum={{ tool_rpc_concurrent_num }}
# etcd地址
{% set etcd_address=[] -%}
{% for host in groups.etcd -%}
Expand Down
68 changes: 51 additions & 17 deletions src/tools/copyset_check_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,14 @@ ChunkServerHealthStatus CopysetCheckCore::CheckCopysetsOnChunkServer(
bool isHealthy = true;
butil::IOBuf iobuf;
int res = QueryChunkServer(chunkserverAddr, &iobuf);
statisticsMutest.lock();
if (res != 0) {
// 如果查询chunkserver失败,认为不在线,把它上面所有的
// copyset都添加到peerNotOnlineCopysets_里面
UpdatePeerNotOnlineCopysets(chunkserverAddr);
serviceExceptionChunkServers_.emplace(chunkserverAddr);
chunkserverCopysets_[chunkserverAddr] = {};
statisticsMutest.unlock();
return ChunkServerHealthStatus::kNotOnline;
}
// 存储每一个copyset的详细信息
Expand All @@ -176,6 +178,7 @@ ChunkServerHealthStatus CopysetCheckCore::CheckCopysetsOnChunkServer(
(!groupIds.empty() && copysetInfos.size() != groupIds.size())) {
std::cout << "Some copysets not found on chunkserver, may be tranfered"
<< std::endl;
statisticsMutest.unlock();
return ChunkServerHealthStatus::kNotHealthy;
}
// 存储需要发送消息的chunkserver的地址和对应的groupId
Expand Down Expand Up @@ -244,6 +247,8 @@ ChunkServerHealthStatus CopysetCheckCore::CheckCopysetsOnChunkServer(
isHealthy = false;
}
}
statisticsMutest.unlock();

// 遍历没有leader的copyset
bool health = CheckCopysetsNoLeader(chunkserverAddr,
noLeaderCopysetsPeers);
Expand All @@ -266,9 +271,9 @@ ChunkServerHealthStatus CopysetCheckCore::CheckCopysetsOnChunkServer(
}

bool CopysetCheckCore::CheckCopysetsNoLeader(const std::string& csAddr,
const std::map<std::string,
std::vector<std::string>>&
copysetsPeers) {
const std::map<std::string,
std::vector<std::string>>&
copysetsPeers) {
if (copysetsPeers.empty()) {
return true;
}
Expand All @@ -286,6 +291,7 @@ bool CopysetCheckCore::CheckCopysetsNoLeader(const std::string& csAddr,
for (const auto& item : result) {
// 如果在配置组中,检查是否是majority offline
if (item.second) {
statisticsMutest.lock();
isHealthy = false;
std::string groupId = item.first;
CheckResult checkRes = CheckPeerOnlineStatus(
Expand All @@ -296,6 +302,7 @@ bool CopysetCheckCore::CheckCopysetsNoLeader(const std::string& csAddr,
continue;
}
copysets_[kNoLeader].emplace(groupId);
statisticsMutest.unlock();
}
}
return isHealthy;
Expand Down Expand Up @@ -351,9 +358,35 @@ int CopysetCheckCore::CheckCopysetsOnServer(const std::string& serverIp,
return CheckCopysetsOnServer(0, serverIp, true, unhealthyChunkServers);
}

void CopysetCheckCore::ConcurrentCheckCopysetsOnServer(
const std::vector<ChunkServerInfo> &chunkservers,
uint32_t *index, bool queryLeader, bool *isHealthy,
std::vector<std::string>* unhealthyChunkServers) {
while (1) {
indexMutex.lock();
if (*index > chunkservers.size() - 1) {
indexMutex.unlock();
break;
}
auto info = chunkservers[*index];
(*index)++;
indexMutex.unlock();
std::string csAddr = info.hostip() + ":" + std::to_string(info.port());
ChunkServerHealthStatus res = CheckCopysetsOnChunkServer(csAddr,
{}, queryLeader);
if (res != ChunkServerHealthStatus::kHealthy) {
vectorMutex.lock();
*isHealthy = false;
if (unhealthyChunkServers) {
unhealthyChunkServers->emplace_back(csAddr);
}
vectorMutex.unlock();
}
}
}

int CopysetCheckCore::CheckCopysetsOnServer(const ServerIdType& serverId,
const std::string& serverIp,
bool queryLeader,
const std::string& serverIp, bool queryLeader,
std::vector<std::string>* unhealthyChunkServers) {
bool isHealthy = true;
// 向mds发送RPC
Expand All @@ -368,19 +401,20 @@ int CopysetCheckCore::CheckCopysetsOnServer(const ServerIdType& serverId,
std::cout << "ListChunkServersOnServer fail!" << std::endl;
return -1;
}
for (const auto& info : chunkservers) {
std::string ip = info.hostip();
uint64_t port = info.port();
std::string csAddr = ip + ":" + std::to_string(port);
ChunkServerHealthStatus res = CheckCopysetsOnChunkServer(csAddr,
{}, queryLeader);
if (res != ChunkServerHealthStatus::kHealthy) {
isHealthy = false;
if (unhealthyChunkServers) {
unhealthyChunkServers->emplace_back(csAddr);
}
}

std::vector<Thread> threadpool;
uint32_t index = 0;
for (int i = 0; i < FLAGS_rpcConcurrentNum; i++) {
threadpool.emplace_back(Thread(
&CopysetCheckCore::ConcurrentCheckCopysetsOnServer,
this, std::ref(chunkservers), &index,
queryLeader, &isHealthy,
unhealthyChunkServers));
}
for (auto &thread : threadpool) {
thread.join();
}

if (isHealthy) {
return 0;
} else {
Expand Down
21 changes: 21 additions & 0 deletions src/tools/copyset_check_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include "src/tools/metric_name.h"
#include "src/tools/curve_tool_define.h"
#include "include/chunkserver/chunkserver_common.h"
#include "src/common/concurrent/concurrent.h"

using curve::mds::topology::PoolIdType;
using curve::mds::topology::CopySetIdType;
Expand All @@ -54,6 +55,8 @@ using curve::mds::topology::ChunkServerStatus;
using curve::chunkserver::ToGroupId;
using curve::chunkserver::GetPoolID;
using curve::chunkserver::GetCopysetID;
using curve::common::Mutex;
using curve::common::Thread;

namespace curve {
namespace tool {
Expand Down Expand Up @@ -311,6 +314,19 @@ class CopysetCheckCore {
bool queryLeader = true,
std::vector<std::string>* unhealthyChunkServers = nullptr);

/**
* @brief concurrent check copyset on server
* @param[in] chunkservers: chunkservers on server
* @param[in] index: the deal index of chunkserver
* @param[in] queryLeader: whether send rpc to server which leader on
* @param[in] isHealthy: check result
* @param[in] unhealthyChunkServers: store the unhealthy chunkserver list
*/
void ConcurrentCheckCopysetsOnServer(
const std::vector<ChunkServerInfo> &chunkservers,
uint32_t *index, bool queryLeader, bool *isHealthy,
std::vector<std::string>* unhealthyChunkServers);

/**
* @brief 根据leader的map里面的copyset信息分析出copyset是否健康,健康返回0,否则
* 否则返回错误码
Expand Down Expand Up @@ -428,6 +444,11 @@ class CopysetCheckCore {
std::string copysetsDetail_;

const std::string kEmptyAddr = "0.0.0.0:0:0";

// mutex for concurrent rpc to chunkserver
Mutex indexMutex;
Mutex vectorMutex;
Mutex statisticsMutest;
};

} // namespace tool
Expand Down
1 change: 1 addition & 0 deletions src/tools/curve_tool_define.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ DEFINE_string(mdsDummyPort, "6667", "dummy port of mds, "
DEFINE_string(etcdAddr, "127.0.0.1:2379", "etcd addr");
DEFINE_uint64(rpcTimeout, 3000, "millisecond for rpc timeout");
DEFINE_uint64(rpcRetryTimes, 5, "rpc retry times");
DEFINE_uint64(rpcConcurrentNum, 10, "rpc concurrent number to chunkserver");
DEFINE_string(snapshotCloneAddr, "127.0.0.1:5555", "snapshot clone addr");
DEFINE_string(snapshotCloneDummyPort, "8081", "dummy port of snapshot clone, "
"can specify one or several. "
Expand Down
1 change: 1 addition & 0 deletions src/tools/curve_tool_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ DECLARE_string(mdsDummyPort);
DECLARE_string(etcdAddr);
DECLARE_uint64(rpcTimeout);
DECLARE_uint64(rpcRetryTimes);
DECLARE_uint64(rpcConcurrentNum);
DECLARE_string(snapshotCloneAddr);
DECLARE_string(snapshotCloneDummyPort);
DECLARE_uint64(chunkSize);
Expand Down
4 changes: 4 additions & 0 deletions src/tools/curve_tool_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ void UpdateFlagsFromConf(curve::common::Configuration* conf) {
if (GetCommandLineFlagInfo("rpcRetryTimes", &info) && info.is_default) {
conf->GetUInt64Value("rpcRetryTimes", &FLAGS_rpcRetryTimes);
}
if (GetCommandLineFlagInfo("rpcConcurrentNum", &info) &&
info.is_default) {
conf->GetUInt64Value("rpcConcurrentNum", &FLAGS_rpcConcurrentNum);
}
if (GetCommandLineFlagInfo("snapshotCloneAddr", &info) &&
info.is_default) {
conf->GetStringValue("snapshotCloneAddr", &FLAGS_snapshotCloneAddr);
Expand Down

0 comments on commit b784af2

Please sign in to comment.