Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

C++ folly库解读(三)Synchronized —— 比std::lock_guard/std::unique_lock更易用、功能更强大的同步机制 #168

Open
zhangyachen opened this issue Oct 8, 2021 · 0 comments

Comments

@zhangyachen
Copy link
Owner

目录

  • 传统同步方案的缺点
  • folly/Synchronized.h 简单使用
  • Synchronized的模板参数
  • withLock()/withRLock()/withWLock() —— 更易用的加锁方式
  • 升级锁
  • ulock()和 withULockPtr()
  • Timed Locking
  • Synchronized 与 std::condition_variable
  • acquireLocked() —— 同时锁多个数据
  • 使用一把锁,锁多个数据
  • struct
  • std::tuple
  • Benchmark

folly/Synchronized.h 提供了一种更简单、更不容易出错的同步机制,可以用来替代传统 C++标准库中使用较复杂、较容易出错的同步机制。

传统同步方案的缺点

一般是将需要同步的数据和锁一一配对,即 —— associate mutexes with data, not code :

class RequestHandler {
  ...
  std::mutex requestMutex_;
  RequestQueue requestQueue_;

  processRequest(const Request& request);
};

void RequestHandler::processRequest(const Request& request) {
  std::lock_guard<std::mutex> lg(requestMutex_);
  requestQueue_.push_back(request);
}

然而,操作这些数据成员,开发人员必须注意,正确的获取锁、获取正确的锁。

一些常见的错误包括:

  • 操作数据之前没有获取锁。
  • 获取了不配对的锁,这个锁不是用来锁这个数据的。
  • 获取了读锁,但是试图去修改数据。
  • 获取了写锁,但是对数据只有 const access.

一般在使用时,需要提醒开发人员:“别忘了 xxxx”,那一般都会出错,比如 new 的对象别忘了 delete : )

folly/Synchronized.h 简单使用

上面的代码可以用 folly/Synchronized.h 重写为:

class RequestHandler {
  folly::Synchronized<RequestQueue> requestQueue_;

  processRequest(const Request& request);
};

void RequestHandler::processRequest(const Request& request) {
  requestQueue_.wlock()->push_back(request);
}

为什么 folly/Synchronized.h 更加有效呢?

  • 与传统使用方式不同,这里锁和数据是结合成了一个对象 —— requestQueue_。传统方案中,需要寻找锁和数据的配对关系。
  • 几乎不可能在不获取锁的情况下,去操作数据,还是因为它们被封装成了一个对象。传统方案加不加锁全靠自觉。
  • 在 push_back 后,锁立即被释放。

如果在临界区有多个操作,那么可以使用如下方法:

{
      auto lockedQueue = requestQueue_.wlock();
      lockedQueue->push_back(request1);
      lockedQueue->push_back(request2);
}

wlock 返回一个 LockedPtr 对象,这个对象可以被理解为指向数据成员的指针。只有这个对象存在,那么锁就会被锁住,所以最好为这个对象显示定义一个 scope.

更好的方式,是使用 lambdas :

void RequestHandler::processRequest(const Request& request) {
      requestQueue_.withWLock([&](auto& queue "&") {
        // withWLock() automatically holds the lock for the
        // duration of this lambda function
        queue.push_back(request);
      });
}

使用 withWLock 配合 lambdas 强制定义了一个 scope,更清晰。

Synchronized的模板参数

Synchronized 有两个模板参数,数据类型和锁类型:

template <class T, class Mutex = SharedMutex>

如果不指定第二个模板参数,默认是 folly::SharedMutex。只要被 folly::LockTraits 支持的都可以使用,比如 std::mutex、std::recursive_mutex、std::timed_mutex,。std::recursive_timed_mutex、folly::SharedMutex、folly::RWSpinLock、folly::SpinLock.

根据锁类型的不同,Synchronized 会提供不同的 API:

  • 共享锁和升级锁:如果存在 lock_shared()成员函数,Synchronized 会提供 wlock(),rlock(),ulock()三个方法来获取不同的锁类型。其中,rlock()只提供对数据成员 const access.
  • 排他锁:lock()

withLock()/withRLock()/withWLock() —— 更易用的加锁方式

withLock()在上面提到过了,可以用来替代 lock()。在持有锁的期间,执行一个 lambda 或者 function. withRLock()/withWLock()同理可以替代 rlock()/wlock().

