curvebs/common: some minor fixes and optimizations
Signed-off-by: Hanqing Wu <wuhanqing@corp.netease.com>

1. remove unnecessary `std::bind` and make use of `std::move`
2. use bthread synchronization primitives in `CopysetNodeManager` and `ConcurrentApplyModule`
3. call `shrink_to_fit()` when a segment is closed to reduce memory footprint
4. add some useful logs
5. fix some compilation warnings
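
The first item is the pattern behind most of the hunks below: instead of having every call site build a closure with `std::bind` and copy it into the task queue, the queue's `Push` is now variadic and perfect-forwards the callable and its arguments. A minimal sketch of that pattern follows; the class and method names are illustrative, not Curve's real `GenericTaskQueue` interface.

// Minimal sketch of the forwarding pattern, assuming a queue that stores
// std::function<void()> tasks. Names are illustrative, not Curve's code.
#include <deque>
#include <functional>
#include <utility>

class ForwardingQueueSketch {
 public:
    // Callers hand over the callable and its arguments; the task object is
    // constructed exactly once, here, instead of at every call site.
    template <typename F, typename... Args>
    void Push(F&& f, Args&&... args) {
        tasks_.emplace_back(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    }

    // Drain the queue, invoking tasks in FIFO order.
    void RunAll() {
        while (!tasks_.empty()) {
            tasks_.front()();
            tasks_.pop_front();
        }
    }

 private:
    std::deque<std::function<void()>> tasks_;
};

// Old call site:  queue.Push(std::bind(&Request::OnApply, req, index, done));
// New call site:  queue.Push(&Request::OnApply, req, index, done);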
wu-hanqing committed Feb 14, 2023
1 parent 8db1ec6 commit 1a44544
Showing 26 changed files with 215 additions and 315 deletions.
15 changes: 7 additions & 8 deletions src/chunkserver/concurrent_apply/concurrent_apply.cpp
@@ -23,7 +23,6 @@
 #include <glog/logging.h>

 #include <algorithm>
-#include <vector>
 #include "src/chunkserver/concurrent_apply/concurrent_apply.h"
 #include "src/common/concurrent/count_down_event.h"

@@ -78,9 +77,9 @@ bool ConcurrentApplyModule::checkOptAndInit(


 void ConcurrentApplyModule::InitThreadPool(
-    ThreadPoolType type, int concorrent, int depth) {
-    for (int i = 0; i < concorrent; i++) {
-        auto asyncth = new (std::nothrow) taskthread(depth);
+    ThreadPoolType type, int concurrent, int depth) {
+    for (int i = 0; i < concurrent; i++) {
+        auto asyncth = new (std::nothrow) TaskThread(depth);
         CHECK(asyncth != nullptr) << "allocate failed!";

         switch (type) {
@@ -94,16 +93,16 @@ void ConcurrentApplyModule::InitThreadPool(
         }
     }

-    for (int i = 0; i < concorrent; i++) {
+    for (int i = 0; i < concurrent; i++) {
         switch (type) {
             case ThreadPoolType::READ:
-                rapplyMap_[i]->th = std::move(
-                    std::thread(&ConcurrentApplyModule::Run, this, type, i));
+                rapplyMap_[i]->th =
+                    std::thread(&ConcurrentApplyModule::Run, this, type, i);
                 break;

             case ThreadPoolType::WRITE:
                 wapplyMap_[i]->th =
-                    std::thread(&ConcurrentApplyModule::Run, this, type, i);
+                    std::thread(&ConcurrentApplyModule::Run, this, type, i);
                 break;
         }
     }
47 changes: 24 additions & 23 deletions src/chunkserver/concurrent_apply/concurrent_apply.h
@@ -23,28 +23,31 @@
 #ifndef SRC_CHUNKSERVER_CONCURRENT_APPLY_CONCURRENT_APPLY_H_
 #define SRC_CHUNKSERVER_CONCURRENT_APPLY_CONCURRENT_APPLY_H_

+#include <bthread/condition_variable.h>
+#include <bthread/mutex.h>
 #include <glog/logging.h>
 #include <unistd.h>

 #include <atomic>
-#include <mutex>  // NOLINT
-#include <thread>  // NOLINT
+#include <condition_variable>  // NOLINT
+#include <mutex>  // NOLINT
+#include <thread>  // NOLINT
 #include <unordered_map>
 #include <utility>
-#include <condition_variable>  // NOLINT

-#include "src/common/concurrent/task_queue.h"
-#include "src/common/concurrent/count_down_event.h"
-#include "proto/chunk.pb.h"
 #include "include/curve_compiler_specific.h"
+#include "proto/chunk.pb.h"
+#include "src/common/concurrent/count_down_event.h"
+#include "src/common/concurrent/task_queue.h"

-using curve::common::TaskQueue;
 using curve::common::CountDownEvent;
 using curve::chunkserver::CHUNK_OP_TYPE;

 namespace curve {
 namespace chunkserver {
 namespace concurrent {

+using ::curve::common::GenericTaskQueue;
+
 struct ConcurrentApplyOption {
     int wconcurrentsize;
     int wqueuedepth;
@@ -62,7 +65,6 @@ class CURVE_CACHELINE_ALIGNMENT ConcurrentApplyModule {
           rqueuedepth_(0),
           wqueuedepth_(0),
           cond_(0) {}
-    ~ConcurrentApplyModule() {}

     /**
      * Init: initialize ConcurrentApplyModule
@@ -80,15 +82,16 @@ class CURVE_CACHELINE_ALIGNMENT ConcurrentApplyModule {
      * @param[in] f: task
      * @param[in] args: param to excute task
      */
-    template<class F, class... Args>
+    template <class F, class... Args>
     bool Push(uint64_t key, CHUNK_OP_TYPE optype, F&& f, Args&&... args) {
-        auto task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
         switch (Schedule(optype)) {
             case ThreadPoolType::READ:
-                rapplyMap_[Hash(key, rconcurrentsize_)]->tq.Push(task);
+                rapplyMap_[Hash(key, rconcurrentsize_)]->tq.Push(
+                    std::forward<F>(f), std::forward<Args>(args)...);
                 break;
             case ThreadPoolType::WRITE:
-                wapplyMap_[Hash(key, wconcurrentsize_)]->tq.Push(task);
+                wapplyMap_[Hash(key, wconcurrentsize_)]->tq.Push(
+                    std::forward<F>(f), std::forward<Args>(args)...);
                 break;
         }

@@ -107,31 +110,29 @@ class CURVE_CACHELINE_ALIGNMENT ConcurrentApplyModule {

     void Run(ThreadPoolType type, int index);

-    ThreadPoolType Schedule(CHUNK_OP_TYPE optype);
+    static ThreadPoolType Schedule(CHUNK_OP_TYPE optype);

     void InitThreadPool(ThreadPoolType type, int concorrent, int depth);

-    int Hash(uint64_t key, int concurrent) {
+    static int Hash(uint64_t key, int concurrent) {
         return key % concurrent;
     }

  private:
-    typedef uint8_t threadIndex;
-    typedef struct taskthread {
+    struct TaskThread {
         std::thread th;
-        TaskQueue tq;
-        taskthread(size_t capacity):tq(capacity) {}
-        ~taskthread() = default;
-    } taskthread_t;
+        GenericTaskQueue<bthread::Mutex, bthread::ConditionVariable> tq;
+        explicit TaskThread(size_t capacity) : tq(capacity) {}
+    };

     bool start_;
     int rconcurrentsize_;
     int rqueuedepth_;
     int wconcurrentsize_;
     int wqueuedepth_;
     CountDownEvent cond_;
-    CURVE_CACHELINE_ALIGNMENT std::unordered_map<threadIndex, taskthread_t*> wapplyMap_;  // NOLINT
-    CURVE_CACHELINE_ALIGNMENT std::unordered_map<threadIndex, taskthread_t*> rapplyMap_;  // NOLINT
+    CURVE_CACHELINE_ALIGNMENT std::unordered_map<int, TaskThread*> wapplyMap_;
+    CURVE_CACHELINE_ALIGNMENT std::unordered_map<int, TaskThread*> rapplyMap_;
 };
 }  // namespace concurrent
 }  // namespace chunkserver
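The `TaskThread` struct above now holds a `GenericTaskQueue<bthread::Mutex, bthread::ConditionVariable>`, so a full or empty queue parks the calling bthread instead of blocking the underlying pthread worker. The sketch below shows what a queue templated on its synchronization primitives can look like; it is an assumption about the general shape of `GenericTaskQueue`, not the actual code in src/common/concurrent/task_queue.h.

// Sketch of a bounded task queue templated on its mutex/condition-variable
// types. With MutexT = bthread::Mutex and CondVarT = bthread::ConditionVariable
// the waits suspend only the calling bthread; with the std:: types it behaves
// like the original TaskQueue. Interface and names are assumptions.
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

template <typename MutexT, typename CondVarT>
class GenericTaskQueueSketch {
 public:
    using Task = std::function<void()>;

    explicit GenericTaskQueueSketch(std::size_t capacity) : capacity_(capacity) {}

    template <typename F, typename... Args>
    void Push(F&& f, Args&&... args) {
        Task task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
        std::unique_lock<MutexT> lock(mutex_);
        while (tasks_.size() >= capacity_) {
            notFull_.wait(lock);  // parks only a bthread when CondVarT is bthread's
        }
        tasks_.emplace_back(std::move(task));
        notEmpty_.notify_one();
    }

    Task Pop() {
        std::unique_lock<MutexT> lock(mutex_);
        while (tasks_.empty()) {
            notEmpty_.wait(lock);
        }
        Task task = std::move(tasks_.front());
        tasks_.pop_front();
        notFull_.notify_one();
        return task;
    }

 private:
    std::size_t capacity_;
    MutexT mutex_;
    CondVarT notFull_;
    CondVarT notEmpty_;
    std::deque<Task> tasks_;
};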
20 changes: 7 additions & 13 deletions src/chunkserver/copyset_node.cpp
@@ -288,13 +288,10 @@ void CopysetNode::on_apply(::braft::Iterator &iter) {
                 *chunkClosure = dynamic_cast<ChunkClosure *>(iter.done());
             CHECK(nullptr != chunkClosure)
                 << "ChunkClosure dynamic cast failed";
-            std::shared_ptr<ChunkOpRequest> opRequest = chunkClosure->request_;
-            auto task = std::bind(&ChunkOpRequest::OnApply,
-                                  opRequest,
-                                  iter.index(),
-                                  doneGuard.release());
-            concurrentapply_->Push(
-                opRequest->ChunkId(), opRequest->OpType(), task);
+            std::shared_ptr<ChunkOpRequest>& opRequest = chunkClosure->request_;
+            concurrentapply_->Push(opRequest->ChunkId(), opRequest->OpType(),
+                                   &ChunkOpRequest::OnApply, opRequest,
+                                   iter.index(), doneGuard.release());
         } else {
             // fetch the log entry
             butil::IOBuf log = iter.data();
@@ -309,12 +306,9 @@ void CopysetNode::on_apply(::braft::Iterator &iter) {
             auto opReq = ChunkOpRequest::Decode(log, &request, &data,
                                                 iter.index(), GetLeaderId());
             auto chunkId = request.chunkid();
-            auto task = std::bind(&ChunkOpRequest::OnApplyFromLog,
-                                  opReq,
-                                  dataStore_,
-                                  std::move(request),
-                                  data);
-            concurrentapply_->Push(chunkId, request.optype(), task);
+            concurrentapply_->Push(chunkId, request.optype(),
+                                   &ChunkOpRequest::OnApplyFromLog, opReq,
+                                   dataStore_, std::move(request), data);
         }
     }
 }
4 changes: 4 additions & 0 deletions src/chunkserver/copyset_node.h
@@ -435,6 +435,10 @@ class CopysetNode : public braft::StateMachine,
                               ::braft::Closure *done);

     void ShipToSync(ChunkID chunkId) {
+        if (enableOdsyncWhenOpenChunkFile_) {
+            return;
+        }
+
         curve::common::LockGuard lg(chunkIdsLock_);
         chunkIdsToSync_.push_back(chunkId);
     }
4 changes: 2 additions & 2 deletions src/chunkserver/copyset_node_manager.h
@@ -36,7 +36,7 @@
 namespace curve {
 namespace chunkserver {

-using curve::common::RWLock;
+using curve::common::BthreadRWLock;
 using curve::common::ReadLockGuard;
 using curve::common::WriteLockGuard;
 using curve::common::TaskThreadPool;
@@ -216,7 +216,7 @@ class CopysetNodeManager : public curve::common::Uncopyable {
     using CopysetNodeMap = std::unordered_map<GroupId,
                                               std::shared_ptr<CopysetNode>>;
     // read-write lock protecting the copyset map
-    mutable RWLock rwLock_;
+    mutable BthreadRWLock rwLock_;
     // copyset map
     CopysetNodeMap copysetNodeMap_;
     // copyset configuration options
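Same motivation in `CopysetNodeManager`: the copyset map is read and written from brpc worker threads, and blocking on a pthread-level `RWLock` there stalls the whole worker, while `BthreadRWLock` only suspends the calling bthread. The exact `BthreadRWLock` interface lives in src/common/concurrent/rw_lock.h and is not reproduced here; the sketch below uses plain `bthread::Mutex` (already pulled in by this commit elsewhere) just to illustrate the bthread-aware locking pattern.

// Illustrative registry guarded by a bthread-aware lock: blocking here suspends
// only the current bthread, so the brpc worker keeps scheduling other bthreads.
// Simplified stand-in (a mutex instead of a read-write lock); the class and
// member names are hypothetical.
#include <bthread/mutex.h>

#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

class CopysetRegistrySketch {
 public:
    std::shared_ptr<int> Get(const std::string& groupId) {
        std::lock_guard<bthread::Mutex> guard(mutex_);  // bthread-aware wait
        auto it = nodes_.find(groupId);
        return it == nodes_.end() ? nullptr : it->second;
    }

    void Put(const std::string& groupId, std::shared_ptr<int> node) {
        std::lock_guard<bthread::Mutex> guard(mutex_);
        nodes_[groupId] = std::move(node);
    }

 private:
    bthread::Mutex mutex_;
    std::unordered_map<std::string, std::shared_ptr<int>> nodes_;
};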
2 changes: 1 addition & 1 deletion src/chunkserver/datastore/chunkserver_chunkfile.cpp
@@ -33,7 +33,7 @@ namespace chunkserver {

 namespace {

-bool ValidMinIoAlignment(const char* flagname, uint32_t value) {
+bool ValidMinIoAlignment(const char* /*flagname*/, uint32_t value) {
     return common::is_aligned(value, 512);
 }

Expand Down
2 changes: 1 addition & 1 deletion src/chunkserver/datastore/chunkserver_datastore.h
@@ -47,7 +47,7 @@ using curve::fs::LocalFileSystem;
 using ::curve::common::Atomic;
 using CSChunkFilePtr = std::shared_ptr<CSChunkFile>;

-inline void TrivialDeleter(void* ptr) {}
+inline void TrivialDeleter(void* /*ptr*/) {}

 /**
  * DataStore configuration parameters
4 changes: 2 additions & 2 deletions src/chunkserver/datastore/file_pool.cpp
@@ -408,7 +408,7 @@ bool FilePool::GetChunk(bool needClean, uint64_t* chunkid, bool* isCleaned) {
 }

 int FilePool::GetFile(const std::string& targetpath,
-                      char* metapage,
+                      const char* metapage,
                       bool needClean) {
     int ret = -1;
     int retry = 0;
@@ -513,7 +513,7 @@ int FilePool::AllocateChunk(const std::string& chunkpath) {
     return ret;
 }

-bool FilePool::WriteMetaPage(const std::string& sourcepath, char* page) {
+bool FilePool::WriteMetaPage(const std::string& sourcepath, const char* page) {
     int fd = -1;
     int ret = -1;

7 changes: 4 additions & 3 deletions src/chunkserver/datastore/file_pool.h
@@ -151,8 +151,9 @@ class CURVE_CACHELINE_ALIGNMENT FilePool {
      * @param: metapage is the metapage information of the new chunk
      * @param: needClean is whether chunk need fill zero
      */
-    virtual int GetFile(const std::string& chunkpath, char* metapage,
-                        bool needClean = false);
+    virtual int GetFile(const std::string& chunkpath,
+                        const char* metapage,
+                        bool needClean = false);
     /**
      * Datastore deletes chunks and recycles directly, not really deleted
      * @param: chunkpath is the chunk path that needs to be recycled
@@ -217,7 +218,7 @@
      * @param: page is the metapage information to be written
      * @return: returns true if successful, otherwise false
      */
-    bool WriteMetaPage(const std::string& sourcepath, char* page);
+    bool WriteMetaPage(const std::string& sourcepath, const char* page);
     /**
      * Directly allocate chunks, not from FilePool
      * @param: chunkpath is the path of the chunk file in the datastore
