Skip to content

Commit

Permalink
Porting/issue 3238 context lock contention fix (#357)
Browse files Browse the repository at this point in the history
* Fix parallel loading of data parts
porting ClickHouse/ClickHouse#34310 on Feb 04, 2022

* Reset thread name in thread pool
porting ClickHouse/ClickHouse#36115 on Apr 14, 2022

* ThreadPool fixes
porting ClickHouse/ClickHouse#39160 on Jul 14, 2022

* Improve ThreadPool
porting ClickHouse/ClickHouse#47657 on Mar 22, 2023

* Lower ThreadPool mutex contention and simplify
porting ClickHouse/ClickHouse#48750 on Apr 14

* Fix ThreadPool::wait
porting ClickHouse/ClickHouse#49572 on May 6 2023

* ProfileEvents added ContextLockWaitMicroseconds
porting ClickHouse/ClickHouse#55029 (on Sep 26, 2023)

* Fix race in Context::createCopy
porting ClickHouse/ClickHouse#49663

* Use a separate mutex for query_factories_info in Context.
porting ClickHouse/ClickHouse#37532 on May 26, 2022

* Avoid possible deadlock on server shutdown
porting ClickHouse/ClickHouse#35081 on Mar 7, 2022

* Porting/issue 3238 context lock contention fix part 2 (#313)
  • Loading branch information
yokofly committed Nov 29, 2023
1 parent e931bb5 commit f9d81dc
Show file tree
Hide file tree
Showing 6 changed files with 653 additions and 317 deletions.
2 changes: 2 additions & 0 deletions base/base/defines.h
Expand Up @@ -155,6 +155,7 @@
# define TSA_ACQUIRE_SHARED(...) __attribute__((acquire_shared_capability(__VA_ARGS__))) /// function acquires a shared capability, but does not release it
# define TSA_TRY_ACQUIRE_SHARED(...) __attribute__((try_acquire_shared_capability(__VA_ARGS__))) /// function tries to acquire a shared capability and returns a boolean value indicating success or failure
# define TSA_RELEASE_SHARED(...) __attribute__((release_shared_capability(__VA_ARGS__))) /// function releases the given shared capability
# define TSA_SCOPED_LOCKABLE __attribute__((scoped_lockable)) /// object of a class has scoped lockable capability

/// Macros for suppressing TSA warnings for specific reads/writes (instead of suppressing it for the whole function)
/// They use a lambda function to apply function attribute to a single statement. This enable us to suppress warnings locally instead of
Expand Down Expand Up @@ -182,6 +183,7 @@
# define TSA_ACQUIRE_SHARED(...)
# define TSA_TRY_ACQUIRE_SHARED(...)
# define TSA_RELEASE_SHARED(...)
# define TSA_SCOPED_LOCKABLE

# define TSA_SUPPRESS_WARNING_FOR_READ(x) (x)
# define TSA_SUPPRESS_WARNING_FOR_WRITE(x) (x)
Expand Down
25 changes: 25 additions & 0 deletions src/Common/SharedLockGuard.h
@@ -0,0 +1,25 @@
#pragma once

#include <base/defines.h>

namespace DB
{

/** SharedLockGuard provide RAII-style locking mechanism for acquiring shared ownership of the implementation
* of the SharedLockable concept (for example std::shared_mutex) supplied as the constructor argument.
* On construction it acquires shared ownership using `lock_shared` method.
* On desruction shared ownership is released using `unlock_shared` method.
*/
template <typename Mutex>
class TSA_SCOPED_LOCKABLE SharedLockGuard
{
public:
explicit SharedLockGuard(Mutex & mutex_) TSA_ACQUIRE_SHARED(mutex_) : mutex(mutex_) { mutex_.lock_shared(); }

~SharedLockGuard() TSA_RELEASE() { mutex.unlock_shared(); }

private:
Mutex & mutex;
};

}
112 changes: 112 additions & 0 deletions src/Common/SharedMutexHelper.h
@@ -0,0 +1,112 @@
#pragma once

#include <base/types.h>
#include <base/defines.h>
#include <Common/SharedMutex.h>

namespace DB
{

/** SharedMutexHelper class allows to inject specific logic when underlying shared mutex is acquired
* and released.
*
* Example:
*
* class ProfileSharedMutex : public SharedMutexHelper<ProfileSharedMutex>
* {
* public:
* size_t getLockCount() const { return lock_count; }
*
* size_t getSharedLockCount() const { return shared_lock_count; }
*
* private:
* using Base = SharedMutexHelper<ProfileSharedMutex, SharedMutex>;
* friend class SharedMutexHelper<ProfileSharedMutex, SharedMutex>;
*
* void lockImpl()
* {
* ++lock_count;
* Base::lockImpl();
* }
*
* void lockSharedImpl()
* {
* ++shared_lock_count;
* Base::lockSharedImpl();
* }
*
* std::atomic<size_t> lock_count = 0;
* std::atomic<size_t> shared_lock_count = 0;
* };
*/
template <typename Derived, typename MutexType = SharedMutex>
class TSA_CAPABILITY("SharedMutexHelper") SharedMutexHelper
{
public:
// Exclusive ownership
void lock() TSA_ACQUIRE() /// NOLINT
{
static_cast<Derived *>(this)->lockImpl();
}

bool try_lock() TSA_TRY_ACQUIRE(true) /// NOLINT
{
static_cast<Derived *>(this)->tryLockImpl();
}

void unlock() TSA_RELEASE() /// NOLINT
{
static_cast<Derived *>(this)->unlockImpl();
}

// Shared ownership
void lock_shared() TSA_ACQUIRE_SHARED() /// NOLINT
{
static_cast<Derived *>(this)->lockSharedImpl();
}

bool try_lock_shared() TSA_TRY_ACQUIRE_SHARED(true) /// NOLINT
{
static_cast<Derived *>(this)->tryLockSharedImpl();
}

void unlock_shared() TSA_RELEASE_SHARED() /// NOLINT
{
static_cast<Derived *>(this)->unlockSharedImpl();
}

protected:
void lockImpl() TSA_NO_THREAD_SAFETY_ANALYSIS
{
mutex.lock();
}

void tryLockImpl() TSA_NO_THREAD_SAFETY_ANALYSIS
{
mutex.try_lock();
}

void unlockImpl() TSA_NO_THREAD_SAFETY_ANALYSIS
{
mutex.unlock();
}

void lockSharedImpl() TSA_NO_THREAD_SAFETY_ANALYSIS
{
mutex.lock_shared();
}

void tryLockSharedImpl() TSA_NO_THREAD_SAFETY_ANALYSIS
{
mutex.try_lock_shared();
}

void unlockSharedImpl() TSA_NO_THREAD_SAFETY_ANALYSIS
{
mutex.unlock_shared();
}

MutexType mutex;
};

}
16 changes: 16 additions & 0 deletions src/Common/callOnce.h
@@ -0,0 +1,16 @@
#pragma once

#include <mutex>

namespace DB
{

using OnceFlag = std::once_flag;

template <typename Callable, typename ...Args>
void callOnce(OnceFlag & flag, Callable && func, Args&&... args)
{
std::call_once(flag, std::forward<Callable>(func), std::forward<Args>(args)...);
}

}

0 comments on commit f9d81dc

Please sign in to comment.