Skip to content

Commit

Permalink
curvefs-bs: merge extent if possible
Browse files Browse the repository at this point in the history
  • Loading branch information
wu-hanqing committed Mar 29, 2022
1 parent bd0bbf6 commit 85ec688
Show file tree
Hide file tree
Showing 5 changed files with 708 additions and 23 deletions.
100 changes: 82 additions & 18 deletions curvefs/src/client/volume/extent_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@
#include <glog/logging.h>

#include <algorithm>
#include "absl/types/optional.h"
#include <utility>

#include "absl/types/optional.h"
#include "curvefs/src/client/volume/metric.h"
#include "src/common/fast_align.h"

Expand All @@ -41,21 +42,20 @@ using ::curve::common::is_aligned;
using ::curve::common::is_alignment;
using ::curve::common::ReadLockGuard;
using ::curve::common::WriteLockGuard;
using ::curvefs::metaserver::VolumeExtent;

namespace {

// TODO(wuhanqing): this value should align with backend storage's block size
constexpr uint64_t kMinAllocGranularity = 4096;

bvar::LatencyRecorder g_write_divide_latency("extent_cache_write_divide");
bvar::LatencyRecorder g_read_divide_latency_("extent_cache_read_divide");
bvar::LatencyRecorder g_merge_latency("extent_cache_merge");
bvar::LatencyRecorder g_mark_written_latency("extent_cache_mark_written");

} // namespace

bvar::LatencyRecorder ExtentCache::writeDivideLatency_(
"extent_cache_write_divide");
bvar::LatencyRecorder ExtentCache::readDivideLatency_(
"extent_cache_read_divide");
bvar::LatencyRecorder ExtentCache::mergeLatency_("extent_cache_merge");
bvar::LatencyRecorder ExtentCache::markWrittenLatency_(
"extent_cache_mark_written");

ExtentCacheOption ExtentCache::option_;

