Skip to content

Commit

Permalink
curvefs-bs: merge extent if possible
Browse files Browse the repository at this point in the history
  • Loading branch information
wu-hanqing committed Mar 28, 2022
1 parent 4755bb7 commit 4172780
Show file tree
Hide file tree
Showing 4 changed files with 642 additions and 23 deletions.
100 changes: 82 additions & 18 deletions curvefs/src/client/volume/extent_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@
#include <glog/logging.h>

#include <algorithm>
#include "absl/types/optional.h"
#include <utility>

#include "absl/types/optional.h"
#include "curvefs/src/client/volume/metric.h"
#include "src/common/fast_align.h"

Expand All @@ -41,21 +42,20 @@ using ::curve::common::is_aligned;
using ::curve::common::is_alignment;
using ::curve::common::ReadLockGuard;
using ::curve::common::WriteLockGuard;
using ::curvefs::metaserver::VolumeExtent;

namespace {

// TODO(wuhanqing): this value should align with backend storage's block size
constexpr uint64_t kMinAllocGranularity = 4096;

bvar::LatencyRecorder g_write_divide_latency("extent_cache_write_divide");
bvar::LatencyRecorder g_read_divide_latency_("extent_cache_read_divide");
bvar::LatencyRecorder g_merge_latency("extent_cache_merge");
bvar::LatencyRecorder g_mark_written_latency("extent_cache_mark_written");

} // namespace

bvar::LatencyRecorder ExtentCache::writeDivideLatency_(
"extent_cache_write_divide");
bvar::LatencyRecorder ExtentCache::readDivideLatency_(
"extent_cache_read_divide");
bvar::LatencyRecorder ExtentCache::mergeLatency_("extent_cache_merge");
bvar::LatencyRecorder ExtentCache::markWrittenLatency_(
"extent_cache_mark_written");

ExtentCacheOption ExtentCache::option_;

