Skip to content

Commit

Permalink
optimize copyset creation
Browse files Browse the repository at this point in the history
  • Loading branch information
cw123 committed Apr 2, 2022
1 parent 0f1a454 commit cd2c8c9
Show file tree
Hide file tree
Showing 16 changed files with 735 additions and 137 deletions.
12 changes: 8 additions & 4 deletions curvefs/conf/mds.conf
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,18 @@ leader.electionTimeoutMs=0
mds.topology.TopologyUpdateToRepoSec=60
# the policy of choose pool 0:Random, 1:Weight
mds.topology.ChoosePoolPolicy=0
# partition number in each copyset 2^8
mds.topology.PartitionNumberInCopyset=256
# max partition number in copyset 2^8
mds.topology.MaxPartitionNumberInCopyset=256
# id number in each partition 2^24 [0, 2^24-1]
mds.topology.IdNumberInPartition=16777216
# create copyset number at a time
mds.topology.CreateCopysetNumber=10
# initial copyset number in cluster
mds.topology.InitialCopysetNumber=10
# min avaiable copyset num in cluster
mds.topology.MinAvailableCopysetNum=10
# default create partition number 3
mds.topology.CreatePartitionNumber=3
# max copyset num in metaserver
mds.topology.MaxCopysetNumInMetaserver=100
# Topology update metric interval
mds.topology.UpdateMetricIntervalSec=60

Expand Down
1 change: 1 addition & 0 deletions curvefs/proto/topology.proto
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ enum TopoStatusCode {
TOPO_PARTITION_NOT_FOUND = 20;
TOPO_CREATE_PARTITION_FAIL = 21;
TOPO_METASERVER_EXIST = 22;
TOPO_GET_AVAILABLE_COPYSET_ERROR = 23;
}

