Skip to content

Commit

Permalink
[feat]curvefs/client: warmup manager
Browse files Browse the repository at this point in the history
1. add WarmupManager
2. add WarmupManagerS3Impl
3. add query warmup progress in tools-v2

Signed-off-by: Cyber-SiKu <Cyber-SiKu@outlook.com>
  • Loading branch information
Cyber-SiKu authored and wuhongsong committed Feb 20, 2023
1 parent fbc32f4 commit e03e921
Show file tree
Hide file tree
Showing 27 changed files with 2,135 additions and 874 deletions.
2 changes: 2 additions & 0 deletions curvefs/src/client/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ cc_library(
"s3/*.h",
"volume/*.cpp",
"volume/*.h",
"warmup/*.h",
"warmup/*.cpp",
],
exclude = ["main.cpp"],
),
Expand Down
4 changes: 4 additions & 0 deletions curvefs/src/client/common/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ std::ostream &operator<<(std::ostream &os, MetaServerOpType optype) {
}

const char kCurveFsWarmupOpAdd[] = "add";
const char kCurveFsWarmupOpQuery[] = "query";
const char kCurveFsWarmupTypeList[] = "list";
const char kCurveFsWarmupTypeSingle[] = "single";

Expand All @@ -87,6 +88,9 @@ WarmupOpType GetWarmupOpType(const std::string& op) {
if (op == kCurveFsWarmupOpAdd) {
ret = WarmupOpType::kWarmupOpAdd;
}
if (op == kCurveFsWarmupOpQuery) {
ret = WarmupOpType::kWarmupOpQuery;
}
return ret;
}

Expand Down
1 change: 1 addition & 0 deletions curvefs/src/client/common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ const char kCurveFsWarmupXAttr[] = "curvefs.warmup.op";
enum class WarmupOpType {
kWarmupOpUnknown = 0,
kWarmupOpAdd = 1,
kWarmupOpQuery = 2,
};

WarmupOpType GetWarmupOpType(const std::string& op);
Expand Down
82 changes: 59 additions & 23 deletions curvefs/src/client/curve_fuse_op.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "curvefs/src/client/metric/client_metric.h"
#include "curvefs/src/common/metric_utils.h"
#include "curvefs/src/common/dynamic_vlog.h"
#include "curvefs/src/client/warmup/warmup_manager.h"

using ::curve::common::Configuration;
using ::curvefs::client::CURVEFS_ERROR;
Expand Down Expand Up @@ -279,25 +280,47 @@ void FuseOpGetAttr(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi) {
fuse_reply_attr(req, &attr, g_fuseClientOption->attrTimeOut);
}

int AddWarmupTask(const std::string& type, const std::string& path) {
int AddWarmupTask(curvefs::client::common::WarmupType type, fuse_ino_t key,
const std::string &path) {
int ret = 0;
switch (curvefs::client::common::GetWarmupType(type)) {
case curvefs::client::common::WarmupType::kWarmupTypeList:
g_ClientInstance->PutWarmTask(path);
break;
case curvefs::client::common::WarmupType::kWarmupTypeSingle:
g_ClientInstance->FetchDentryEnqueue(path);
break;
default:
// not support add warmup type (warmup single file/dir or filelist)
LOG(ERROR) << "not support warmup type, only support single/list";
ret = ERANGE;
bool result = true;
switch (type) {
case curvefs::client::common::WarmupType::kWarmupTypeList:
result = g_ClientInstance->PutWarmFilelistTask(key);
break;
case curvefs::client::common::WarmupType::kWarmupTypeSingle:
result = g_ClientInstance->PutWarmFileTask(key, path);
break;
default:
// not support add warmup type (warmup single file/dir or filelist)
LOG(ERROR) << "not support warmup type, only support single/list";
ret = EOPNOTSUPP;
}
if (!result) {
ret = ERANGE;
}
return ret;
}

int Warmup(const std::string& name, const std::string& value) {
void QueryWarmupTask(fuse_ino_t key, std::string *data) {
curvefs::client::warmup::WarmupProgress progress;
bool ret = g_ClientInstance->GetWarmupProgress(key, &progress);
if (!ret) {
*data = "finished";
} else {
*data = std::to_string(progress.GetFinished()) + "/" +
std::to_string(progress.GetTotal());
}
VLOG(9) << "Warmup [" << key << "]" << *data;
}

int Warmup(fuse_ino_t key, const std::string& name, const std::string& value) {
// warmup
if (g_ClientInstance->GetFsInfo()->fstype() != FSType::TYPE_S3) {
LOG(ERROR) << "warmup only support s3";
return EOPNOTSUPP;
}

std::vector<std::string> opTypePath;
curve::common::SplitString(value, "\n", &opTypePath);
if (opTypePath.size() != 3) {
Expand All @@ -306,15 +329,17 @@ int Warmup(const std::string& name, const std::string& value) {
}
int ret = 0;
switch (curvefs::client::common::GetWarmupOpType(opTypePath[0])) {
case curvefs::client::common::WarmupOpType::kWarmupOpAdd:
ret = AddWarmupTask(opTypePath[1], opTypePath[2]);
if (ret != 0) {
LOG(ERROR) << name << " has invalid xattr value " << value;
}
break;
default:
case curvefs::client::common::WarmupOpType::kWarmupOpAdd:
ret =
AddWarmupTask(curvefs::client::common::GetWarmupType(opTypePath[1]),
key, opTypePath[2]);
if (ret != 0) {
LOG(ERROR) << name << " has invalid xattr value " << value;
ret = ERANGE;
}
break;
default:
LOG(ERROR) << name << " has invalid xattr value " << value;
ret = ERANGE;
}
return ret;
}
Expand All @@ -327,7 +352,7 @@ void FuseOpSetXattr(fuse_req_t req, fuse_ino_t ino, const char* name,
<< " flags " << flags;
if (strcmp(name, curvefs::client::common::kCurveFsWarmupXAttr) == 0) {
// warmup
int code = Warmup(name, xattrValue);
int code = Warmup(ino, name, xattrValue);
fuse_reply_err(req, code);
} else {
// set xattr
Expand All @@ -340,7 +365,18 @@ void FuseOpSetXattr(fuse_req_t req, fuse_ino_t ino, const char* name,
}

void FuseOpGetXattr(fuse_req_t req, fuse_ino_t ino, const char *name,
size_t size) {
size_t size) {
if (strcmp(name, curvefs::client::common::kCurveFsWarmupXAttr) == 0) {
// warmup
std::string data;
QueryWarmupTask(ino, &data);
if (size == 0) {
fuse_reply_xattr(req, data.length());
} else {
fuse_reply_buf(req, data.data(), data.length());
}
return;
}
InflightGuard guard(&g_clientOpMetric->opGetXattr.inflightOpNum);
LatencyUpdater updater(&g_clientOpMetric->opGetXattr.latency);
std::string buf;
Expand Down

0 comments on commit e03e921

Please sign in to comment.