Skip to content

Commit

Permalink
Fix alloc_bytes of ReadLimiter (#5852) (#5930)
Browse files Browse the repository at this point in the history
close #5801
  • Loading branch information
ti-chi-bot committed Oct 9, 2022
1 parent d90fbfa commit 7668a60
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 38 deletions.
81 changes: 54 additions & 27 deletions dbms/src/Encryption/RateLimiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,23 @@ inline CurrentMetrics::Increment pendingRequestMetrics(LimiterType type)
}
}

String getEnumName(LimiterType type)
{
switch (type)
{
case LimiterType::FG_READ:
return "FG_READ";
case LimiterType::BG_READ:
return "BG_READ";
case LimiterType::FG_WRITE:
return "FG_WRITE";
case LimiterType::BG_WRITE:
return "BG_WRITE";
default:
return "UNKNOWN";
}
}

WriteLimiter::WriteLimiter(Int64 rate_limit_per_sec_, LimiterType type_, UInt64 refill_period_ms_)
: refill_period_ms{refill_period_ms_}
, refill_balance_per_period{calculateRefillBalancePerPeriod(rate_limit_per_sec_)}
Expand All @@ -107,6 +124,7 @@ WriteLimiter::WriteLimiter(Int64 rate_limit_per_sec_, LimiterType type_, UInt64
, requests_to_wait{0}
, type(type_)
, alloc_bytes{0}
, log(Logger::get(getEnumName(type_)))
{}

WriteLimiter::~WriteLimiter()
Expand All @@ -131,7 +149,8 @@ void WriteLimiter::request(Int64 bytes)
consumeBytes(bytes);
return;
}

Stopwatch sw_pending;
Int64 wait_times = 0;
auto pending_request = pendingRequestMetrics(type);

// request cannot be satisfied at this moment, enqueue
Expand All @@ -140,7 +159,7 @@ void WriteLimiter::request(Int64 bytes)
while (!r.granted)
{
assert(!req_queue.empty());

wait_times++;
bool timed_out = false;
// if this request is in the front of req_queue,
// then it is responsible to trigger the refill process.
Expand Down Expand Up @@ -191,6 +210,7 @@ void WriteLimiter::request(Int64 bytes)
}
}
}
LOG_FMT_TRACE(log, "pending_us {} wait_times {} pending_count {} rate_limit_per_sec {}", sw_pending.elapsed() / 1000, wait_times, req_queue.size(), refill_balance_per_period * 1000 / refill_period_ms);
}

size_t WriteLimiter::setStop()
Expand Down Expand Up @@ -298,8 +318,8 @@ ReadLimiter::ReadLimiter(
, getIOStatistic(std::move(getIOStatistic_))
, last_stat_bytes(getIOStatistic())
, last_stat_time(now())
, log(&Poco::Logger::get("ReadLimiter"))
, get_io_statistic_period_us(get_io_stat_period_us)
, last_refill_time(std::chrono::system_clock::now())
{}

Int64 ReadLimiter::getAvailableBalance()
Expand Down Expand Up @@ -333,18 +353,22 @@ Int64 ReadLimiter::refreshAvailableBalance()
else
{
Int64 real_alloc_bytes = bytes - last_stat_bytes;
metricAllocBytes(type, real_alloc_bytes);
// `alloc_bytes` is the number of byte that ReadLimiter has allocated.
if (available_balance > 0)
{
auto can_alloc_bytes = std::min(real_alloc_bytes, available_balance);
alloc_bytes += can_alloc_bytes;
metricAllocBytes(type, can_alloc_bytes);
}
available_balance -= real_alloc_bytes;
alloc_bytes += real_alloc_bytes;
}
last_stat_bytes = bytes;
last_stat_time = us;
return available_balance;
}

void ReadLimiter::consumeBytes(Int64 bytes)
void ReadLimiter::consumeBytes([[maybe_unused]] Int64 bytes)
{
metricRequestBytes(type, bytes);
// Do nothing for read.
}

Expand All @@ -355,10 +379,26 @@ bool ReadLimiter::canGrant([[maybe_unused]] Int64 bytes)

