Skip to content

Commit

Permalink
[#17781] DocDB: Introduce single threaded mutex and atomic
Browse files Browse the repository at this point in the history
Summary:
This diff adds SingleThreadedMutex and SingleThreadedAtomic classes.
They are not actual mutex and atomic, but they check that access to the protected resource is performed without concurrency.

Using those classes in Arena.

Also removed yb_util tests dependency on rocksdb by moving appropriate class to test itself.

Test Plan: Jenkins

Reviewers: mbautin

Reviewed By: mbautin

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D17781
  • Loading branch information
spolitov committed Jun 19, 2022
1 parent c13740d commit df9bd67
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 95 deletions.
50 changes: 48 additions & 2 deletions src/yb/integration-tests/load_balancer_mini_cluster-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,52 @@ Status GetTabletsDriveStats(DriveStats* stats,
return Status::OK();
}

class RocksDbMultiDriveTestEnv : public rocksdb::EnvWrapper, public MultiDriveTestEnvBase {
public:
RocksDbMultiDriveTestEnv() : EnvWrapper(Env::Default()) {}

Status NewSequentialFile(const std::string& f, std::unique_ptr<SequentialFile>* r,
const rocksdb::EnvOptions& options) override;
Status NewRandomAccessFile(const std::string& f,
std::unique_ptr<RandomAccessFile>* r,
const rocksdb::EnvOptions& options) override;
Status NewWritableFile(const std::string& f, std::unique_ptr<rocksdb::WritableFile>* r,
const rocksdb::EnvOptions& options) override;
Status ReuseWritableFile(const std::string& f,
const std::string& old_fname,
std::unique_ptr<rocksdb::WritableFile>* r,
const rocksdb::EnvOptions& options) override;
};

Status RocksDbMultiDriveTestEnv::NewSequentialFile(const std::string& f,
std::unique_ptr<SequentialFile>* r,
const rocksdb::EnvOptions& options) {
RETURN_NOT_OK(FailureStatus(f));
return target()->NewSequentialFile(f, r, options);
}

Status RocksDbMultiDriveTestEnv::NewRandomAccessFile(const std::string& f,
std::unique_ptr<RandomAccessFile>* r,
const rocksdb::EnvOptions& options) {
RETURN_NOT_OK(FailureStatus(f));
return target()->NewRandomAccessFile(f, r, options);
}

Status RocksDbMultiDriveTestEnv::NewWritableFile(const std::string& f,
std::unique_ptr<rocksdb::WritableFile>* r,
const rocksdb::EnvOptions& options) {
RETURN_NOT_OK(FailureStatus(f));
return target()->NewWritableFile(f, r, options);
}

Status RocksDbMultiDriveTestEnv::ReuseWritableFile(const std::string& f,
const std::string& old_fname,
std::unique_ptr<rocksdb::WritableFile>* r,
const rocksdb::EnvOptions& options) {
RETURN_NOT_OK(FailureStatus(f));
return target()->ReuseWritableFile(f, old_fname, r, options);
}

} // namespace


