Skip to content

Commit

Permalink
[feat]curvefs/client: add check in cache
Browse files Browse the repository at this point in the history
1. check disk cache is work
2. check kv cache is work
3. add tools-v2

Signed-off-by: Cyber-SiKu <Cyber-SiKu@outlook.com>
  • Loading branch information
Cyber-SiKu committed Nov 10, 2023
1 parent 1867323 commit 424d309
Show file tree
Hide file tree
Showing 20 changed files with 648 additions and 113 deletions.
3 changes: 3 additions & 0 deletions curvefs/src/client/common/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ std::ostream &operator<<(std::ostream &os, MetaServerOpType optype) {

const char kCurveFsWarmupOpAdd[] = "add";
const char kCurveFsWarmupOpCancel[] = "cancel";
const char kCurveFsWarmupOpCheck[] = "check";
const char kCurveFsWarmupTypeList[] = "list";
const char kCurveFsWarmupTypeSingle[] = "single";

Expand All @@ -89,6 +90,8 @@ WarmupOpType GetWarmupOpType(const std::string& op) {
ret = WarmupOpType::kWarmupOpAdd;
} else if (op == kCurveFsWarmupOpCancel) {
ret = WarmupOpType::kWarmupOpCancel;
} else if (op == kCurveFsWarmupOpCheck) {
ret = WarmupOpType::kWarmupOpCheck;
}
return ret;
}
Expand Down
2 changes: 2 additions & 0 deletions curvefs/src/client/common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,13 @@ std::ostream &operator<<(std::ostream &os, MetaServerOpType optype);
constexpr size_t kMinWarmupOpArgsNum = 1;
constexpr size_t kWarmupAddArgsNum = 6;
constexpr size_t kWarmupCancelArgsNum = 2;
constexpr size_t kWarmupCheckArgsNum = 6;

enum class WarmupOpType {
kWarmupOpUnknown = 0,
kWarmupOpAdd = 1,
kWarmupOpCancel = 2,
kWarmupOpCheck = 3,
};

WarmupOpType GetWarmupOpType(const std::string& op);
Expand Down
51 changes: 44 additions & 7 deletions curvefs/src/client/curve_fuse_op.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@

#include "curvefs/src/client/curve_fuse_op.h"

#include <fmt/format.h>

