Skip to content

Commit

Permalink
tools: fixed remove copyset after remove peer
Browse files Browse the repository at this point in the history
  • Loading branch information
Wine93 authored and ilixiaocui committed Jun 4, 2021
1 parent 4fecf1b commit 184ba59
Show file tree
Hide file tree
Showing 10 changed files with 315 additions and 27 deletions.
3 changes: 3 additions & 0 deletions proto/copyset.proto
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ enum COPYSET_OP_STATUS {
COPYSET_OP_STATUS_EXIST = 1; // copyset node 已经存在
COPYSET_OP_STATUS_COPYSET_NOTEXIST = 2;
COPYSET_OP_STATUS_FAILURE_UNKNOWN = 3;
COPYSET_OP_STATUS_COPYSET_IS_HEALTHY = 4;
};

message CopysetResponse {
Expand Down Expand Up @@ -95,5 +96,7 @@ service CopysetService {

rpc CreateCopysetNode2 (CopysetRequest2) returns (CopysetResponse2);

rpc DeleteBrokenCopyset(CopysetRequest) returns (CopysetResponse);

rpc GetCopysetStatus (CopysetStatusRequest) returns (CopysetStatusResponse);
};
24 changes: 24 additions & 0 deletions src/chunkserver/copyset_node_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,30 @@ bool CopysetNodeManager::PurgeCopysetNodeData(const LogicPoolID &logicPoolId,
return ret;
}

bool CopysetNodeManager::DeleteBrokenCopyset(const LogicPoolID& poolId,
const CopysetID& copysetId) {
auto groupId = ToGroupId(poolId, copysetId);
// if copyset node exist in the manager means its data is complete
if (copysetNodeMap_.find(groupId) != copysetNodeMap_.end()) {
return false;
}

std::string copysetsDir;
auto trash = copysetNodeOptions_.trash;
auto chunkDataUri = copysetNodeOptions_.chunkDataUri;
auto protocol = UriParser::ParseUri(chunkDataUri, &copysetsDir);
if (protocol.empty()) {
LOG(ERROR) << "Not support chunk data uri's protocol: " << chunkDataUri;
return false;
} else if (0 != trash->RecycleCopySet(copysetsDir + "/" + groupId)) {
LOG(ERROR) << "Failed to recycle broken copyset "
<< ToGroupIdString(poolId, copysetId);
return false;
}

return true;
}

bool CopysetNodeManager::IsExist(const LogicPoolID &logicPoolId,
const CopysetID &copysetId) {
/* 加读锁 */
Expand Down
9 changes: 9 additions & 0 deletions src/chunkserver/copyset_node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,15 @@ class CopysetNodeManager : public curve::common::Uncopyable {
bool PurgeCopysetNodeData(const LogicPoolID &logicPoolId,
const CopysetID &copysetId);

/**
* @brief Delete broken copyset
* @param[in] poolId logical pool id
* @param[in] copysetId copyset id
* @return true if delete success, else return false
*/
bool DeleteBrokenCopyset(const LogicPoolID& poolId,
const CopysetID& copysetId);

/**
* 判断指定的copyset是否存在
* @param logicPoolId:逻辑池子id
Expand Down
25 changes: 25 additions & 0 deletions src/chunkserver/copyset_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,31 @@ void CopysetServiceImpl::CreateCopysetNode2(RpcController *controller,
LOG(INFO) << "Create " << request->copysets().size() << " copysets success";
}

void CopysetServiceImpl::DeleteBrokenCopyset(RpcController* controller,
const CopysetRequest* request,
CopysetResponse* response,
Closure* done) {
LOG(INFO) << "Receive delete broken copyset request";

brpc::ClosureGuard doneGuard(done);

auto poolId = request->logicpoolid();
auto copysetId = request->copysetid();
auto groupId = ToGroupIdString(poolId, copysetId);

// if copyset node exist in the manager means its data is complete
if (copysetNodeManager_->IsExist(poolId, copysetId)) {
response->set_status(COPYSET_OP_STATUS_COPYSET_IS_HEALTHY);
LOG(WARNING) << "Delete broken copyset, " << groupId << " is healthy";
} else if (!copysetNodeManager_->DeleteBrokenCopyset(poolId, copysetId)) {
response->set_status(COPYSET_OP_STATUS_FAILURE_UNKNOWN);
LOG(ERROR) << "Delete broken copyset " << groupId << " failed";
} else {
response->set_status(COPYSET_OP_STATUS_SUCCESS);
LOG(INFO) << "Delete broken copyset " << groupId << " success";
}
}

void CopysetServiceImpl::GetCopysetStatus(RpcController *controller,
const CopysetStatusRequest *request,
CopysetStatusResponse *response,
Expand Down
8 changes: 8 additions & 0 deletions src/chunkserver/copyset_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ class CopysetServiceImpl : public CopysetService {
CopysetResponse2 *response,
Closure *done);

/**
* @brief Delete broken copyset
*/
void DeleteBrokenCopyset(RpcController* controller,
const CopysetRequest* request,
CopysetResponse* response,
Closure* done);

void GetCopysetStatus(RpcController *controller,
const CopysetStatusRequest *request,
CopysetStatusResponse *response,
Expand Down
93 changes: 67 additions & 26 deletions src/tools/curve_cli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ DEFINE_string(peer,
"", "Id of the operating peer");
DEFINE_string(new_conf,
"", "new conf to reset peer");
DEFINE_bool(remove_copyset, false, "Whether need to remove broken copyset "
"after remove peer (default: false)");

DEFINE_bool(affirm, true,
"If true, command line interactive affirmation is required."
Expand Down Expand Up @@ -64,43 +66,82 @@ int CurveCli::Init() {
return mdsClient_->Init(FLAGS_mdsAddr);
}

butil::Status CurveCli::DeleteBrokenCopyset(braft::PeerId peerId,
const LogicPoolID& poolId,
const CopysetID& copysetId) {
brpc::Channel channel;
brpc::Controller cntl;
CopysetRequest request;
CopysetResponse response;

cntl.set_timeout_ms(FLAGS_timeout_ms);
cntl.set_max_retry(FLAGS_max_retry);
request.set_logicpoolid(poolId);
request.set_copysetid(copysetId);

if (channel.Init(peerId.addr, NULL) != 0) {
return butil::Status(-1, "Fail to init channel to %s",
peerId.to_string().c_str());
}

CopysetService_Stub stub(&channel);
stub.DeleteBrokenCopyset(&cntl, &request, &response, NULL);

if (cntl.Failed()) {
return butil::Status(cntl.ErrorCode(), cntl.ErrorText());
} else if (response.status() != COPYSET_OP_STATUS_SUCCESS) {
return butil::Status(-1, COPYSET_OP_STATUS_Name(response.status()));
}

return butil::Status::OK();
}

int CurveCli::RemovePeer() {
CHECK_FLAG(conf);
CHECK_FLAG(peer);

braft::Configuration conf;
braft::PeerId peerId;
curve::common::Peer peer;
braft::cli::CliOptions opt;

auto poolId = FLAGS_logicalPoolId;
auto copysetId = FLAGS_copysetId;
opt.timeout_ms = FLAGS_timeout_ms;
opt.max_retry = FLAGS_max_retry;

if (conf.parse_from(FLAGS_conf) != 0) {
std::cout << "Fail to parse --conf" << std::endl;
return -1;
}
braft::PeerId removingPeerId;
if (removingPeerId.parse(FLAGS_peer) != 0) {
} else if (peerId.parse(FLAGS_peer) != 0) {
std::cout << "Fail to parse --peer" << std::endl;
return -1;
} else {
peer.set_address(peerId.to_string());
}
curve::common::Peer removingPeer;
removingPeer.set_address(removingPeerId.to_string());
braft::cli::CliOptions opt;
opt.timeout_ms = FLAGS_timeout_ms;
opt.max_retry = FLAGS_max_retry;
butil::Status st = curve::chunkserver::RemovePeer(
FLAGS_logicalPoolId,
FLAGS_copysetId,
conf,
removingPeer,
opt);
if (!st.ok()) {
std::cout << "Remove peer " << removingPeerId << " from copyset "
<< "(" << FLAGS_logicalPoolId << ", "
<< FLAGS_copysetId << ")"
<< " fail, original conf: " << conf
<< ", detail: " << st << std::endl;
return -1;

// STEP 1: remove peer
butil::Status status = curve::chunkserver::RemovePeer(
poolId, copysetId, conf, peer, opt);
auto succ = status.ok();
std::cout << "Remove peer " << peerId << " for copyset("
<< poolId << ", " << copysetId << ") "
<< (succ ? "success" : "fail") << ", original conf: " << conf
<< ", status: " << status << std::endl;

if (!succ || !FLAGS_remove_copyset) {
return succ ? 0 : -1;
}
std::cout << "Remove peer " << removingPeerId << " from copyset "
<< "(" << FLAGS_logicalPoolId << ", " << FLAGS_copysetId << ")"
<< " success, original conf: " << conf << std::endl;
return 0;

// STEP 2: delete broken copyset
status = DeleteBrokenCopyset(peerId, poolId, copysetId);
succ = status.ok();
std::cout << "Delete copyset(" << poolId << ", " << copysetId << ")"
<< " in " << peerId << (succ ? "success" : "fail")
<< ", original conf: " << conf
<< ", status: " << status << std::endl;

return succ ? 0 : -1;
}

int CurveCli::TransferLeader() {
Expand Down Expand Up @@ -279,7 +320,7 @@ void CurveCli::PrintHelp(const std::string &cmd) {
"-new_conf=127.0.0.1:8080:0 -max_retry=3 -timeout_ms=100" << std::endl; // NOLINT
} else if (cmd == kRemovePeerCmd || cmd == kTransferLeaderCmd) {
std::cout << "curve_ops_tool " << cmd << " -logicalPoolId=1 -copysetId=10001 -peer=127.0.0.1:8080:0 " // NOLINT
"-conf=127.0.0.1:8080:0,127.0.0.1:8081:0,127.0.0.1:8082:0 -max_retry=3 -timeout_ms=100" << std::endl; // NOLINT
"-conf=127.0.0.1:8080:0,127.0.0.1:8081:0,127.0.0.1:8082:0 -max_retry=3 -timeout_ms=100 -remove_copyset=true/false" << std::endl; // NOLINT
} else if (cmd == kDoSnapshot) {
std::cout << "curve_ops_tool " << cmd << " -logicalPoolId=1 -copysetId=10001 -peer=127.0.0.1:8080:0 " // NOLINT
"-max_retry=3 -timeout_ms=100" << std::endl;
Expand Down
22 changes: 22 additions & 0 deletions src/tools/curve_cli.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,25 @@
#include <iostream>
#include <memory>

#include "include/chunkserver/chunkserver_common.h"
#include "src/chunkserver/copyset_node.h"
#include "src/chunkserver/cli2.h"
#include "src/tools/curve_tool.h"
#include "src/tools/curve_tool_define.h"
#include "src/tools/mds_client.h"
#include "proto/copyset.pb.h"

namespace curve {
namespace tool {

using ::curve::chunkserver::LogicPoolID;
using ::curve::chunkserver::CopysetID;
using ::curve::chunkserver::CopysetRequest;
using ::curve::chunkserver::CopysetResponse;
using ::curve::chunkserver::CopysetService_Stub;
using ::curve::chunkserver::COPYSET_OP_STATUS::COPYSET_OP_STATUS_SUCCESS;
using ::curve::chunkserver::COPYSET_OP_STATUS::COPYSET_OP_STATUS_FAILURE_UNKNOWN; // NOLINT

class CurveCli : public CurveTool {
public:
explicit CurveCli(std::shared_ptr<MDSClient> mdsClient) :
Expand Down Expand Up @@ -74,6 +85,17 @@ class CurveCli : public CurveTool {
static bool SupportCommand(const std::string& command);

private:
/**
* @brief Delete broken copyset
* @param[in] peerId chunkserver peer (ip, port)
* @param[in] poolId logical pool id
* @param[in] copysetId copyset id
* @return butil::Status (code, err_msg)
*/
butil::Status DeleteBrokenCopyset(braft::PeerId peerId,
const LogicPoolID& poolId,
const CopysetID& copysetId);

/**
* @brief 删除peer
* @param 无
Expand Down
49 changes: 48 additions & 1 deletion test/chunkserver/copyset_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include <cstdint>

#include "src/chunkserver/trash.h"
#include "src/chunkserver/copyset_node.h"
#include "src/chunkserver/copyset_node_manager.h"
#include "src/chunkserver/cli.h"
Expand Down Expand Up @@ -57,11 +58,20 @@ class CopysetServiceTest : public testing::Test {
public:
void SetUp() {
testDir = "CopysetServiceTestData";
rmCmd = "rm -rf CopysetServiceTestData";
rmCmd = "rm -rf CopysetServiceTestData trash";
copysetDir = "local://./CopysetServiceTestData";
copysetDirPattern = "local://./CopysetServiceTestData/%d";
Exec(rmCmd.c_str());

// prepare trash
TrashOptions opt;
opt.trashPath = "local://./trash";
opt.localFileSystem =
LocalFsFactory::CreateFs(FileSystemType::EXT4, "");
trash_ = std::make_shared<Trash>();
trash_->Init(opt);
}

void TearDown() {
Exec(rmCmd.c_str());
}
Expand All @@ -71,6 +81,7 @@ class CopysetServiceTest : public testing::Test {
std::string rmCmd;
std::string copysetDir;
std::string copysetDirPattern;
std::shared_ptr<Trash> trash_;
};

butil::AtExitManager atExitManager;
Expand Down Expand Up @@ -106,6 +117,7 @@ TEST_F(CopysetServiceTest, basic) {
copysetNodeOptions.localFileSystem = fs;
copysetNodeOptions.chunkFilePool =
std::make_shared<FilePool>(fs);
copysetNodeOptions.trash = trash_;
ASSERT_EQ(0, copysetNodeManager->Init(copysetNodeOptions));
ASSERT_EQ(0, copysetNodeManager->Run());

Expand Down Expand Up @@ -175,6 +187,41 @@ TEST_F(CopysetServiceTest, basic) {
ASSERT_EQ(cntl.ErrorCode(), EINVAL);
}

// TEST CASES: remove copyset node
{
brpc::Controller cntl;
CopysetRequest request;
CopysetResponse response;
CopysetStatusRequest statusReq;
CopysetStatusResponse statusResp;
cntl.set_timeout_ms(3000);

// CASE 1: copyset is healthy
request.set_logicpoolid(logicPoolId);
request.set_copysetid(copysetId);
stub.DeleteBrokenCopyset(&cntl, &request, &response, nullptr);
ASSERT_FALSE(cntl.Failed());
ASSERT_EQ(response.status(), COPYSET_OP_STATUS_COPYSET_IS_HEALTHY);

// CASE 2: copyset is not exist -> delete failed
cntl.Reset();
request.set_logicpoolid(logicPoolId);
request.set_copysetid(copysetId + 1);
stub.DeleteBrokenCopyset(&cntl, &request, &response, nullptr);
ASSERT_FALSE(cntl.Failed());
ASSERT_EQ(response.status(), COPYSET_OP_STATUS_FAILURE_UNKNOWN);

// CASE 3: delete broken copyset success
ASSERT_TRUE(copysetNodeManager->
DeleteCopysetNode(logicPoolId, copysetId));
cntl.Reset();
request.set_logicpoolid(logicPoolId);
request.set_copysetid(copysetId);
stub.DeleteBrokenCopyset(&cntl, &request, &response, nullptr);
ASSERT_FALSE(cntl.Failed());
ASSERT_EQ(response.status(), COPYSET_OP_STATUS_SUCCESS);
}

ASSERT_EQ(0, server.Stop(0));
ASSERT_EQ(0, server.Join());
}
Expand Down
Loading

0 comments on commit 184ba59

Please sign in to comment.