Expand Down Expand Up @@ -465,14 +511,14 @@ class LoadBalancerFailedDrive : public LoadBalancerMiniClusterTestBase {
protected:
void SetUp() override {
ts_env_.reset(new MultiDriveTestEnv());
ts_rocksdb_env_.reset(new rocksdb::MultiDriveTestEnv());
ts_rocksdb_env_.reset(new RocksDbMultiDriveTestEnv());
YBTableTestBase::SetUp();
}

void BeforeStartCluster() override {
auto ts1_drive0 = mini_cluster()->GetTabletServerDrive(0, 0);
dynamic_cast<MultiDriveTestEnv*>(ts_env_.get())->AddFailedPath(ts1_drive0);
dynamic_cast<rocksdb::MultiDriveTestEnv*>(ts_rocksdb_env_.get())->AddFailedPath(ts1_drive0);
dynamic_cast<RocksDbMultiDriveTestEnv*>(ts_rocksdb_env_.get())->AddFailedPath(ts1_drive0);
}

int num_drives() override {
Expand Down
5 changes: 3 additions & 2 deletions src/yb/util/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,9 @@ add_dependencies(yb_util gen_version_info)

ADD_YB_TEST_LIBRARY(
yb_test_util
SRCS logging_test_util.cc memory/memory_usage_test_util.cc test_util.cc test_thread_holder.cc multi_drive_test_env.cc
DEPS gflags glog gmock rocksdb yb_util)
SRCS logging_test_util.cc memory/memory_usage_test_util.cc test_util.cc test_thread_holder.cc
multi_drive_test_env.cc
DEPS gflags glog gmock yb_util)

#######################################
# yb_test_main
Expand Down
14 changes: 14 additions & 0 deletions src/yb/util/debug/lock_debug.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,18 @@ NonRecursiveSharedLockBase::~NonRecursiveSharedLockBase() {
head = next_;
}

void SingleThreadedMutex::lock() {
auto old_value = locked_.exchange(true, std::memory_order_acq_rel);
LOG_IF(DFATAL, old_value) << "Thread collision on lock";
}

void SingleThreadedMutex::unlock() {
auto old_value = locked_.exchange(false, std::memory_order_acq_rel);
LOG_IF(DFATAL, !old_value) << "Unlock of not locked mutex";
}

bool SingleThreadedMutex::try_lock() {
return !locked_.exchange(true, std::memory_order_acq_rel);
}

} // namespace yb
48 changes: 48 additions & 0 deletions src/yb/util/debug/lock_debug.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
#ifndef YB_UTIL_DEBUG_LOCK_DEBUG_H
#define YB_UTIL_DEBUG_LOCK_DEBUG_H

#include <atomic>
#include <mutex>

#include "yb/gutil/thread_annotations.h"

namespace yb {
Expand Down Expand Up @@ -46,6 +49,51 @@ class SCOPED_CAPABILITY NonRecursiveSharedLock : public NonRecursiveSharedLockBa
}
};

// Mutex that allows only single lock at a time.
// Logs DFATAL in case of concurrent access.
// Could be used by classes that a designed to be single threaded to check single threaded access.
class SingleThreadedMutex {
public:
void lock();
void unlock();
bool try_lock();
private:
std::atomic<bool> locked_{false};
};

// Atomic that allows only single threaded access.
// Could be used by classes that a designed to be single threaded to check single threaded access.
template <class T>
class SingleThreadedAtomic {
public:
SingleThreadedAtomic() = default;
explicit SingleThreadedAtomic(const T& t) : value_(t) {}

T load(std::memory_order) const {
std::lock_guard<SingleThreadedMutex> lock(mutex_);
return value_;
}

void store(const T& value, std::memory_order) {
std::lock_guard<SingleThreadedMutex> lock(mutex_);
value_ = value;
}

bool compare_exchange_strong(T& old_value, const T& new_value) { // NOLINT
std::lock_guard<SingleThreadedMutex> lock(mutex_);
if (value_ == old_value) {
value_ = new_value;
return true;
}
old_value = value_;
return false;
}

private:
T value_;
mutable SingleThreadedMutex mutex_;
};

} // namespace yb

#endif // YB_UTIL_DEBUG_LOCK_DEBUG_H
26 changes: 8 additions & 18 deletions src/yb/util/memory/arena.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#include "yb/gutil/logging-inl.h"
#include "yb/gutil/macros.h"