enum OnlineState {
Expand Down
12 changes: 8 additions & 4 deletions curvefs/src/mds/mds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,20 @@ void MDS::InitMetaServerOption(MetaserverOptions* metaserverOption) {
void MDS::InitTopologyOption(TopologyOption* topologyOption) {
conf_->GetValueFatalIfFail("mds.topology.TopologyUpdateToRepoSec",
&topologyOption->topologyUpdateToRepoSec);
conf_->GetValueFatalIfFail("mds.topology.PartitionNumberInCopyset",
&topologyOption->partitionNumberInCopyset);
conf_->GetValueFatalIfFail("mds.topology.MaxPartitionNumberInCopyset",
&topologyOption->maxPartitionNumberInCopyset);
conf_->GetValueFatalIfFail("mds.topology.IdNumberInPartition",
&topologyOption->idNumberInPartition);
conf_->GetValueFatalIfFail("mds.topology.ChoosePoolPolicy",
&topologyOption->choosePoolPolicy);
conf_->GetValueFatalIfFail("mds.topology.CreateCopysetNumber",
&topologyOption->createCopysetNumber);
conf_->GetValueFatalIfFail("mds.topology.InitialCopysetNumber",
&topologyOption->initialCopysetNumber);
conf_->GetValueFatalIfFail("mds.topology.MinAvailableCopysetNum",
&topologyOption->minAvailableCopysetNum);
conf_->GetValueFatalIfFail("mds.topology.CreatePartitionNumber",
&topologyOption->createPartitionNumber);
conf_->GetValueFatalIfFail("mds.topology.MaxCopysetNumInMetaserver",
&topologyOption->maxCopysetNumInMetaserver);
conf_->GetValueFatalIfFail("mds.topology.UpdateMetricIntervalSec",
&topologyOption->UpdateMetricIntervalSec);
}
Expand Down
17 changes: 7 additions & 10 deletions curvefs/src/mds/metaserverclient/metaserver_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,18 +322,15 @@ FSStatusCode MetaserverClient::DeletePartition(
}
}

FSStatusCode MetaserverClient::CreateCopySet(
uint32_t poolId, std::set<uint32_t> copysetIds,
const std::set<std::string> &addrs) {
FSStatusCode MetaserverClient::CreateCopySet(uint32_t poolId,
uint32_t copysetId, const std::set<std::string> &addrs) {
CreateCopysetRequest request;
CreateCopysetResponse response;
for (auto id : copysetIds) {
auto copyset = request.add_copysets();
copyset->set_poolid(poolId);
copyset->set_copysetid(id);
for (auto item : addrs) {
copyset->add_peers()->set_address(BuildPeerIdWithAddr(item));
}
auto copyset = request.add_copysets();
copyset->set_poolid(poolId);
copyset->set_copysetid(copysetId);
for (auto item : addrs) {
copyset->add_peers()->set_address(BuildPeerIdWithAddr(item));
}

for (const std::string &item : addrs) {
Expand Down
2 changes: 1 addition & 1 deletion curvefs/src/mds/metaserverclient/metaserver_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class MetaserverClient {
const std::set<std::string> &addrs);

virtual FSStatusCode CreateCopySet(uint32_t poolId,
std::set<uint32_t> copysetIds,
uint32_t copysetId,
const std::set<std::string> &addrs);

virtual FSStatusCode CreateCopySetOnOneMetaserver(uint32_t poolId,
Expand Down
223 changes: 200 additions & 23 deletions curvefs/src/mds/topology/topology.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -661,34 +661,59 @@ bool TopologyImpl::GetCopysetOfPartition(PartitionIdType id,
return false;
}

bool TopologyImpl::GetAvailableCopyset(CopySetInfo *out) const {
std::list<CopySetKey> TopologyImpl::GetAvailableCopysetList() const {
ReadLockGuard rlockCopySet(copySetMutex_);
uint64_t maxPartitionNum = option_.partitionNumberInCopyset;
CopySetKey key;
uint64_t minPartitionNum = maxPartitionNum;

std::list<CopySetKey> result;
for (auto const &it : copySetMap_) {
uint64_t partitionNum = it.second.GetPartitionNum();
if (partitionNum < maxPartitionNum) {
if (partitionNum < minPartitionNum) {
minPartitionNum = partitionNum;
key = it.first;
}
if (it.second.GetPartitionNum()
>= option_.maxPartitionNumberInCopyset) {
continue;
}
result.push_back(it.first);
}

if (minPartitionNum == maxPartitionNum) {
return result;
}

// choose random
int TopologyImpl::GetOneRandomNumber(int start, int end) const {
static std::random_device rd;
static std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(start, end);
return dis(gen);
}

bool TopologyImpl::GetAvailableCopyset(CopySetInfo *out) const {
std::list<CopySetKey> copysetList = GetAvailableCopysetList();
if (copysetList.size() == 0) {
return false;
}

auto it = copySetMap_.find(key);
// random select one copyset
int randomValue = GetOneRandomNumber(0, copysetList.size() - 1);
auto iter = copysetList.begin();
std::advance(iter, randomValue);
auto it = copySetMap_.find(*iter);
if (it != copySetMap_.end()) {
*out = it->second;
return true;
}
return false;
}

int TopologyImpl::GetAvailableCopysetNum() const {
ReadLockGuard rlockCopySet(copySetMutex_);
int num = 0;
for (auto const &it : copySetMap_) {
if (it.second.GetPartitionNum()
>= option_.maxPartitionNumberInCopyset) {
continue;
}
num++;
}
return num;
}

std::list<Partition> TopologyImpl::GetPartitionOfFs(
FsIdType id, PartitionFilter filter) const {
std::list<Partition> ret;
Expand Down Expand Up @@ -1257,15 +1282,171 @@ TopoStatusCode TopologyImpl::ChooseNewMetaServerForCopyset(
return TopoStatusCode::TOPO_OK;
}

TopoStatusCode TopologyImpl::ChooseAvailableMetaServers(
std::set<MetaServerIdType> *metaServers, PoolIdType *poolId) {
ReadLockGuard rlockMetaserver(metaServerMutex_);
std::vector<const MetaServer *> vec;
uint32_t TopologyImpl::GetCopysetNumInMetaserver(MetaServerIdType id) const {
uint32_t num = 0;
for (const auto &it : copySetMap_) {
if (it.second.HasMember(id)) {
num++;
}
}

return num;
}

void TopologyImpl::GetAvailableMetaserversInPoolUnlock(
std::vector<const MetaServer *>* vec) {
for (const auto &it : metaServerMap_) {
if (it.second.GetOnlineState() == OnlineState::ONLINE) {
vec.emplace_back(&(it.second));
if (it.second.GetOnlineState() == OnlineState::ONLINE
&& it.second.GetMetaServerSpace().IsMetaserverResourceAvailable()
&& GetCopysetNumInMetaserver(it.first)
< option_.maxCopysetNumInMetaserver) {
vec->emplace_back(&(it.second));
}
}
}

TopoStatusCode TopologyImpl::GenCandidateMapUnlock(
const std::map<PoolIdType, uint16_t> &replicaMap,
std::map<PoolIdType, std::map<ZoneIdType, std::list<MetaServerIdType>>>
* candidateMap) {
// 1. get all online and available metaserver
std::vector<const MetaServer *> vec;
GetAvailableMetaserversInPoolUnlock(&vec);

// 2. genarate candidateMap
for (const auto &it : vec) {
Server server;
ServerIdType serverId = it->GetServerId();
if (!GetServer(serverId, &server)) {
LOG(ERROR) << "get server failed when choose metaservers,"
<< " the serverId = " << serverId;
return TopoStatusCode::TOPO_SERVER_NOT_FOUND;
}

PoolIdType poolId = server.GetPoolId();
ZoneIdType zoneId = server.GetZoneId();
auto iter = candidateMap->find(poolId);
if (iter == candidateMap->end()) {
std::map<ZoneIdType, std::list<MetaServerIdType>> tmpZoneMap;
iter = candidateMap->emplace(poolId, tmpZoneMap).first;
}

auto iter2 = iter->second.find(zoneId);
if (iter2 == iter->second.end()) {
std::list<MetaServerIdType> tempMetaseverList;
iter2 = iter->second.emplace(zoneId, tempMetaseverList).first;
}

iter2->second.push_back(it->GetId());
}

// 3. remove pools which available zone num < repalica num
for (auto it = candidateMap->begin(); it != candidateMap->end();) {
auto iter = replicaMap.find(it->first);
if (iter == replicaMap.end()) {
LOG(WARNING) << "poolId = " << it->first
<< " can not find in replicaMap";
it = candidateMap->erase(it);
continue;
}

uint16_t replicaNum = iter->second;
if (it->second.size() < replicaNum) {
LOG(WARNING) << "poolId = " << it->first << " need replica num = "
<< replicaNum << ", but only has available zone num = "
<< it->second.size();
it = candidateMap->erase(it);
continue;
}

LOG(INFO) << "find pool available, poolId = " << it->first
<< ", zone num = " << it->second.size()
<< ", replica num = " << replicaNum;
it++;
}

return TopoStatusCode::TOPO_OK;
}

TopoStatusCode TopologyImpl::GenInitialCopysetAddrBatch(uint32_t needCreateNum,
std::list<CopysetCreateInfo>* copysetList) {
ReadLockGuard rlockPool(poolMutex_);
ReadLockGuard rlockMetaserver(metaServerMutex_);
ReadLockGuard rlockCopyset(copySetMutex_);
LOG(INFO) << "GenInitialCopysetAddrBatch needCreateNum = "
<< needCreateNum << " begin";

// 1. genarate replicaMap
std::map<PoolIdType, uint16_t> replicaMap;
for (const auto &it : poolMap_) {
replicaMap.emplace(it.first, it.second.GetReplicaNum());
}

// 2. genarate candidateMap
std::map<PoolIdType, std::map<ZoneIdType, std::list<MetaServerIdType>>>
candidateMap;
auto ret = GenCandidateMapUnlock(replicaMap, &candidateMap);

// 3. generate need num copyset create info
if (candidateMap.size() == 0) {
LOG(WARNING) << "can not find available metaserver for copyset.";
return TopoStatusCode::TOPO_METASERVER_NOT_FOUND;
}

int createCount = 0;
while (createCount < needCreateNum) {
for (const auto &it : candidateMap) {
CopysetCreateInfo copysetInfo;
copysetInfo.poolId = it.first;
copysetInfo.copysetId = UNINITIALIZE_ID;
uint16_t replicaNum = replicaMap[copysetInfo.poolId];
copysetInfo.metaServerIds =
RandomGetMetaserverIds(it.second, replicaNum);
copysetList->push_back(copysetInfo);

createCount++;
if (needCreateNum == createCount) {
return TopoStatusCode::TOPO_OK;
}
}
}

LOG(ERROR) << "can not find available metaserver for copyset.";
return TopoStatusCode::TOPO_METASERVER_NOT_FOUND;
}

std::set<MetaServerIdType> TopologyImpl::RandomGetMetaserverIds(
const std::map<ZoneIdType, std::list<MetaServerIdType>>& zoneMap,
uint16_t num) {
std::vector<MetaServerIdType> metaseverList;
for (const auto &it : zoneMap) {
int randomValue = GetOneRandomNumber(0, it.second.size() - 1);
auto iter = it.second.begin();
std::advance(iter, randomValue);
metaseverList.push_back(*iter);
}

static std::random_device rd;
static std::mt19937 g(rd());
std::shuffle(metaseverList.begin(), metaseverList.end(), g);

std::set<MetaServerIdType> ids;
int count = 0;
for (MetaServerIdType it : metaseverList) {
ids.insert(it);
count++;
if (count == num) {
break;
}
}
return ids;
}

TopoStatusCode TopologyImpl::GenCopysetAddrByResourceUsage(
std::set<MetaServerIdType> *metaServers, PoolIdType *poolId) {
ReadLockGuard rlockMetaserver(metaServerMutex_);
std::vector<const MetaServer *> vec;
GetAvailableMetaserversInPoolUnlock(&vec);

// sort by resource usage
std::sort(vec.begin(), vec.end(),
Expand All @@ -1277,10 +1458,6 @@ TopoStatusCode TopologyImpl::ChooseAvailableMetaServers(
std::map<PoolIdType, std::map<ZoneIdType, MetaServer>> candidateMap;
std::map<PoolIdType, uint16_t> replicaMap;
for (const auto &it : vec) {
if (!it->GetMetaServerSpace().IsMetaserverResourceAvailable()) {
continue;
}

Server server;
ServerIdType serverId = it->GetServerId();
if (GetServer(serverId, &server)) {
Expand Down
Loading

0 comments on commit cd2c8c9

Please sign in to comment.