Expand All @@ -64,7 +64,7 @@ void ExtentCache::Merge(uint64_t loffset, const PExtent& pExt) {
VLOG(9) << "merge extnet, loffset: " << loffset
<< ", physical offset: " << pExt.pOffset << ", len: " << pExt.len
<< ", writtern: " << !pExt.UnWritten;
LatencyUpdater updater(&mergeLatency_);
LatencyUpdater updater(&g_merge_latency);
WriteLockGuard lk(lock_);
MergeWithinRange(&extents_[align_down(loffset, option_.rangeSize)], loffset,
pExt);
Expand Down Expand Up @@ -111,7 +111,7 @@ void ExtentCache::DivideForWrite(uint64_t offset,
const char* data,
std::vector<WritePart>* allocated,
std::vector<AllocPart>* needAlloc) {
LatencyUpdater updater(&writeDivideLatency_);
LatencyUpdater updater(&g_write_divide_latency);
ReadLockGuard lk(lock_);

const auto end = offset + len;
Expand Down Expand Up @@ -295,7 +295,7 @@ void ExtentCache::DivideForWriteWithinRange(
}

void ExtentCache::MarkWritten(uint64_t offset, uint64_t len) {
LatencyUpdater updater(&markWrittenLatency_);
LatencyUpdater updater(&g_mark_written_latency);
WriteLockGuard lk(lock_);

auto cur = offset;
Expand All @@ -311,14 +311,16 @@ void ExtentCache::MarkWritten(uint64_t offset, uint64_t len) {
}
}

// TODO(wuhanqing): merge continuous extents
void ExtentCache::MarkWrittenWithinRange(std::map<uint64_t, PExtent>* range,
uint64_t offset,
uint64_t len) {
uint64_t curOff = offset;
const uint64_t curEnd = offset + len;

auto lower = range->lower_bound(curOff);
auto upper = range->upper_bound(curEnd);
const auto upper = range->upper_bound(curEnd);
auto prev = range->end();

if (lower != range->begin()) {
--lower;
Expand All @@ -329,14 +331,36 @@ void ExtentCache::MarkWrittenWithinRange(std::map<uint64_t, PExtent>* range,
return off1 + len1 <= off2 || off2 + len2 <= off1;
};

// merge written extents
auto mergeable =
[&prev, &range](
std::map<uint64_t, curvefs::client::PExtent>::iterator current) {
return prev != range->end() &&
!prev->second.UnWritten &&
!current->second.UnWritten &&
// logical is continuous
prev->first + prev->second.len == current->first &&
// physical is continuous
prev->second.pOffset + prev->second.len ==
current->second.pOffset;
};

while (curOff < curEnd && lower != upper) {
if (nonoverlap(curOff, curEnd, lower->first, lower->second.len)) {
prev = lower;
++lower;
continue;
}

if (!lower->second.UnWritten) {
++lower;
if (mergeable(lower)) {
prev->second.len += lower->second.len;
lower = range->erase(lower);
} else {
prev = lower;
++lower;
}

continue;
}

Expand All @@ -348,8 +372,15 @@ void ExtentCache::MarkWrittenWithinRange(std::map<uint64_t, PExtent>* range,
// write |-------| |--------|
// extent |----| |----|
lower->second.UnWritten = false;
if (mergeable(lower)) {
prev->second.len += lower->second.len;
lower = range->erase(lower);
} else {
prev = lower;
++lower;
}

curOff = extEnd;
++lower;
} else {
// write |----|
// extent |----|
Expand All @@ -372,8 +403,15 @@ void ExtentCache::MarkWrittenWithinRange(std::map<uint64_t, PExtent>* range,
// write |----| |-------|
// extent |----| |----|
lower->second.UnWritten = false;
if (mergeable(lower)) {
prev->second.len += lower->second.len;
lower = range->erase(lower);
} else {
prev = lower;
++lower;
}

curOff = extEnd;
++lower;
} else {
// write |----|
// extent |--------|
Expand All @@ -386,6 +424,11 @@ void ExtentCache::MarkWrittenWithinRange(std::map<uint64_t, PExtent>* range,
lower->second.len = overlap;
lower->second.UnWritten = false;

if (mergeable(lower)) {
prev->second.len += lower->second.len;
range->erase(lower);
}

range->emplace(curEnd, sep);
return;
}
Expand Down Expand Up @@ -443,6 +486,7 @@ void ExtentCache::MarkWrittenWithinRange(std::map<uint64_t, PExtent>* range,
range->emplace(curOff, sep);

curOff = extEnd;
prev = lower;
++lower;
}
}
Expand All @@ -457,21 +501,41 @@ ExtentCache::GetExtentsForTesting() const {

google::protobuf::Map<uint64_t, curvefs::metaserver::VolumeExtentList>
ExtentCache::ToInodePb() const {
ReadLockGuard lk(lock_);

google::protobuf::Map<uint64_t, curvefs::metaserver::VolumeExtentList> res;

auto mergeable =
[](const VolumeExtent* prev,
const std::pair<const uint64_t, curvefs::client::PExtent>& ext) {
if (!prev) {
return false;
}

return prev->isused() != ext.second.UnWritten &&
prev->fsoffset() + prev->length() == ext.first &&
prev->volumeoffset() + prev->length() == ext.second.pOffset;
};

ReadLockGuard lk(lock_);

for (const auto& range : extents_) {
curvefs::metaserver::VolumeExtentList& lis = res[range.first];
auto* pbExt = lis.mutable_volumeextents();
pbExt->Reserve(range.second.size());

VolumeExtent* prev = nullptr;

for (const auto& ext : range.second) {
if (mergeable(prev, ext)) {
prev->set_length(prev->length() + ext.second.len);
continue;;
}

auto* t = pbExt->Add();
t->set_fsoffset(ext.first);
t->set_volumeoffset(ext.second.pOffset);
t->set_length(ext.second.len);
t->set_isused(!ext.second.UnWritten);
prev = t;
}
}

Expand All @@ -483,7 +547,7 @@ void ExtentCache::DivideForRead(uint64_t offset,
char* data,
std::vector<ReadPart>* reads,
std::vector<ReadPart>* holes) {
LatencyUpdater updater(&readDivideLatency_);
LatencyUpdater updater(&g_read_divide_latency_);
ReadLockGuard lk(lock_);

const auto end = offset + len;
Expand Down
5 changes: 0 additions & 5 deletions curvefs/src/client/volume/extent_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,6 @@ class ExtentCache {
std::unordered_map<uint64_t, std::map<uint64_t, PExtent>> extents_;

private:
static bvar::LatencyRecorder writeDivideLatency_;
static bvar::LatencyRecorder readDivideLatency_;
static bvar::LatencyRecorder mergeLatency_;
static bvar::LatencyRecorder markWrittenLatency_;

static ExtentCacheOption option_;
};

Expand Down
66 changes: 66 additions & 0 deletions curvefs/test/client/test_fuse_s3_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1132,5 +1132,71 @@ TEST_F(TestFuseS3Client, FuseOpOpen_Trunc_EnableSummary) {
ASSERT_EQ(p.xattr().find(XATTRFBYTES)->second, "100");
}

TEST_F(TestFuseS3Client, FuseOpListXattr) {
char buf[256];
std::memset(buf, 0, 256);
size_t size = 0;

fuse_req_t req;
fuse_ino_t ino = 1;
struct fuse_file_info fi;
Inode inode;
inode.set_inodeid(ino);
inode.set_length(4096);
inode.set_type(FsFileType::TYPE_S3);
std::string key = "security";
inode.mutable_xattr()->insert({key, "0"});

auto inodeWrapper = std::make_shared<InodeWrapper>(inode, metaClient_);
size_t realSize = 0;

// failed when get inode
EXPECT_CALL(*inodeManager_, GetInode(ino, _))
.WillOnce(
DoAll(SetArgReferee<1>(inodeWrapper),
Return(CURVEFS_ERROR::INTERNAL)));
CURVEFS_ERROR ret = client_->FuseOpListXattr(
req, ino, buf, size, &realSize);
ASSERT_EQ(CURVEFS_ERROR::INTERNAL, ret);

EXPECT_CALL(*inodeManager_, GetInode(ino, _))
.WillOnce(
DoAll(SetArgReferee<1>(inodeWrapper), Return(CURVEFS_ERROR::OK)));
ret = client_->FuseOpListXattr(
req, ino, buf, size, &realSize);
ASSERT_EQ(CURVEFS_ERROR::OK, ret);
ASSERT_EQ(realSize, key.length() + 1);

realSize = 0;
inodeWrapper->GetMutableInodeUnlocked()->
set_type(FsFileType::TYPE_DIRECTORY);
EXPECT_CALL(*inodeManager_, GetInode(ino, _))
.WillOnce(
DoAll(SetArgReferee<1>(inodeWrapper), Return(CURVEFS_ERROR::OK)));
ret = client_->FuseOpListXattr(
req, ino, buf, size, &realSize);
ASSERT_EQ(CURVEFS_ERROR::OK, ret);
auto expected = key.length() + 1 + strlen(XATTRRFILES) + 1 +
strlen(XATTRRSUBDIRS) + 1 + strlen(XATTRRENTRIES) + 1 +
strlen(XATTRRFBYTES) + 1;
ASSERT_EQ(realSize, expected);

realSize = 0;
EXPECT_CALL(*inodeManager_, GetInode(ino, _))
.WillOnce(
DoAll(SetArgReferee<1>(inodeWrapper), Return(CURVEFS_ERROR::OK)));
ret = client_->FuseOpListXattr(
req, ino, buf, expected - 1, &realSize);
ASSERT_EQ(CURVEFS_ERROR::OUT_OF_RANGE, ret);

realSize = 0;
EXPECT_CALL(*inodeManager_, GetInode(ino, _))
.WillOnce(
DoAll(SetArgReferee<1>(inodeWrapper), Return(CURVEFS_ERROR::OK)));
ret = client_->FuseOpListXattr(
req, ino, buf, expected, &realSize);
ASSERT_EQ(CURVEFS_ERROR::OK, ret);
}

} // namespace client
} // namespace curvefs
Loading

0 comments on commit 85ec688

Please sign in to comment.