#include "yb/util/debug/lock_debug.h"
#include "yb/util/memory/arena_fwd.h"
#include "yb/util/memory/memory.h"
#include "yb/util/slice.h"
Expand All @@ -63,14 +64,9 @@ struct ThreadSafeArenaTraits {

template<class T>
struct MakeAtomic {
typedef std::atomic<T> type;
using type = std::atomic<T>;
};

template<class T>
static T AcquireLoad(std::atomic<T>* input) {
return input->load(std::memory_order_acquire);
}

template<class T>
static void StoreRelease(const T& t, std::atomic<T>* out) {
out->store(t, std::memory_order_release);
Expand All @@ -83,20 +79,14 @@ struct ThreadSafeArenaTraits {
};

struct ArenaTraits {
typedef uint8_t* pointer;
// For non-threadsafe, we don't need any real locking.
typedef boost::signals2::dummy_mutex mutex_type;
using pointer = uint8_t*;
using mutex_type = SingleThreadedMutex;

template<class T>
struct MakeAtomic {
typedef T type;
using type = SingleThreadedAtomic<T>;
};

template<class T>
static T AcquireLoad(T* input) {
return *input;
}

template<class T>
static void StoreRelease(const T& t, T* out) {
*out = t;
Expand Down Expand Up @@ -247,20 +237,20 @@ class ArenaBase {
// Load the current component, with "Acquire" semantics (see atomicops.h)
// if the arena is meant to be thread-safe.
inline Component* AcquireLoadCurrent() {
return Traits::AcquireLoad(&current_);
return current_.load(std::memory_order_acquire);
}

// Store the current component, with "Release" semantics (see atomicops.h)
// if the arena is meant to be thread-safe.
inline void ReleaseStoreCurrent(Component* c) {
return Traits::StoreRelease(c, &current_);
return current_.store(c, std::memory_order_release);
}

BufferAllocator* const buffer_allocator_;

// The current component to allocate from.
// Use AcquireLoadCurrent and ReleaseStoreCurrent to load/store.
typename Traits::template MakeAtomic<Component*>::type current_ = {nullptr};
typename Traits::template MakeAtomic<Component*>::type current_{nullptr};
const size_t max_buffer_size_;
size_t arena_footprint_ = 0;

Expand Down
64 changes: 15 additions & 49 deletions src/yb/util/multi_drive_test_env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,100 +24,66 @@ void MultiDriveTestEnvBase::AddFailedPath(const std::string& path) {
failed_set_.emplace(path);
}

bool MultiDriveTestEnvBase::IsFailed(const std::string& filename) const {
Status MultiDriveTestEnvBase::FailureStatus(const std::string& filename) const {
std::shared_lock lock(data_mutex_);
if (failed_set_.empty()) {
return false;
return Status::OK();
}
auto it = failed_set_.lower_bound(filename);
if ((it == failed_set_.end() || *it != filename) && it != failed_set_.begin()) {
--it;
}
return boost::starts_with(filename, *it);
}

#define RETURN_ERROR_ON_FAULTY_DRIVE(f) if (IsFailed(f)) { \
return STATUS(IOError, "Test Error"); \
if (boost::starts_with(filename, *it)) {
return STATUS_FORMAT(IOError, "TEST Error, drive failed: $0", *it);
}
return Status::OK();
}

Status MultiDriveTestEnv::NewSequentialFile(const std::string& f,
std::unique_ptr<SequentialFile>* r) {
RETURN_ERROR_ON_FAULTY_DRIVE(f);
RETURN_NOT_OK(FailureStatus(f));
return target()->NewSequentialFile(f, r);
}

Status MultiDriveTestEnv::NewRandomAccessFile(const std::string& f,
std::unique_ptr<RandomAccessFile>* r) {
RETURN_ERROR_ON_FAULTY_DRIVE(f);
RETURN_NOT_OK(FailureStatus(f));
return target()->NewRandomAccessFile(f, r);
}

Status MultiDriveTestEnv::NewWritableFile(const std::string& f,
std::unique_ptr<WritableFile>* r) {
RETURN_ERROR_ON_FAULTY_DRIVE(f);
RETURN_NOT_OK(FailureStatus(f));
return target()->NewWritableFile(f, r);
}

Status MultiDriveTestEnv::NewWritableFile(const WritableFileOptions& o,
const std::string& f,
std::unique_ptr<WritableFile>* r) {
RETURN_ERROR_ON_FAULTY_DRIVE(f);
RETURN_NOT_OK(FailureStatus(f));
return target()->NewWritableFile(o, f, r);
}

Status MultiDriveTestEnv::NewTempWritableFile(const WritableFileOptions& o,
const std::string& t,
const std::string& templ,
std::string* f,
std::unique_ptr<WritableFile>* r) {
RETURN_ERROR_ON_FAULTY_DRIVE(t);
return target()->NewTempWritableFile(o, t, f, r);
RETURN_NOT_OK(FailureStatus(templ));
return target()->NewTempWritableFile(o, templ, f, r);
}

Status MultiDriveTestEnv::NewRWFile(const std::string& f, std::unique_ptr<RWFile>* r) {
RETURN_ERROR_ON_FAULTY_DRIVE(f);
RETURN_NOT_OK(FailureStatus(f));
return target()->NewRWFile(f, r);
}

Status MultiDriveTestEnv::NewRWFile(const RWFileOptions& o,
const std::string& f,
std::unique_ptr<RWFile>* r) {
RETURN_ERROR_ON_FAULTY_DRIVE(f);
RETURN_NOT_OK(FailureStatus(f));
return target()->NewRWFile(o, f, r);
}

namespace rocksdb {

Status MultiDriveTestEnv::NewSequentialFile(const std::string& f,
std::unique_ptr<SequentialFile>* r,
const ::rocksdb::EnvOptions& options) {
RETURN_ERROR_ON_FAULTY_DRIVE(f);
return target()->NewSequentialFile(f, r, options);
}

Status MultiDriveTestEnv::NewRandomAccessFile(const std::string& f,
std::unique_ptr<RandomAccessFile>* r,
const ::rocksdb::EnvOptions& options) {
RETURN_ERROR_ON_FAULTY_DRIVE(f);
return target()->NewRandomAccessFile(f, r, options);
}

Status MultiDriveTestEnv::NewWritableFile(const std::string& f,
std::unique_ptr<::rocksdb::WritableFile>* r,
const ::rocksdb::EnvOptions& options) {
RETURN_ERROR_ON_FAULTY_DRIVE(f);
return target()->NewWritableFile(f, r, options);
}

Status MultiDriveTestEnv::ReuseWritableFile(const std::string& f,
const std::string& old_fname,
std::unique_ptr<::rocksdb::WritableFile>* r,
const ::rocksdb::EnvOptions& options) {
RETURN_ERROR_ON_FAULTY_DRIVE(f);
return target()->ReuseWritableFile(f, old_fname, r, options);
}

} // namespace rocksdb

#undef RETURN_ERROR_ON_FAULTY_DRIVE

} // namespace yb
25 changes: 1 addition & 24 deletions src/yb/util/multi_drive_test_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
#include "yb/util/env.h"
#include "yb/util/status.h"

#include "yb/rocksdb/env.h"

#include "yb/util/monotime.h"

namespace yb {
Expand All @@ -38,7 +36,7 @@ class MultiDriveTestEnvBase {
void AddFailedPath(const std::string& path);

protected:
bool IsFailed(const std::string& filename) const;
Status FailureStatus(const std::string& filename) const;

std::set<std::string> failed_set_;
mutable std::shared_mutex data_mutex_;
Expand All @@ -64,27 +62,6 @@ class MultiDriveTestEnv : public EnvWrapper, public MultiDriveTestEnvBase {
std::unique_ptr<RWFile>* r) override;
};

namespace rocksdb {

class MultiDriveTestEnv : public ::rocksdb::EnvWrapper, public MultiDriveTestEnvBase {
public:
MultiDriveTestEnv() : EnvWrapper(Env::Default()) {}

Status NewSequentialFile(const std::string& f, std::unique_ptr<SequentialFile>* r,
const ::rocksdb::EnvOptions& options) override;
Status NewRandomAccessFile(const std::string& f,
std::unique_ptr<RandomAccessFile>* r,
const ::rocksdb::EnvOptions& options) override;
Status NewWritableFile(const std::string& f, std::unique_ptr<::rocksdb::WritableFile>* r,
const ::rocksdb::EnvOptions& options) override;
Status ReuseWritableFile(const std::string& f,
const std::string& old_fname,
std::unique_ptr<::rocksdb::WritableFile>* r,
const ::rocksdb::EnvOptions& options) override;
};

} // namespace rocksdb

} // namespace yb

#endif // YB_UTIL_MULTI_DRIVE_TEST_ENV_H

0 comments on commit df9bd67

Please sign in to comment.