#include <cstring>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
Expand Down Expand Up @@ -65,6 +68,7 @@ using ::curvefs::client::common::WarmupStorageType;
using ::curvefs::client::filesystem::AttrOut;
using ::curvefs::client::filesystem::EntryOut;
using ::curvefs::client::filesystem::FileOut;
using ::curvefs::client::filesystem::IsCheckXAttr;
using ::curvefs::client::filesystem::IsListWarmupXAttr;
using ::curvefs::client::filesystem::IsWarmupXAttr;
using ::curvefs::client::filesystem::StrAttr;
Expand Down Expand Up @@ -235,16 +239,18 @@ void UnInitFuseClient() {
int AddWarmupTask(curvefs::client::common::WarmupType type, fuse_ino_t key,
const std::string& path,
curvefs::client::common::WarmupStorageType storageType,
const std::string& mount_point, const std::string& root) {
const std::string& mount_point, const std::string& root,
bool check = false) {
int ret = 0;
bool result = true;
switch (type) {
case curvefs::client::common::WarmupType::kWarmupTypeList:
result = g_ClientInstance->PutWarmFilelistTask(key, storageType, path,
mount_point, root);
break;
result = g_ClientInstance->PutWarmFilelistTask(
key, storageType, path, mount_point, root, check);
break;
case curvefs::client::common::WarmupType::kWarmupTypeSingle:
result = g_ClientInstance->PutWarmFileTask(key, path, storageType);
result =
g_ClientInstance->PutWarmFileTask(key, path, storageType, check);
break;
default:
// not support add warmup type (warmup single file/dir or filelist)
Expand Down Expand Up @@ -289,6 +295,18 @@ void QueryWarmupTask(fuse_ino_t key, std::string *data) {
VLOG(9) << "Warmup [" << key << "]" << *data;
}

void QueryCheckCachedTask(fuse_ino_t key, std::string* data) {
WarmupProgress progress;
bool ret = g_ClientInstance->GetCheckCachedProgress(key, &progress);
if (!ret) {
*data = "no check task or not warmup yet";
} else {
*data =
fmt::format("{}/{}", progress.GetFinished(), progress.GetTotal());
}
VLOG(9) << "check cached [" << key << "]" << *data;
}

void ListWarmupTasks(std::string* data) {
WarmupProgress progress;
std::unordered_map<std::string, WarmupProgress> filepath2progress;
Expand Down Expand Up @@ -342,9 +360,15 @@ int Warmup(fuse_ino_t key, const char* name, const std::string& values) {
}

int ret = 0;
bool check = false;
if (curvefs::client::common::GetWarmupOpType(warmupOpType) ==
curvefs::client::common::WarmupOpType::kWarmupOpCheck) {
check = true;
}

switch (curvefs::client::common::GetWarmupOpType(warmupOpType)) {
case curvefs::client::common::WarmupOpType::kWarmupOpAdd: {
case curvefs::client::common::WarmupOpType::kWarmupOpCheck:
case curvefs::client::common::WarmupOpType::kWarmupOpAdd : {
if (opTypePath.size() !=
curvefs::client::common::kWarmupAddArgsNum) {
LOG(ERROR)
Expand All @@ -371,7 +395,7 @@ int Warmup(fuse_ino_t key, const char* name, const std::string& values) {
ret = AddWarmupTask(
curvefs::client::common::GetWarmupType(warmupDataType), key,
entryFilePathInClient, storageType, mountPointInCurvefs,
rootPathInCurvefs);
rootPathInCurvefs, check);
break;
}
case curvefs::client::common::WarmupOpType::kWarmupOpCancel: {
Expand Down Expand Up @@ -444,6 +468,17 @@ void QueryWarmup(fuse_req_t req, fuse_ino_t ino, size_t size) {
return fs->ReplyBuffer(req, data.data(), data.length());
}

void QueryCheckCached(fuse_req_t req, fuse_ino_t ino, size_t size) {
auto fs = Client()->GetFileSystem();

std::string data;
QueryCheckCachedTask(ino, &data);
if (size == 0) {
return fs->ReplyXattr(req, data.length());
}
return fs->ReplyBuffer(req, data.data(), data.length());
}

void ListWarmup(fuse_req_t req, size_t size) {
auto fs = Client()->GetFileSystem();

Expand Down Expand Up @@ -942,6 +977,8 @@ void FuseOpGetXattr(fuse_req_t req,
return ListWarmup(req, size);
} else if (IsWarmupXAttr(name)) {
return QueryWarmup(req, ino, size);
} else if (IsCheckXAttr(name)) {
return QueryCheckCached(req, ino, size);
}

rc = Client()->FuseOpGetXattr(req, ino, name, &value, size);
Expand Down
5 changes: 5 additions & 0 deletions curvefs/src/client/filesystem/xattr.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const char XATTR_DIR_RFBYTES[] = "curve.dir.rfbytes";
const char XATTR_DIR_PREFIX[] = "curve.dir";
const char XATTR_WARMUP_OP[] = "curvefs.warmup.op";
const char XATTR_WARMUP_OP_LIST[] = "curvefs.warmup.op.list";
const char XATTR_WARMUP_OP_CHECK[] = "curvefs.warmup.check";

inline bool IsSpecialXAttr(const std::string& key) {
static std::map<std::string, bool> xattrs {
Expand All @@ -65,6 +66,10 @@ inline bool IsWarmupXAttr(const std::string& key) {
return key == XATTR_WARMUP_OP;
}

inline bool IsCheckXAttr(const std::string& key) {
return key == XATTR_WARMUP_OP_CHECK;
}

inline bool IsListWarmupXAttr(const std::string& key) {
return key == XATTR_WARMUP_OP_LIST;
}
Expand Down
18 changes: 13 additions & 5 deletions curvefs/src/client/fuse_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,18 +312,18 @@ class FuseClient {
bool PutWarmFilelistTask(fuse_ino_t key, common::WarmupStorageType type,
const std::string& path,
const std::string& mount_point,
const std::string& root) {
const std::string& root, bool check = false) {
if (fsInfo_->fstype() == FSType::TYPE_S3) {
return warmupManager_->AddWarmupFilelist(key, type, path,
mount_point, root);
mount_point, root, check);
} // only support s3
return true;
}

bool PutWarmFileTask(fuse_ino_t key, const std::string &path,
common::WarmupStorageType type) {
bool PutWarmFileTask(fuse_ino_t key, const std::string& path,
common::WarmupStorageType type, bool check = false) {
if (fsInfo_->fstype() == FSType::TYPE_S3) {
return warmupManager_->AddWarmupFile(key, path, type);
return warmupManager_->AddWarmupFile(key, path, type, check);
} // only support s3
return true;
}
Expand All @@ -342,6 +342,14 @@ class FuseClient {
return false;
}

bool GetCheckCachedProgress(fuse_ino_t key,
warmup::WarmupProgress* progress) {
if (fsInfo_->fstype() == FSType::TYPE_S3) {
return warmupManager_->QueryCheckCachedProgress(key, progress);
}
return false;
}

bool GetAllWarmupProgress(Filepath2WarmupProgressMap* filepath2progress) {
if (fsInfo_->fstype() == FSType::TYPE_S3) {
return warmupManager_->ListWarmupProgress(filepath2progress);
Expand Down
1 change: 1 addition & 0 deletions curvefs/src/client/kvclient/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,6 @@ cc_library(
"//src/common:curve_common",
"//src/common:curve_s3_adapter",
"@libmemcached",
"@fmt//:fmt",
],
)
3 changes: 3 additions & 0 deletions curvefs/src/client/kvclient/kvclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include <libmemcached-1.0/types/return.h>

#include <cstdint>
#include <string>

namespace curvefs {
Expand Down Expand Up @@ -54,6 +55,8 @@ class KVClient {
virtual bool Get(const std::string& key, char* value, uint64_t offset,
uint64_t length, std::string* errorlog,
uint64_t* actLength, memcached_return_t* retCod) = 0;

virtual bool Exist(const std::string& key) = 0;
};

} // namespace client
Expand Down
7 changes: 7 additions & 0 deletions curvefs/src/client/kvclient/kvclient_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,5 +128,12 @@ int KVClientManager::GetKvCache(
return 0;
}

void KVClientManager::Exist(std::shared_ptr<ExistKVCacheTask> task) {
threadPool_.Enqueue([task, this]() {
task->res = client_->Exist(task->key);
OnReturn(&kvClientManagerMetric_->exist, task);
});
}

} // namespace client
} // namespace curvefs
39 changes: 20 additions & 19 deletions curvefs/src/client/kvclient/kvclient_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ namespace client {
class KVClientManager;
struct SetKVCacheTask;
struct GetKVCacheTask;

class GetKvCacheContext;
class SetKvCacheContext;
struct ExistKVCacheTask;

using curve::common::GetObjectAsyncContext;
using curve::common::TaskThreadPool;
Expand All @@ -58,6 +56,8 @@ using SetKVCacheDone =
std::function<void(const std::shared_ptr<SetKVCacheTask>&)>;
using GetKVCacheDone =
std::function<void(const std::shared_ptr<GetKVCacheTask>&)>;
using ExistKVCacheDone =
std::function<void(const std::shared_ptr<ExistKVCacheTask>&)>;

struct SetKVCacheTask {
std::string key;
Expand All @@ -79,7 +79,7 @@ struct SetKVCacheTask {
};

struct GetKVCacheTask {
const std::string& key;
std::string key;
char* value;
uint64_t offset;
uint64_t valueLength;
Expand All @@ -101,11 +101,21 @@ struct GetKVCacheTask {
timer(butil::Timer::STARTED) {}
};

using GetKvCacheCallBack =
std::function<void(const std::shared_ptr<GetKvCacheContext>&)>;
struct ExistKVCacheTask {
std::string key;
bool res;
uint64_t length = 0; // useless,just for OnReturn
ExistKVCacheDone done;
butil::Timer timer;

using SetKvCacheCallBack =
std::function<void(const std::shared_ptr<SetKvCacheContext>&)>;
explicit ExistKVCacheTask(
const std::string& k,
ExistKVCacheDone done = [](const std::shared_ptr<ExistKVCacheTask>&) {})
: key(k),
res(false),
done(std::move(done)),
timer(butil::Timer::STARTED) {}
};

struct KvCacheContext {
std::string key;
Expand All @@ -117,17 +127,6 @@ struct KvCacheContext {
uint64_t startTime;
};

struct GetKvCacheContext : KvCacheContext {
char* value;
bool res;
GetKvCacheCallBack cb;
};

struct SetKvCacheContext : KvCacheContext {
const char* value;
SetKvCacheCallBack cb;
};

class KVClientManager {
public:
KVClientManager() = default;
Expand All @@ -146,6 +145,8 @@ class KVClientManager {

void Get(std::shared_ptr<GetKVCacheTask> task);

void Exist(std::shared_ptr<ExistKVCacheTask> task);

KVClientManagerMetric* GetMetricForTesting() {
return kvClientManagerMetric_.get();
}
Expand Down
34 changes: 34 additions & 0 deletions curvefs/src/client/kvclient/memcache_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,44 @@

#include "curvefs/src/client/kvclient/memcache_client.h"

#include <libmemcached-1.0/exist.h>
#include <libmemcached-1.0/types/return.h>

#include "src/client/client_metric.h"

namespace curvefs {
namespace client {

thread_local memcached_st* tcli = nullptr;

bool MemCachedClient::Exist(const std::string& key) {
// https://awesomized.github.io/libmemcached/libmemcached/memcached_exist.html?highlight=exist#_CPPv415memcached_existP12memcached_stPcP6size_t
memcached_return_t ue;
size_t value_length = 0;
uint64_t start = butil::cpuwide_time_us();
if (nullptr == tcli) {
LOG(ERROR) << "create tcli";
tcli = memcached_clone(nullptr, client_);
}
ue = memcached_exist(tcli, key.c_str(), key.length());
if (ue == MEMCACHED_SUCCESS) {
curve::client::CollectMetrics(&metric_->exist, 0,
butil::cpuwide_time_us() - start);
return true;
}

if (ue == MEMCACHED_NOTFOUND) {
curve::client::CollectMetrics(&metric_->exist, 0,
butil::cpuwide_time_us() - start);
} else {
std::string errorlog = ResError(ue);
LOG(ERROR) << "Exist key = " << key << " error = " << errorlog;
metric_->exist.eps.count << 1;
}
memcached_free(tcli);
tcli = nullptr;
return false;
}

} // namespace client
} // namespace curvefs

0 comments on commit 424d309

Please sign in to comment.