void ReadLimiter::refillAndAlloc()
{
if (available_balance < refill_balance_per_period)
// `available_balance` of `ReadLimiter` may be overdrawn.
if (available_balance < 0)
{
// Limiter may not be called for a long time.
// During this time, limiter can be refilled at most `max_refill_times` times and covers some overdraft.
auto elapsed_duration = std::chrono::system_clock::now() - last_refill_time;
UInt64 elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed_duration).count();
// At least refill one time.
Int64 max_refill_times = std::max(elapsed_ms, refill_period_ms) / refill_period_ms;
Int64 max_refill_bytes = max_refill_times * refill_balance_per_period;
Int64 can_alloc_bytes = std::min(-available_balance, max_refill_bytes);
alloc_bytes += can_alloc_bytes;
metricAllocBytes(type, can_alloc_bytes);
available_balance = std::min(available_balance + max_refill_bytes, refill_balance_per_period);
}
else
{
available_balance += refill_balance_per_period;
available_balance = refill_balance_per_period;
}
last_refill_time = std::chrono::system_clock::now();

assert(!req_queue.empty());
auto * head_req = req_queue.front();
Expand Down Expand Up @@ -723,26 +763,13 @@ IOLimitTuner::TuneResult IOLimitTuner::tune() const
}

auto [max_read_bytes_per_sec, max_write_bytes_per_sec, rw_tuned] = tuneReadWrite();
LOG_FMT_INFO(
log,
"tuneReadWrite: max_read {} max_write {} rw_tuned {}",
max_read_bytes_per_sec,
max_write_bytes_per_sec,
rw_tuned);
auto [max_bg_read_bytes_per_sec, max_fg_read_bytes_per_sec, read_tuned] = tuneRead(max_read_bytes_per_sec);
LOG_FMT_INFO(
log,
"tuneRead: bg_read {} fg_read {} read_tuned {}",
max_bg_read_bytes_per_sec,
max_fg_read_bytes_per_sec,
read_tuned);
auto [max_bg_write_bytes_per_sec, max_fg_write_bytes_per_sec, write_tuned] = tuneWrite(max_write_bytes_per_sec);
LOG_FMT_INFO(
log,
"tuneWrite: bg_write {} fg_write {} write_tuned {}",
max_bg_write_bytes_per_sec,
max_fg_write_bytes_per_sec,
write_tuned);
if (rw_tuned || read_tuned || write_tuned)
{
LOG_FMT_INFO(log, "tune_msg: bg_write {} => {} fg_write {} => {} bg_read {} => {} fg_read {} => {}", bg_write_stat != nullptr ? bg_write_stat->maxBytesPerSec() : 0, max_bg_write_bytes_per_sec, fg_write_stat != nullptr ? fg_write_stat->maxBytesPerSec() : 0, max_fg_write_bytes_per_sec, bg_read_stat != nullptr ? bg_read_stat->maxBytesPerSec() : 0, max_bg_read_bytes_per_sec, fg_read_stat != nullptr ? fg_read_stat->maxBytesPerSec() : 0, max_fg_read_bytes_per_sec);
}

return {.max_bg_read_bytes_per_sec = max_bg_read_bytes_per_sec,
.max_fg_read_bytes_per_sec = max_fg_read_bytes_per_sec,
.read_tuned = read_tuned || rw_tuned,
Expand Down
10 changes: 7 additions & 3 deletions dbms/src/Encryption/RateLimiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

#pragma once

#include <Common/Logger.h>
#include <Common/Stopwatch.h>
#include <Common/nocopyable.h>
#include <Server/StorageConfigParser.h>
Expand Down Expand Up @@ -74,7 +74,10 @@ class WriteLimiter
void request(Int64 bytes);

// just for test purpose
inline UInt64 getTotalBytesThrough() const { return alloc_bytes; }
inline UInt64 getTotalBytesThrough() const
{
return available_balance < 0 ? alloc_bytes - available_balance : alloc_bytes;
}

LimiterStat getStat();

Expand Down Expand Up @@ -134,6 +137,7 @@ class WriteLimiter

Stopwatch stat_stop_watch;
UInt64 alloc_bytes;
LoggerPtr log;
};