我们再详细说一下这种方式的好处。下面的函数将 vector 里的所有元素都 double:

auto locked = vec.lock();
for (int& n : *locked) {
    n *= 2;
}

使用 lock()/wlock()/rlock()的一个重要注意事项:一个指向数据的指针或者引用,它的生命周期一定不要比 LockedPtr 对象长(lock()/wlock()/rlock()的返回值类型)。 如果我们将上面的例子这样写就会出问题:

// No. NO. NO!
for (int& n : *vec.wlock()) {
      n *= 2;
}

vec.wlock()返回的 LockPtr 对象在 range iterators 建立后就销毁了(详细解释见 Range-based for loop Temporary range expression 小节),range iterators 指向了 vector data,但此时锁已经被释放。想想如果要 debug 这种问题,会用多少时间 :)

这时 withLock()/withRLock()/withWLock()的好处就体现出来了,锁会在 for loop 期间一直持有:

vec.withLock([](auto& data "") {
    for (int& n : data) {
        n *= 2;
    }
});

withLock 定义为(withRLock/withWLock 类似):

  /**
   * Invoke a function while holding the lock.
   *
   * A reference to the datum will be passed into the function as its only
   * argument.
   *
   * This can be used with a lambda argument for easily defining small critical
   * sections in the code.  For example:
   *
   *   auto value = obj.withLock([](auto& data "") {
   *     data.doStuff();
   *     return data.getValue();
   *   });
   */
  template <class Function>
  auto withLock(Function&& function) {
    return function(*lock());
  }

  template <class Function>
  auto withLock(Function&& function) const {
    return function(*lock());
  }

升级锁

ulock()和 withULockPtr()

Synchronized 还支持升级锁。升级锁与共享锁可以共存,但是与排它锁互斥。

/**
 * An enum to describe the "level" of a mutex.  The supported levels are
 *  Unique - a normal mutex that supports only exclusive locking
 *  Shared - a shared mutex which has shared locking and unlocking functions;
 *  Upgrade - a mutex that has all the methods of the two above along with
 *            support for upgradable locking
 */
enum class MutexLevel { UNIQUE, SHARED, UPGRADE };

升级锁解决的问题是:先对数据进行读操作,然后根据一定的条件会进行写操作。

升级锁可以通过 uclock()或者 withULockPtr()获得:

{
    // only const access allowed to the underlying object when an upgrade lock
    // is acquired
    auto ulock = vec.ulock();
    auto newSize = ulock->size();
}

auto newSize = vec.withULockPtr([](auto ulock "") {
    // only const access allowed to the underlying object when an upgrade lock
    // is acquired
    return ulock->size();
});

通过下面的函数可以进行升级或者降级:

  • moveFromUpgradeToWrite()
  • moveFromWriteToUpgrade()
  • moveFromWriteToRead() // withWLockPtr()获得的 wlock 可以调用此函数降级为 rlock
  • moveFromUpgradeToRead()

调用这些函数的 LockedPtr 会被设置为 invalid null state,并返回另一个锁住特定锁的 LockedPtr。这些操作都是原子性的,中间不会出现 unlocked 状态。

比如现在有一个 cache,数据结构为 unordered_map,需求是先检查对应的 key 是否在 unordered_map 中,如果在则返回对应的 value,不在则初始化 value 为 0:

folly::Synchronized<std::unordered_map<int64_t, int64_t>> cache;

int64_t res = cache.withULockPtr([key,value](auto ulock "key,value") {
    int64_t cache_value;
    auto iter = ulock->find(key);
    if (iter != ulock->end()) {
        cache_value = iter->second;
    } else {
        cache_value = 0;

        // ulock is now null
        auto wlock = ulock.moveFromUpgradeToWrite();
        (*wlock)[key] = cache_value;
    }

    return cache_value;
});

Timed Locking

如果初始化 Synchronized 的锁类型支持时间,lock()/wlock()/rlock()可以传入一个类型为 std::chrono::duration 的参数:

void fun(Synchronized<vector<string>>& vec) {
      {
        auto locked = vec.lock(10ms);
        if (!locked) {
          throw std::runtime_error("failed to acquire lock");
        }
        locked->push_back("hello");
        locked->push_back("world");
      }
      LOG(INFO) << "successfully added greeting";
}

Synchronized 与 std::condition_variable

