Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tools: fixed remove broken copyset after remove peer #373

Merged
merged 1 commit into from
Jun 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions proto/copyset.proto
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ enum COPYSET_OP_STATUS {
COPYSET_OP_STATUS_EXIST = 1; // copyset node 已经存在
COPYSET_OP_STATUS_COPYSET_NOTEXIST = 2;
COPYSET_OP_STATUS_FAILURE_UNKNOWN = 3;
COPYSET_OP_STATUS_COPYSET_IS_HEALTHY = 4;
};

message CopysetResponse {
Expand Down Expand Up @@ -95,5 +96,7 @@ service CopysetService {

rpc CreateCopysetNode2 (CopysetRequest2) returns (CopysetResponse2);

rpc DeleteBrokenCopyset(CopysetRequest) returns (CopysetResponse);

rpc GetCopysetStatus (CopysetStatusRequest) returns (CopysetStatusResponse);
};
24 changes: 24 additions & 0 deletions src/chunkserver/copyset_node_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,30 @@ bool CopysetNodeManager::PurgeCopysetNodeData(const LogicPoolID &logicPoolId,
return ret;
}

bool CopysetNodeManager::DeleteBrokenCopyset(const LogicPoolID& poolId,
const CopysetID& copysetId) {
auto groupId = ToGroupId(poolId, copysetId);
// if copyset node exist in the manager means its data is complete
if (copysetNodeMap_.find(groupId) != copysetNodeMap_.end()) {
return false;
}

std::string copysetsDir;
auto trash = copysetNodeOptions_.trash;
auto chunkDataUri = copysetNodeOptions_.chunkDataUri;
auto protocol = UriParser::ParseUri(chunkDataUri, &copysetsDir);
if (protocol.empty()) {
LOG(ERROR) << "Not support chunk data uri's protocol: " << chunkDataUri;
return false;
} else if (0 != trash->RecycleCopySet(copysetsDir + "/" + groupId)) {
LOG(ERROR) << "Failed to recycle broken copyset "
<< ToGroupIdString(poolId, copysetId);
return false;
}

return true;
}

bool CopysetNodeManager::IsExist(const LogicPoolID &logicPoolId,
const CopysetID &copysetId) {
/* 加读锁 */
Expand Down
9 changes: 9 additions & 0 deletions src/chunkserver/copyset_node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,15 @@ class CopysetNodeManager : public curve::common::Uncopyable {
bool PurgeCopysetNodeData(const LogicPoolID &logicPoolId,
const CopysetID &copysetId);

/**
* @brief Delete broken copyset
* @param[in] poolId logical pool id
* @param[in] copysetId copyset id
* @return true if delete success, else return false
*/
bool DeleteBrokenCopyset(const LogicPoolID& poolId,
const CopysetID& copysetId);

/**
* 判断指定的copyset是否存在
* @param logicPoolId:逻辑池子id
Expand Down
25 changes: 25 additions & 0 deletions src/chunkserver/copyset_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,31 @@ void CopysetServiceImpl::CreateCopysetNode2(RpcController *controller,
LOG(INFO) << "Create " << request->copysets().size() << " copysets success";
}

void CopysetServiceImpl::DeleteBrokenCopyset(RpcController* controller,
const CopysetRequest* request,
CopysetResponse* response,
Closure* done) {
LOG(INFO) << "Receive delete broken copyset request";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add groupId in this LOG ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add groupId in this LOG ?

The below logic has guranteed the group id will be print.


brpc::ClosureGuard doneGuard(done);

auto poolId = request->logicpoolid();
auto copysetId = request->copysetid();
auto groupId = ToGroupIdString(poolId, copysetId);

// if copyset node exist in the manager means its data is complete
if (copysetNodeManager_->IsExist(poolId, copysetId)) {
response->set_status(COPYSET_OP_STATUS_COPYSET_IS_HEALTHY);
LOG(WARNING) << "Delete broken copyset, " << groupId << " is healthy";
} else if (!copysetNodeManager_->DeleteBrokenCopyset(poolId, copysetId)) {
response->set_status(COPYSET_OP_STATUS_FAILURE_UNKNOWN);
LOG(ERROR) << "Delete broken copyset " << groupId << " failed";
} else {
response->set_status(COPYSET_OP_STATUS_SUCCESS);
LOG(INFO) << "Delete broken copyset " << groupId << " success";
}
}

void CopysetServiceImpl::GetCopysetStatus(RpcController *controller,
const CopysetStatusRequest *request,
CopysetStatusResponse *response,
Expand Down
8 changes: 8 additions & 0 deletions src/chunkserver/copyset_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ class CopysetServiceImpl : public CopysetService {
CopysetResponse2 *response,
Closure *done);

/**
* @brief Delete broken copyset
*/
void DeleteBrokenCopyset(RpcController* controller,
const CopysetRequest* request,
CopysetResponse* response,
Closure* done);

void GetCopysetStatus(RpcController *controller,
const CopysetStatusRequest *request,
CopysetStatusResponse *response,
Expand Down
93 changes: 67 additions & 26 deletions src/tools/curve_cli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ DEFINE_string(peer,
"", "Id of the operating peer");
DEFINE_string(new_conf,
"", "new conf to reset peer");
DEFINE_bool(remove_copyset, false, "Whether need to remove broken copyset "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why default is false? if the broken copyset data keep there, there will some problem

Copy link
Contributor Author

@Wine93 Wine93 Jun 3, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why default is false? if the broken copyset data keep there, there will some problem

Actually, it's a danger action to delete copyset even if it is a broken copyset, we want to let user know what he is doning now, instead of delete it silently.

BTW: The copyset contain multi contents: chunk data, raft { log, meta, snapshot }.

"after remove peer (default: false)");

DEFINE_bool(affirm, true,
"If true, command line interactive affirmation is required."
Expand Down Expand Up @@ -64,43 +66,82 @@ int CurveCli::Init() {
return mdsClient_->Init(FLAGS_mdsAddr);
}

butil::Status CurveCli::DeleteBrokenCopyset(braft::PeerId peerId,
const LogicPoolID& poolId,
const CopysetID& copysetId) {
brpc::Channel channel;
brpc::Controller cntl;
CopysetRequest request;
CopysetResponse response;

cntl.set_timeout_ms(FLAGS_timeout_ms);
cntl.set_max_retry(FLAGS_max_retry);
request.set_logicpoolid(poolId);
request.set_copysetid(copysetId);

if (channel.Init(peerId.addr, NULL) != 0) {
return butil::Status(-1, "Fail to init channel to %s",
peerId.to_string().c_str());
}

CopysetService_Stub stub(&channel);
stub.DeleteBrokenCopyset(&cntl, &request, &response, NULL);

if (cntl.Failed()) {
return butil::Status(cntl.ErrorCode(), cntl.ErrorText());
} else if (response.status() != COPYSET_OP_STATUS_SUCCESS) {
return butil::Status(-1, COPYSET_OP_STATUS_Name(response.status()));
}

return butil::Status::OK();
}

int CurveCli::RemovePeer() {
CHECK_FLAG(conf);
CHECK_FLAG(peer);

braft::Configuration conf;
braft::PeerId peerId;
curve::common::Peer peer;
braft::cli::CliOptions opt;

auto poolId = FLAGS_logicalPoolId;
auto copysetId = FLAGS_copysetId;
opt.timeout_ms = FLAGS_timeout_ms;
opt.max_retry = FLAGS_max_retry;

if (conf.parse_from(FLAGS_conf) != 0) {
std::cout << "Fail to parse --conf" << std::endl;
return -1;
}
braft::PeerId removingPeerId;
if (removingPeerId.parse(FLAGS_peer) != 0) {
} else if (peerId.parse(FLAGS_peer) != 0) {
std::cout << "Fail to parse --peer" << std::endl;
return -1;
} else {
peer.set_address(peerId.to_string());
}
curve::common::Peer removingPeer;
removingPeer.set_address(removingPeerId.to_string());
braft::cli::CliOptions opt;
opt.timeout_ms = FLAGS_timeout_ms;
opt.max_retry = FLAGS_max_retry;
butil::Status st = curve::chunkserver::RemovePeer(
FLAGS_logicalPoolId,
FLAGS_copysetId,
conf,
removingPeer,
opt);
if (!st.ok()) {
std::cout << "Remove peer " << removingPeerId << " from copyset "
<< "(" << FLAGS_logicalPoolId << ", "
<< FLAGS_copysetId << ")"
<< " fail, original conf: " << conf
<< ", detail: " << st << std::endl;
return -1;

// STEP 1: remove peer
butil::Status status = curve::chunkserver::RemovePeer(
poolId, copysetId, conf, peer, opt);
auto succ = status.ok();
std::cout << "Remove peer " << peerId << " for copyset("
<< poolId << ", " << copysetId << ") "
<< (succ ? "success" : "fail") << ", original conf: " << conf
<< ", status: " << status << std::endl;

if (!succ || !FLAGS_remove_copyset) {
return succ ? 0 : -1;
}
std::cout << "Remove peer " << removingPeerId << " from copyset "
<< "(" << FLAGS_logicalPoolId << ", " << FLAGS_copysetId << ")"
<< " success, original conf: " << conf << std::endl;
return 0;

// STEP 2: delete broken copyset
status = DeleteBrokenCopyset(peerId, poolId, copysetId);
succ = status.ok();
std::cout << "Delete copyset(" << poolId << ", " << copysetId << ")"
<< " in " << peerId << (succ ? "success" : "fail")
<< ", original conf: " << conf
<< ", status: " << status << std::endl;

return succ ? 0 : -1;
}

int CurveCli::TransferLeader() {
Expand Down Expand Up @@ -279,7 +320,7 @@ void CurveCli::PrintHelp(const std::string &cmd) {
"-new_conf=127.0.0.1:8080:0 -max_retry=3 -timeout_ms=100" << std::endl; // NOLINT
} else if (cmd == kRemovePeerCmd || cmd == kTransferLeaderCmd) {
std::cout << "curve_ops_tool " << cmd << " -logicalPoolId=1 -copysetId=10001 -peer=127.0.0.1:8080:0 " // NOLINT
"-conf=127.0.0.1:8080:0,127.0.0.1:8081:0,127.0.0.1:8082:0 -max_retry=3 -timeout_ms=100" << std::endl; // NOLINT
"-conf=127.0.0.1:8080:0,127.0.0.1:8081:0,127.0.0.1:8082:0 -max_retry=3 -timeout_ms=100 -remove_copyset=true/false" << std::endl; // NOLINT
} else if (cmd == kDoSnapshot) {
std::cout << "curve_ops_tool " << cmd << " -logicalPoolId=1 -copysetId=10001 -peer=127.0.0.1:8080:0 " // NOLINT
"-max_retry=3 -timeout_ms=100" << std::endl;
Expand Down
22 changes: 22 additions & 0 deletions src/tools/curve_cli.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,25 @@
#include <iostream>
#include <memory>

#include "include/chunkserver/chunkserver_common.h"
#include "src/chunkserver/copyset_node.h"
#include "src/chunkserver/cli2.h"
#include "src/tools/curve_tool.h"
#include "src/tools/curve_tool_define.h"
#include "src/tools/mds_client.h"
#include "proto/copyset.pb.h"

namespace curve {
namespace tool {

using ::curve::chunkserver::LogicPoolID;
using ::curve::chunkserver::CopysetID;
using ::curve::chunkserver::CopysetRequest;
using ::curve::chunkserver::CopysetResponse;
using ::curve::chunkserver::CopysetService_Stub;
using ::curve::chunkserver::COPYSET_OP_STATUS::COPYSET_OP_STATUS_SUCCESS;
using ::curve::chunkserver::COPYSET_OP_STATUS::COPYSET_OP_STATUS_FAILURE_UNKNOWN; // NOLINT

class CurveCli : public CurveTool {
public:
explicit CurveCli(std::shared_ptr<MDSClient> mdsClient) :
Expand Down Expand Up @@ -74,6 +85,17 @@ class CurveCli : public CurveTool {
static bool SupportCommand(const std::string& command);

private:
/**
* @brief Delete broken copyset
* @param[in] peerId chunkserver peer (ip, port)
* @param[in] poolId logical pool id
* @param[in] copysetId copyset id
* @return butil::Status (code, err_msg)
*/
butil::Status DeleteBrokenCopyset(braft::PeerId peerId,
const LogicPoolID& poolId,
const CopysetID& copysetId);

/**
* @brief 删除peer
* @param 无
Expand Down
49 changes: 48 additions & 1 deletion test/chunkserver/copyset_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include <cstdint>

#include "src/chunkserver/trash.h"
#include "src/chunkserver/copyset_node.h"
#include "src/chunkserver/copyset_node_manager.h"
#include "src/chunkserver/cli.h"
Expand Down Expand Up @@ -57,11 +58,20 @@ class CopysetServiceTest : public testing::Test {
public:
void SetUp() {
testDir = "CopysetServiceTestData";
rmCmd = "rm -rf CopysetServiceTestData";
rmCmd = "rm -rf CopysetServiceTestData trash";
copysetDir = "local://./CopysetServiceTestData";
copysetDirPattern = "local://./CopysetServiceTestData/%d";
Exec(rmCmd.c_str());

// prepare trash
TrashOptions opt;
opt.trashPath = "local://./trash";
opt.localFileSystem =
LocalFsFactory::CreateFs(FileSystemType::EXT4, "");
trash_ = std::make_shared<Trash>();
trash_->Init(opt);
}

void TearDown() {
Exec(rmCmd.c_str());
}
Expand All @@ -71,6 +81,7 @@ class CopysetServiceTest : public testing::Test {
std::string rmCmd;
std::string copysetDir;
std::string copysetDirPattern;
std::shared_ptr<Trash> trash_;
};

butil::AtExitManager atExitManager;
Expand Down Expand Up @@ -106,6 +117,7 @@ TEST_F(CopysetServiceTest, basic) {
copysetNodeOptions.localFileSystem = fs;
copysetNodeOptions.chunkFilePool =
std::make_shared<FilePool>(fs);
copysetNodeOptions.trash = trash_;
ASSERT_EQ(0, copysetNodeManager->Init(copysetNodeOptions));
ASSERT_EQ(0, copysetNodeManager->Run());

Expand Down Expand Up @@ -175,6 +187,41 @@ TEST_F(CopysetServiceTest, basic) {
ASSERT_EQ(cntl.ErrorCode(), EINVAL);
}

// TEST CASES: remove copyset node
{
brpc::Controller cntl;
CopysetRequest request;
CopysetResponse response;
CopysetStatusRequest statusReq;
CopysetStatusResponse statusResp;
cntl.set_timeout_ms(3000);

// CASE 1: copyset is healthy
request.set_logicpoolid(logicPoolId);
request.set_copysetid(copysetId);
stub.DeleteBrokenCopyset(&cntl, &request, &response, nullptr);
ASSERT_FALSE(cntl.Failed());
ASSERT_EQ(response.status(), COPYSET_OP_STATUS_COPYSET_IS_HEALTHY);

// CASE 2: copyset is not exist -> delete failed
cntl.Reset();
request.set_logicpoolid(logicPoolId);
request.set_copysetid(copysetId + 1);
stub.DeleteBrokenCopyset(&cntl, &request, &response, nullptr);
ASSERT_FALSE(cntl.Failed());
ASSERT_EQ(response.status(), COPYSET_OP_STATUS_FAILURE_UNKNOWN);

// CASE 3: delete broken copyset success
ASSERT_TRUE(copysetNodeManager->
DeleteCopysetNode(logicPoolId, copysetId));
cntl.Reset();
request.set_logicpoolid(logicPoolId);
request.set_copysetid(copysetId);
stub.DeleteBrokenCopyset(&cntl, &request, &response, nullptr);
ASSERT_FALSE(cntl.Failed());
ASSERT_EQ(response.status(), COPYSET_OP_STATUS_SUCCESS);
}

ASSERT_EQ(0, server.Stop(0));
ASSERT_EQ(0, server.Join());
}
Expand Down
Loading