using WriteLimiterPtr = std::shared_ptr<WriteLimiter>;
Expand Down Expand Up @@ -185,9 +189,9 @@ class ReadLimiter : public WriteLimiter
return std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now());
}
TimePoint last_stat_time;
Poco::Logger * log;

Int64 get_io_statistic_period_us;
std::chrono::time_point<std::chrono::system_clock> last_refill_time;
};

using ReadLimiterPtr = std::shared_ptr<ReadLimiter>;
Expand Down
37 changes: 31 additions & 6 deletions dbms/src/Encryption/tests/gtest_rate_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,23 +337,48 @@ TEST(ReadLimiterTest, LimiterStat)
ASSERT_GT(stat.pct(), 100) << stat.toString();
}

static constexpr UInt64 alloc_bytes = 2047;
static constexpr UInt64 total_bytes = 2047;
for (int i = 0; i < 11; i++)
{
request(read_limiter, 1 << i);
}

std::this_thread::sleep_for(100ms);
read_limiter.refreshAvailableBalance();

stat = read_limiter.getStat();
ASSERT_EQ(stat.alloc_bytes, alloc_bytes);
ASSERT_GE(stat.elapsed_ms, alloc_bytes / 100 + 1);
ASSERT_EQ(stat.alloc_bytes, total_bytes + read_limiter.getAvailableBalance());
ASSERT_GE(stat.elapsed_ms, stat.alloc_bytes / 100 + 1);
ASSERT_EQ(stat.refill_period_ms, 100ul);
ASSERT_EQ(stat.refill_bytes_per_period, 100);
ASSERT_EQ(stat.maxBytesPerSec(), 1000);
ASSERT_EQ(stat.avgBytesPerSec(), static_cast<Int64>(alloc_bytes * 1000 / stat.elapsed_ms)) << stat.toString();
ASSERT_EQ(stat.pct(), static_cast<Int64>(alloc_bytes * 1000 / stat.elapsed_ms) * 100 / stat.maxBytesPerSec()) << stat.toString();
ASSERT_EQ(stat.avgBytesPerSec(), static_cast<Int64>(stat.alloc_bytes * 1000 / stat.elapsed_ms)) << stat.toString();
ASSERT_EQ(stat.pct(), static_cast<Int64>(stat.alloc_bytes * 1000 / stat.elapsed_ms) * 100 / stat.maxBytesPerSec()) << stat.toString();
}

TEST(ReadLimiterTest, ReadMany)
{
Int64 real_read_bytes{0};
auto get_read_bytes = [&]() {
return real_read_bytes;
};
auto request = [&](ReadLimiter & limiter, Int64 bytes) {
limiter.request(bytes);
real_read_bytes += bytes;
};

constexpr Int64 bytes_per_sec = 1000;
constexpr UInt64 refill_period_ms = 100;
ReadLimiter read_limiter(get_read_bytes, bytes_per_sec, LimiterType::UNKNOW, refill_period_ms);
ASSERT_EQ(read_limiter.getAvailableBalance(), 100);
request(read_limiter, 1000);
std::this_thread::sleep_for(2ms);
ASSERT_EQ(read_limiter.getAvailableBalance(), -900);
ASSERT_EQ(read_limiter.alloc_bytes, 100);

std::this_thread::sleep_for(1200ms);
Stopwatch sw;
request(read_limiter, 100);
ASSERT_LE(sw.elapsedMilliseconds(), 1); // Not blocked.
}

#ifdef __linux__
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/TestUtils/MockReadLimiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ class MockReadLimiter final : public ReadLimiter
protected:
void consumeBytes(Int64 bytes) override
{
// Need soft limit here.
WriteLimiter::consumeBytes(bytes);
alloc_bytes += std::min(available_balance, bytes);
available_balance -= bytes;
}
};

Expand Down

0 comments on commit 7668a60

Please sign in to comment.