Skip to content

Commit

Permalink
[adhoc] ybase - Fix behavior of BackgroundTask configured with interv…
Browse files Browse the repository at this point in the history
…al wake.

Summary:
The BackgroundTask component is used to house a background thread which runs a single pre-defined task. The
API allows you to explicitly wake the task or configure an interval on which the task should wake itself.
However, in the latter case, we would not run any task until an explicit Wake() call happened due to a bug.
This diff fixes that bug and adds unit tests for the BackgroundTask component.

Test Plan:
ybd --cxx-test util_background_task-test
and Jenkins

Reviewers: amitanand

Reviewed By: amitanand

Subscribers: jmeehan, bogdan, sergei, timur, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D11674
  • Loading branch information
robertsami committed May 21, 2021
1 parent 6077bc8 commit 1320d05
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 11 deletions.
7 changes: 1 addition & 6 deletions src/yb/tserver/tablet_memory_manager.cc
Expand Up @@ -47,10 +47,6 @@ DEFINE_int64(global_memstore_size_mb_max, 2048,
"Global memstore size is determined as a percentage of the available "
"memory. However, this flag limits it in absolute size. Value of 0 "
"means no limit on the value obtained by the percentage. Default is 2048.");
DEFINE_int32(flush_background_task_interval_msec, 0,
"The tick interval time for the flush background task. "
"This defaults to 0, which means disable the background task "
"And only use callbacks on memstore allocations. ");

namespace {
constexpr int kDbCacheSizeUsePercentage = -1;
Expand Down Expand Up @@ -234,8 +230,7 @@ void TabletMemoryManager::ConfigureBackgroundTask(tablet::TabletOptions* options
background_task_.reset(new BackgroundTask(
std::function<void()>([this]() { FlushTabletIfLimitExceeded(); }),
"tablet manager",
"flush scheduler bgtask",
std::chrono::milliseconds(FLAGS_flush_background_task_interval_msec)));
"flush scheduler bgtask"));
options->memory_monitor = std::make_shared<rocksdb::MemoryMonitor>(
memstore_size_bytes,
std::function<void()>([this](){
Expand Down
1 change: 1 addition & 0 deletions src/yb/util/CMakeLists.txt
Expand Up @@ -350,6 +350,7 @@ endif()

set(YB_TEST_LINK_LIBS yb_test_util gutil gmock ${YB_TEST_LINK_LIBS_EXTENSIONS} ${YB_MIN_TEST_LIBS})
ADD_YB_TEST(atomic-test)
ADD_YB_TEST(background_task-test)
ADD_YB_TEST(bit-util-test)
ADD_YB_TEST(bitmap-test)
ADD_YB_TEST(blocking_queue-test)
Expand Down
109 changes: 109 additions & 0 deletions src/yb/util/background_task-test.cc
@@ -0,0 +1,109 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// The following only applies to changes made to this file as part of YugaByte development.
//
// Portions Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#include <chrono>
#include <thread>

#include <gtest/gtest-param-test.h>
#include <gtest/gtest.h>

#include "yb/util/background_task.h"

#include "yb/util/size_literals.h"
#include "yb/util/test_macros.h"
#include "yb/util/test_util.h"
#include "yb/util/tsan_util.h"

using namespace std::chrono_literals;

namespace yb {

class BackgroundTaskTest : public YBTest {
public:
BackgroundTaskTest() : YBTest(), run_count_(0) {}

std::unique_ptr<BackgroundTask> GetTask(std::chrono::milliseconds timeout = 0s) {
auto task = std::make_unique<BackgroundTask>([this]() {
run_count_.fetch_add(1);
}, "test", "test", timeout);
EXPECT_OK(task->Init());
return task;
}

protected:
std::atomic<int> run_count_;
};

TEST_F(BackgroundTaskTest, RunsTaskOnWake) {
constexpr int kNumTaskRuns = 1000;
auto bg_task = GetTask();

for (int i = 1; i < kNumTaskRuns; ++i) {
EXPECT_OK(bg_task->Wake());
EXPECT_OK(WaitFor([this, i]() {
return run_count_ == i;
}, 1s * kTimeMultiplier, Format("Wait for i-th ($0) task run.", i)));
}

bg_task->Shutdown();
}

TEST_F(BackgroundTaskTest, RunsTaskOnInterval) {
constexpr int kNumTaskRuns = 1000;

auto interval = 10ms * kTimeMultiplier;
auto bg_task = GetTask(interval);

for (int i = 1; i < kNumTaskRuns; ++i) {
std::this_thread::sleep_for(interval);
EXPECT_OK(WaitFor([this, i]() {
return run_count_ >= i;
}, interval / 10, Format("Wait for i-th ($0) task run.", i)));
}

bg_task->Shutdown();
}

class BackgroundTaskShutdownTest :
public BackgroundTaskTest, public ::testing::WithParamInterface<std::chrono::milliseconds> {};

TEST_P(BackgroundTaskShutdownTest, InitAndShutdown) {
auto interval = GetParam() * kTimeMultiplier;
auto bg_task = GetTask(interval);
std::this_thread::sleep_for(1s * kTimeMultiplier);
bg_task->Shutdown();
}

INSTANTIATE_TEST_CASE_P(
InitAndShutdown, BackgroundTaskShutdownTest, ::testing::Values(0s, 20ms, 5s));

} // namespace yb
16 changes: 11 additions & 5 deletions src/yb/util/background_task.h
Expand Up @@ -22,13 +22,15 @@ namespace yb {
// Executions of RunTask are serialized. If interval_msec is 0, the task only runs when explicitly
// woken up.
// TODO(bojanserafimov): Use in CatalogManagerBgTasks
// TODO(bojanserafimov): Add unit tests
class BackgroundTask {
public:
explicit BackgroundTask(std::function<void()> task, std::string category,
const std::string& name, std::chrono::milliseconds interval_msec)
: task_(std::move(task)), category_(category),
name_(std::move(name)), interval_(interval_msec) { }
BackgroundTask(
std::function<void()> task, std::string category, const std::string& name,
std::chrono::milliseconds interval_msec = std::chrono::milliseconds(0)):
task_(std::move(task)),
category_(category),
name_(std::move(name)),
interval_(interval_msec) {}

CHECKED_STATUS Init() {
RETURN_NOT_OK(yb::Thread::Create(category_, name_, &BackgroundTask::Run, this, &thread_));
Expand Down Expand Up @@ -84,6 +86,10 @@ class BackgroundTask {
// Wait
if (interval_ != std::chrono::milliseconds::zero()) {
cond_.wait_for(lock, interval_);
// If we wake here from the interval_ timeout, then we should behave as if we have a job. If
// we wake from an explicit notify from a Wake() call, we should still behave as if we have
// a job.
have_job_ = true;
} else {
cond_.wait(lock);
}
Expand Down

0 comments on commit 1320d05

Please sign in to comment.