如果 Synchronized 的锁类型是 std::mutex,那么可以和 std::condition_variable 配合使用。

Synchronized<vector<string>, std::mutex> vec;
std::condition_variable emptySignal;

// Assuming some other thread will put data on vec and signal
// emptySignal, we can then wait on it as follows:
auto locked = vec.lock();
emptySignal.wait(locked.getUniqueLock(),
                    [&] { return !locked->empty(); });

getUniqueLock()返回一个 std::unique_lockstd::mutex的引用。但是不推荐这么使用,因为这绕过了 Synchronized 的 API,可以直接操作对应的锁:

 /**
   * Get a reference to the std::unique_lock.
   *
   * This is provided so that callers can use Synchronized<T, std::mutex>
   * with a std::condition_variable.
   *
   * While this API could be used to bypass the normal Synchronized APIs and
   * manually interact with the underlying unique_lock, this is strongly
   * discouraged.
   */
  std::unique_lock<std::mutex>& getUniqueLock() { return lock_; }

acquireLocked() —— 同时锁多个数据

假如需要将一个 vector 的数据拷贝到另一个 vector,wlock()可能会实现需求:

void fun(Synchronized<vector<int>>& a, Synchronized<vector<int>>& b) {
    auto lockedA = a.wlock();
    auto lockedB = b.wlock();
    ... use lockedA and lockedB ...
}

但是如果一个线程调用 fun(x,y),另一个线程调用 func(y,x),就很有可能出现死锁。经典的解决方式是,所有的线程以同样的顺序获取锁。许多库的实现是通过比较锁地址的大小来决定加锁顺序:

void fun(Synchronized<vector<int>>& a, Synchronized<vector<int>>& b) {
    auto ret = folly::acquireLocked(a, b);
    auto& lockedA = std::get<0>(ret);
    auto& lockedB = std::get<1>(ret);
    ... use lockedA and lockedB ...
}

// 实现:通过比较锁地址的大小
/**
 * Acquire locks for multiple Synchronized<T> objects, in a deadlock-safe
 * manner.
 *
 * The locks are acquired in order from lowest address to highest address.
 * (Note that this is not necessarily the same algorithm used by std::lock().)
 * For parameters that are const and support shared locks, a read lock is
 * acquired.  Otherwise an exclusive lock is acquired.
 *
 * use lock() with folly::wlock(), folly::rlock() and folly::ulock() for
 * arbitrary locking without causing a deadlock (as much as possible), with the
 * same effects as std::lock()
 */
template <class Sync1, class Sync2>
std::tuple<detail::LockedPtrType<Sync1>, detail::LockedPtrType<Sync2>>
acquireLocked(Sync1& l1, Sync2& l2) {
  if (static_cast<const void*>(&l1) < static_cast<const void*>(&l2)) {
    auto p1 = l1.contextualLock();
    auto p2 = l2.contextualLock();
    return std::make_tuple(std::move(p1), std::move(p2));
  } else {
    auto p2 = l2.contextualLock();
    auto p1 = l1.contextualLock();
    return std::make_tuple(std::move(p1), std::move(p2));
  }
}

C++17 引入了 structured binding syntax,可以使代码更简单:

void fun(Synchronized<vector<int>>& a, Synchronized<vector<int>>& b) {
     auto [lockedA, lockedB] = folly::acquireLocked(a, b);
     ... use lockedA and lockedB ...
}

acquireLockedPair()返回 std::pair,在不支持 C++17 的编译器情况下,使用也很方便。

使用一把锁,锁多个数据

比如一个 bidirectional map,需要同时操作。一般有两个方案:

Struct

class Server {
    struct BiMap {
    map<int, string> direct;
    map<string, int> inverse;
    };
    Synchronized<BiMap> bimap_;
    ...
};

...
bimap_.withLock([](auto& locked "") {
    locked.direct[0] = "zero";
    locked.inverse["zero"] = 0;
});

std::tuple

class Server {
    Synchronized<tuple<map<int, string>, map<string, int>>> bimap_;
    ...
};

...
bimap_.withLock([](auto& locked "") {
    get<0>(locked)[0] = "zero";
    get<1>(locked)["zero"] = 0;
});

Benchmark

SynchronizedBenchmark.cpp

下篇文章写一下 Synchronized 的基本实现 :)

参考资料:

(完)

朋友们可以关注下我的公众号,获得最及时的更新:

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant