Skip to content

Commit

Permalink
[feat]curvefs/client:add memcache to warmup
Browse files Browse the repository at this point in the history
Signed-off-by: Cyber-SiKu <Cyber-SiKu@outlook.com>
  • Loading branch information
Cyber-SiKu committed Mar 10, 2023
1 parent 87664bd commit bc36988
Show file tree
Hide file tree
Showing 21 changed files with 192 additions and 128 deletions.
14 changes: 9 additions & 5 deletions curvefs/src/client/fuse_s3_client.cpp
Expand Up @@ -70,7 +70,7 @@ CURVEFS_ERROR FuseS3Client::Init(const FuseClientOption &option) {
auto fsCacheManager = std::make_shared<FsCacheManager>(
dynamic_cast<S3ClientAdaptorImpl *>(s3Adaptor_.get()),
opt.s3Opt.s3ClientAdaptorOpt.readCacheMaxByte,
opt.s3Opt.s3ClientAdaptorOpt.writeCacheMaxByte);
opt.s3Opt.s3ClientAdaptorOpt.writeCacheMaxByte, kvClientManager_);
if (opt.s3Opt.s3ClientAdaptorOpt.diskCacheOpt.diskCacheType !=
DiskCacheType::Disable) {
auto s3DiskCacheClient = std::make_shared<S3ClientImpl>();
Expand All @@ -84,11 +84,11 @@ CURVEFS_ERROR FuseS3Client::Init(const FuseClientOption &option) {
diskCacheManager, s3DiskCacheClient);
ret = s3Adaptor_->Init(opt.s3Opt.s3ClientAdaptorOpt, s3Client,
inodeManager_, mdsClient_, fsCacheManager,
diskCacheManagerImpl, true);
diskCacheManagerImpl, kvClientManager_, true);
} else {
ret = s3Adaptor_->Init(opt.s3Opt.s3ClientAdaptorOpt, s3Client,
inodeManager_, mdsClient_, fsCacheManager,
nullptr, true);
nullptr, kvClientManager_, true);
}
return ret;
}
Expand All @@ -112,13 +112,17 @@ bool FuseS3Client::InitKVCache(const KVClientManagerOpt &opt) {
return false;
}

g_kvClientManager = new KVClientManager();
if (!g_kvClientManager->Init(opt, memcacheClient)) {
kvClientManager_ = std::make_shared<KVClientManager>();
if (!kvClientManager_->Init(opt, memcacheClient)) {
LOG(ERROR) << "FLAGS_supportKVcache = " << FLAGS_supportKVcache
<< ", but init kvClientManager fail";
return false;
}

if (warmupManager_ != nullptr) {
warmupManager_->SetKVClientManager(kvClientManager_);
}

return true;
}

Expand Down
3 changes: 2 additions & 1 deletion curvefs/src/client/fuse_s3_client.h
Expand Up @@ -57,7 +57,7 @@ class FuseS3Client : public FuseClient {
};
warmupManager_ = std::make_shared<warmup::WarmupManagerS3Impl>(
metaClient_, inodeManager_, dentryManager_, fsInfo_, readFunc,
s3Adaptor_);
s3Adaptor_, nullptr);
}

FuseS3Client(const std::shared_ptr<MdsClient> &mdsClient,
Expand Down Expand Up @@ -118,6 +118,7 @@ class FuseS3Client : public FuseClient {
private:
// s3 adaptor
std::shared_ptr<S3ClientAdaptor> s3Adaptor_;
std::shared_ptr<KVClientManager> kvClientManager_;
};


Expand Down
16 changes: 6 additions & 10 deletions curvefs/src/client/kvclient/kvclient_manager.cpp
Expand Up @@ -21,6 +21,7 @@
*/

#include "curvefs/src/client/kvclient/kvclient_manager.h"
#include <memory>
#include "src/client/client_metric.h"
#include "src/common/concurrent/count_down_event.h"

Expand All @@ -31,32 +32,27 @@ using curvefs::client::metric::KVClientMetric;
namespace curvefs {
namespace client {

KVClientManager *g_kvClientManager = nullptr;
KVClientMetric *g_kvClientMetric = nullptr;

#define ONRETURN(TYPE, RES) \
if (RES) { \
g_kvClientMetric->kvClient##TYPE.qps.count << 1; \
kvClientMetric_.kvClient##TYPE.qps.count << 1; \
} else { \
g_kvClientMetric->kvClient##TYPE.eps.count << 1; \
}
kvClientMetric_.kvClient##TYPE.eps.count << 1; \
} \

bool KVClientManager::Init(const KVClientManagerOpt &config,
const std::shared_ptr<KVClient> &kvclient) {
client_ = kvclient;
g_kvClientMetric = new KVClientMetric();
return threadPool_.Start(config.setThreadPooln) == 0;
}

void KVClientManager::Uninit() {
client_->UnInit();
threadPool_.Stop();
delete g_kvClientMetric;
}

void KVClientManager::Set(std::shared_ptr<SetKVCacheTask> task) {
threadPool_.Enqueue([task, this]() {
LatencyGuard guard(&g_kvClientMetric->kvClientSet.latency);
LatencyGuard guard(&kvClientMetric_.kvClientSet.latency);

std::string error_log;
auto res =
Expand All @@ -69,7 +65,7 @@ void KVClientManager::Set(std::shared_ptr<SetKVCacheTask> task) {

void KVClientManager::Get(std::shared_ptr<GetKVCacheTask> task) {
threadPool_.Enqueue([task, this]() {
LatencyGuard guard(&g_kvClientMetric->kvClientGet.latency);
LatencyGuard guard(&kvClientMetric_.kvClientGet.latency);

std::string error_log;
task->res = client_->Get(task->key, task->value, task->offset,
Expand Down
14 changes: 7 additions & 7 deletions curvefs/src/client/kvclient/kvclient_manager.h
Expand Up @@ -48,9 +48,6 @@ class GetKVCacheTask;
using curve::common::TaskThreadPool;
using curvefs::client::common::KVClientManagerOpt;

extern KVClientManager *g_kvClientManager;
extern KVClientMetric *g_kvClientMetric;

typedef std::function<void(const std::shared_ptr<SetKVCacheTask> &)>
SetKVCacheDone;
typedef std::function<void(const std::shared_ptr<GetKVCacheTask> &)>
Expand All @@ -62,10 +59,10 @@ struct SetKVCacheTask {
uint64_t length;
SetKVCacheDone done;
SetKVCacheTask() = default;
SetKVCacheTask(const std::string &k, const char *val, const uint64_t len)
: key(k), value(val), length(len) {
done = [](const std::shared_ptr<SetKVCacheTask> &) {};
}
SetKVCacheTask(
const std::string &k, const char *val, const uint64_t len,
SetKVCacheDone done = [](const std::shared_ptr<SetKVCacheTask> &) {})
: key(k), value(val), length(len), done(std::move(done)) {}
};

struct GetKVCacheTask {
Expand Down Expand Up @@ -98,12 +95,15 @@ class KVClientManager {

void Get(std::shared_ptr<GetKVCacheTask> task);

KVClientMetric *GetClientMetricForTesting() { return &kvClientMetric_; }

private:
void Uninit();

private:
TaskThreadPool<bthread::Mutex, bthread::ConditionVariable> threadPool_;
std::shared_ptr<KVClient> client_;
KVClientMetric kvClientMetric_;
};

} // namespace client
Expand Down
5 changes: 4 additions & 1 deletion curvefs/src/client/s3/client_s3_adaptor.cpp
Expand Up @@ -24,6 +24,7 @@
#include <brpc/controller.h>
#include <algorithm>
#include <list>
#include <utility>

#include "absl/memory/memory.h"
#include "curvefs/src/client/s3/client_s3_adaptor.h"
Expand All @@ -39,6 +40,7 @@ S3ClientAdaptorImpl::Init(
std::shared_ptr<MdsClient> mdsClient,
std::shared_ptr<FsCacheManager> fsCacheManager,
std::shared_ptr<DiskCacheManagerImpl> diskCacheManagerImpl,
std::shared_ptr<KVClientManager> kvClientManager,
bool startBackGround) {
pendingReq_ = 0;
blockSize_ = option.blockSize;
Expand Down Expand Up @@ -66,6 +68,7 @@ S3ClientAdaptorImpl::Init(
fsCacheManager_ = fsCacheManager;
waitInterval_.Init(option.intervalSec * 1000);
diskCacheManagerImpl_ = diskCacheManagerImpl;
kvClientManager_ = std::move(kvClientManager);
if (HasDiskCache()) {
diskCacheManagerImpl_ = diskCacheManagerImpl;
if (diskCacheManagerImpl_->Init(option) < 0) {
Expand Down Expand Up @@ -372,7 +375,7 @@ CURVEFS_ERROR S3ClientAdaptorImpl::FlushAllCache(uint64_t inodeId) {
}

// force flush data in diskcache to s3
if (!g_kvClientManager && HasDiskCache()) {
if (!kvClientManager_ && HasDiskCache()) {
VLOG(6) << "FlushAllCache, wait inodeId:" << inodeId
<< "related chunk upload to s3";
if (ClearDiskCache(inodeId) < 0) {
Expand Down
4 changes: 4 additions & 0 deletions curvefs/src/client/s3/client_s3_adaptor.h
Expand Up @@ -71,6 +71,7 @@ class S3ClientAdaptor {
std::shared_ptr<MdsClient> mdsClient,
std::shared_ptr<FsCacheManager> fsCacheManager,
std::shared_ptr<DiskCacheManagerImpl> diskCacheManagerImpl,
std::shared_ptr<KVClientManager> kvClientManager,
bool startBackGround = false) = 0;
/**
* @brief write data to s3
Expand Down Expand Up @@ -128,6 +129,7 @@ class S3ClientAdaptorImpl : public S3ClientAdaptor {
std::shared_ptr<MdsClient> mdsClient,
std::shared_ptr<FsCacheManager> fsCacheManager,
std::shared_ptr<DiskCacheManagerImpl> diskCacheManagerImpl,
std::shared_ptr<KVClientManager> kvClientManager,
bool startBackGround = false);
/**
* @brief write data to s3
Expand Down Expand Up @@ -274,6 +276,8 @@ class S3ClientAdaptorImpl : public S3ClientAdaptor {

TaskThreadPool<bthread::Mutex, bthread::ConditionVariable>
taskPool_;

std::shared_ptr<KVClientManager> kvClientManager_ = nullptr;
};

} // namespace client
Expand Down
27 changes: 16 additions & 11 deletions curvefs/src/client/s3/client_s3_cache_manager.cpp
Expand Up @@ -96,7 +96,7 @@ FsCacheManager::FindOrCreateFileCacheManager(uint64_t fsId, uint64_t inodeId) {
}

FileCacheManagerPtr fileCacheManager = std::make_shared<FileCacheManager>(
fsId, inodeId, s3ClientAdaptor_);
fsId, inodeId, s3ClientAdaptor_, kvClientManager_);
auto ret = fileCacheManagerMap_.emplace(inodeId, fileCacheManager);
g_s3MultiManagerMetric->fileManagerNum << 1;
assert(ret.second);
Expand Down Expand Up @@ -293,7 +293,8 @@ FileCacheManager::FindOrCreateChunkCacheManager(uint64_t index) {
}

ChunkCacheManagerPtr chunkCacheManager =
std::make_shared<ChunkCacheManager>(index, s3ClientAdaptor_);
std::make_shared<ChunkCacheManager>(index, s3ClientAdaptor_,
kvClientManager_);
auto ret = chunkCacheMap_.emplace(index, chunkCacheManager);
g_s3MultiManagerMetric->chunkManagerNum << 1;
assert(ret.second);
Expand Down Expand Up @@ -509,7 +510,7 @@ bool FileCacheManager::ReadKVRequestFromRemoteCache(const std::string &name,
char *databuf,
uint64_t offset,
uint64_t length) {
if (!g_kvClientManager) {
if (!kvClientManager_) {
return false;
}

Expand All @@ -519,7 +520,7 @@ bool FileCacheManager::ReadKVRequestFromRemoteCache(const std::string &name,
event.Signal();
return;
};
g_kvClientManager->Get(task);
kvClientManager_->Get(task);
event.Wait();

return task->res;
Expand Down Expand Up @@ -616,7 +617,7 @@ int FileCacheManager::ReadKVRequest(
WriteLockGuard writeLockGuard(chunkCacheManager->rwLockChunk_);
DataCachePtr dataCache = std::make_shared<DataCache>(
s3ClientAdaptor_, chunkCacheManager, chunkPos, req->len,
dataBuf + req->readOffset);
dataBuf + req->readOffset, kvClientManager_);
chunkCacheManager->AddReadDataCache(dataCache);
}
}
Expand Down Expand Up @@ -1477,8 +1478,9 @@ DataCachePtr ChunkCacheManager::FindWriteableDataCache(
void ChunkCacheManager::WriteNewDataCache(S3ClientAdaptorImpl *s3ClientAdaptor,
uint32_t chunkPos, uint32_t len,
const char *data) {
DataCachePtr dataCache = std::make_shared<DataCache>(
s3ClientAdaptor, this->shared_from_this(), chunkPos, len, data);
DataCachePtr dataCache =
std::make_shared<DataCache>(s3ClientAdaptor, this->shared_from_this(),
chunkPos, len, data, kvClientManager_);
VLOG(9) << "WriteNewDataCache chunkPos:" << chunkPos << ", len:" << len
<< ", new len:" << dataCache->GetLen() << ",chunkIndex:" << index_;
WriteLockGuard writeLockGuard(rwLockWrite_);
Expand Down Expand Up @@ -1733,7 +1735,8 @@ void ChunkCacheManager::AddWriteDataCacheForTest(DataCachePtr dataCache) {

DataCache::DataCache(S3ClientAdaptorImpl *s3ClientAdaptor,
ChunkCacheManagerPtr chunkCacheManager, uint64_t chunkPos,
uint64_t len, const char *data)
uint64_t len, const char *data,
std::shared_ptr<KVClientManager> kvClientManager)
: s3ClientAdaptor_(std::move(s3ClientAdaptor)),
chunkCacheManager_(chunkCacheManager),
status_(DataCacheStatus::Dirty), inReadCache_(false) {
Expand Down Expand Up @@ -1792,6 +1795,8 @@ DataCache::DataCache(S3ClientAdaptorImpl *s3ClientAdaptor,
assert((actualLen_ % pageSize) == 0);
assert((actualChunkPos_ % pageSize) == 0);
createTime_ = ::curve::common::TimeUtility::GetTimeofDaySec();

kvClientManager_ = std::move(kvClientManager);
}

void DataCache::CopyBufToDataCache(uint64_t dataCachePos, uint64_t len,
Expand Down Expand Up @@ -2312,7 +2317,7 @@ CURVEFS_ERROR DataCache::PrepareFlushTasks(uint64_t inodeId,
s3Tasks->emplace_back(context);

// generate flush to kvcache task
if (g_kvClientManager) {
if (kvClientManager_) {
auto task = std::make_shared<SetKVCacheTask>();
task->key = objectName;
task->value = data + (*writeOffset);
Expand Down Expand Up @@ -2398,11 +2403,11 @@ void DataCache::FlushTaskExecute(
});
}
// kvtask execute
if (g_kvClientManager && kvPendingTaskCal.load()) {
if (kvClientManager_ && kvPendingTaskCal.load()) {
std::for_each(kvCacheTasks.begin(), kvCacheTasks.end(),
[&](const std::shared_ptr<SetKVCacheTask> &task) {
task->done = kvdone;
g_kvClientManager->Set(task);
kvClientManager_->Set(task);
});
kvTaskEnvent.Wait();
}
Expand Down

0 comments on commit bc36988

Please sign in to comment.