Expand All @@ -64,7 +64,7 @@ void ExtentCache::Merge(uint64_t loffset, const PExtent& pExt) {
VLOG(9) << "merge extnet, loffset: " << loffset
<< ", physical offset: " << pExt.pOffset << ", len: " << pExt.len
<< ", writtern: " << !pExt.UnWritten;
LatencyUpdater updater(&mergeLatency_);
LatencyUpdater updater(&g_merge_latency);
WriteLockGuard lk(lock_);
MergeWithinRange(&extents_[align_down(loffset, option_.rangeSize)], loffset,
pExt);
Expand Down Expand Up @@ -111,7 +111,7 @@ void ExtentCache::DivideForWrite(uint64_t offset,
const char* data,
std::vector<WritePart>* allocated,
std::vector<AllocPart>* needAlloc) {
LatencyUpdater updater(&writeDivideLatency_);
LatencyUpdater updater(&g_write_divide_latency);
ReadLockGuard lk(lock_);

const auto end = offset + len;
Expand Down Expand Up @@ -295,7 +295,7 @@ void ExtentCache::DivideForWriteWithinRange(
}

void ExtentCache::MarkWritten(uint64_t offset, uint64_t len) {
LatencyUpdater updater(&markWrittenLatency_);
LatencyUpdater updater(&g_mark_written_latency);
WriteLockGuard lk(lock_);

auto cur = offset;
Expand All @@ -311,14 +311,16 @@ void ExtentCache::MarkWritten(uint64_t offset, uint64_t len) {
}
}

// TODO(wuhanqing): merge continuous extents
void ExtentCache::MarkWrittenWithinRange(std::map<uint64_t, PExtent>* range,
uint64_t offset,
uint64_t len) {
uint64_t curOff = offset;
const uint64_t curEnd = offset + len;

auto lower = range->lower_bound(curOff);
auto upper = range->upper_bound(curEnd);
const auto upper = range->upper_bound(curEnd);
auto prev = range->end();

if (lower != range->begin()) {
--lower;
Expand All @@ -329,14 +331,36 @@ void ExtentCache::MarkWrittenWithinRange(std::map<uint64_t, PExtent>* range,
return off1 + len1 <= off2 || off2 + len2 <= off1;
};

// merge written extents
auto mergeable =
[&prev, &range](
std::map<uint64_t, curvefs::client::PExtent>::iterator current) {
return prev != range->end() &&
!prev->second.UnWritten &&
!current->second.UnWritten &&
// logical is continuous
prev->first + prev->second.len == current->first &&
// physical is continuous
prev->second.pOffset + prev->second.len ==
current->second.pOffset;
};

while (curOff < curEnd && lower != upper) {
if (nonoverlap(curOff, curEnd, lower->first, lower->second.len)) {
prev = lower;
++lower;
continue;
}

if (!lower->second.UnWritten) {
++lower;
if (mergeable(lower)) {
prev->second.len += lower->second.len;
lower = range->erase(lower);
} else {
prev = lower;
++lower;
}

continue;
}

Expand All @@ -348,8 +372,15 @@ void ExtentCache::MarkWrittenWithinRange(std::map<uint64_t, PExtent>* range,
// write |-------| |--------|
// extent |----| |----|
lower->second.UnWritten = false;
if (mergeable(lower)) {
prev->second.len += lower->second.len;
lower = range->erase(lower);
} else {
prev = lower;
++lower;
}

curOff = extEnd;
++lower;
} else {
// write |----|
// extent |----|
Expand All @@ -372,8 +403,15 @@ void ExtentCache::MarkWrittenWithinRange(std::map<uint64_t, PExtent>* range,
// write |----| |-------|
// extent |----| |----|
lower->second.UnWritten = false;
if (mergeable(lower)) {
prev->second.len += lower->second.len;
lower = range->erase(lower);
} else {
prev = lower;
++lower;
}

curOff = extEnd;
++lower;
} else {
// write |----|
// extent |--------|
Expand All @@ -386,6 +424,11 @@ void ExtentCache::MarkWrittenWithinRange(std::map<uint64_t, PExtent>* range,
lower->second.len = overlap;
lower->second.UnWritten = false;

if (mergeable(lower)) {
prev->second.len += lower->second.len;
range->erase(lower);
}

range->emplace(curEnd, sep);
return;
}
Expand Down Expand Up @@ -443,6 +486,7 @@ void ExtentCache::MarkWrittenWithinRange(std::map<uint64_t, PExtent>* range,
range->emplace(curOff, sep);

curOff = extEnd;
prev = lower;
++lower;
}
}
Expand All @@ -457,21 +501,41 @@ ExtentCache::GetExtentsForTesting() const {

google::protobuf::Map<uint64_t, curvefs::metaserver::VolumeExtentList>
ExtentCache::ToInodePb() const {
ReadLockGuard lk(lock_);

google::protobuf::Map<uint64_t, curvefs::metaserver::VolumeExtentList> res;

auto mergeable =
[](const VolumeExtent* prev,
const std::pair<const uint64_t, curvefs::client::PExtent>& ext) {
if (!prev) {
return false;
}

return prev->isused() != ext.second.UnWritten &&
prev->fsoffset() + prev->length() == ext.first &&
prev->volumeoffset() + prev->length() == ext.second.pOffset;
};

ReadLockGuard lk(lock_);

for (const auto& range : extents_) {
curvefs::metaserver::VolumeExtentList& lis = res[range.first];
auto* pbExt = lis.mutable_volumeextents();
pbExt->Reserve(range.second.size());

VolumeExtent* prev = nullptr;

for (const auto& ext : range.second) {
if (mergeable(prev, ext)) {
prev->set_length(prev->length() + ext.second.len);
continue;;
}

auto* t = pbExt->Add();
t->set_fsoffset(ext.first);
t->set_volumeoffset(ext.second.pOffset);
t->set_length(ext.second.len);
t->set_isused(!ext.second.UnWritten);
prev = t;
}
}

Expand All @@ -483,7 +547,7 @@ void ExtentCache::DivideForRead(uint64_t offset,
char* data,
std::vector<ReadPart>* reads,
std::vector<ReadPart>* holes) {
LatencyUpdater updater(&readDivideLatency_);
LatencyUpdater updater(&g_read_divide_latency_);
ReadLockGuard lk(lock_);

const auto end = offset + len;
Expand Down
5 changes: 0 additions & 5 deletions curvefs/src/client/volume/extent_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,6 @@ class ExtentCache {
std::unordered_map<uint64_t, std::map<uint64_t, PExtent>> extents_;

private:
static bvar::LatencyRecorder writeDivideLatency_;
static bvar::LatencyRecorder readDivideLatency_;
static bvar::LatencyRecorder mergeLatency_;
static bvar::LatencyRecorder markWrittenLatency_;

static ExtentCacheOption option_;
};

Expand Down
Loading

0 comments on commit 4172780

Please sign in to comment.