Skip to content

Commit

Permalink
fix concurrent: add Queue::MultiProducer tokens
Browse files Browse the repository at this point in the history
- Add GenericQueue::MultiProducer tokens, which can be used concurrently
- Add MpscQueue::MultiProducer tokens for consistency and to clean up documentation
- Drop NonFifo from single-producer queues, because they are FIFO in moodycamel
- Fix documentation and typos

Relates: TAXICOMMON-5488
  • Loading branch information
Anton3 committed Aug 1, 2022
1 parent edf1d6c commit 357be6f
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 114 deletions.
28 changes: 17 additions & 11 deletions core/include/userver/concurrent/impl/queue_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,21 @@ USERVER_NAMESPACE_BEGIN

namespace concurrent::impl {

struct EmplaceEnabler final {
// Disable {}-initialization in Queue's constructor
explicit EmplaceEnabler() = default;
};

struct NoToken final {
template <typename LockFreeQueue>
explicit NoToken(const LockFreeQueue& /*unused*/) {}
};

/// @warning A single Producer must not be used from multiple threads
/// concurrently
template <typename QueueType>
template <typename QueueType, typename ProducerToken>
class Producer final {
using ValueType = typename QueueType::ValueType;
using ProducerToken = typename QueueType::ProducerToken;

public:
Producer(const Producer&) = delete;
Expand All @@ -30,7 +34,7 @@ class Producer final {
if (queue_) queue_->MarkProducerIsDead();
}

/// Push element into queue. May block if queue is full.
/// Push element into queue. May wait asynchronously if the queue is full.
/// @returns whether push succeeded before the deadline.
bool Push(ValueType&& value, engine::Deadline deadline = {}) const {
return queue_->Push(token_, std::move(value), deadline);
Expand All @@ -45,10 +49,11 @@ class Producer final {
/// Const access to source queue.
std::shared_ptr<const QueueType> Queue() const { return {queue_}; }

/// For internal use only
Producer(std::shared_ptr<QueueType> queue,
typename QueueType::EmplaceEnabler /*unused*/)
/// @cond
// For internal use only
Producer(std::shared_ptr<QueueType> queue, EmplaceEnabler /*unused*/)
: queue_(std::move(queue)), token_(queue_->queue_) {}
/// @endcond

private:
std::shared_ptr<QueueType> queue_;
Expand All @@ -72,8 +77,8 @@ class Consumer final {
if (queue_) queue_->MarkConsumerIsDead();
}

/// Pop element from queue. May block if queue is empty, but the producer is
/// alive.
/// Pop element from queue. May wait asynchronously if the queue is empty,
/// but the producer is alive.
/// @returns whether something was popped before the deadline.
/// @note `false` can be returned before the deadline
/// when the producer is no longer alive.
Expand All @@ -90,10 +95,11 @@ class Consumer final {
/// Const access to source queue.
std::shared_ptr<const QueueType> Queue() const { return {queue_}; }

/// For internal use only
Consumer(std::shared_ptr<QueueType> queue,
typename QueueType::EmplaceEnabler /*unused*/)
/// @cond
// For internal use only
Consumer(std::shared_ptr<QueueType> queue, EmplaceEnabler /*unused*/)
: queue_(std::move(queue)), token_(queue_->queue_) {}
/// @endcond

private:
std::shared_ptr<QueueType> queue_{};
Expand Down
62 changes: 43 additions & 19 deletions core/include/userver/concurrent/mpsc_queue.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <atomic>
#include <limits>
#include <memory>

#include <boost/lockfree/queue.hpp>
Expand Down Expand Up @@ -75,45 +76,58 @@ class MpscQueue final : public std::enable_shared_from_this<MpscQueue<T>> {
using ProducerToken = impl::NoToken;
using ConsumerToken = impl::NoToken;

class EmplaceEnabler {};
friend class impl::Producer<MpscQueue, ProducerToken>;
friend class impl::Consumer<MpscQueue>;

public:
static constexpr std::size_t kUnbounded =
std::numeric_limits<std::size_t>::max();

using ValueType = T;
using Producer = impl::Producer<MpscQueue>;
using Consumer = impl::Consumer<MpscQueue>;

friend class impl::Producer<MpscQueue>;
friend class impl::Consumer<MpscQueue>;
using Producer = impl::Producer<MpscQueue, ProducerToken>;
using Consumer = impl::Consumer<MpscQueue>;
using MultiProducer = impl::Producer<MpscQueue, impl::NoToken>;

explicit MpscQueue(std::size_t max_size, EmplaceEnabler /*unused*/)
/// @cond
// For internal use only
explicit MpscQueue(std::size_t max_size, impl::EmplaceEnabler /*unused*/)
: remaining_capacity_(max_size),
remaining_capacity_control_(remaining_capacity_) {}

MpscQueue(MpscQueue&&) = delete;
MpscQueue(const MpscQueue&) = delete;
MpscQueue& operator=(MpscQueue&&) = delete;
MpscQueue& operator=(const MpscQueue&) = delete;
~MpscQueue();
/// @endcond

/// Create a new queue
static std::shared_ptr<MpscQueue> Create(std::size_t max_size = kUnbounded) {
return std::make_shared<MpscQueue>(max_size, EmplaceEnabler{});
return std::make_shared<MpscQueue>(max_size, impl::EmplaceEnabler{});
}

~MpscQueue();
MpscQueue(const MpscQueue&) = delete;
MpscQueue(MpscQueue&&) = delete;

/// Get a producer which makes it possible to push items into the queue.
/// Can be called multiple times. The resulting Producer is not thread-safe,
/// Get a `Producer` which makes it possible to push items into the queue.
/// Can be called multiple times. The resulting `Producer` is not thread-safe,
/// so you have to use multiple Producers of the same queue to simultaneously
/// write from multiple coroutines/threads.
///
/// @note Producer may outlive the queue and the consumer.
/// @note `Producer` may outlive the queue and the `Consumer`.
Producer GetProducer();

/// Get a consumer which makes it possible to read items from the queue.
/// Can be called only once. You may not use the Consumer simultaneously
/// Get a `MultiProducer` which makes it possible to push items into the
/// queue. Can be called multiple times. The resulting `MultiProducer` is
/// thread-safe, so it can be used simultaneously from multiple
/// coroutines/threads.
///
/// @note `MultiProducer` may outlive the queue and the `Consumer`.
MultiProducer GetMultiProducer();

/// Get a `Consumer` which makes it possible to read items from the queue.
/// Can be called only once. You may not use the `Consumer` simultaneously
/// from multiple coroutines/threads.
///
/// @note Consumer may outlive the queue and the producer.
/// @note `Consumer` may outlive the queue and producers.
Consumer GetConsumer();

/// @brief Sets the limit on the queue size, pushes over this limit will block
Expand All @@ -123,13 +137,16 @@ class MpscQueue final : public std::enable_shared_from_this<MpscQueue<T>> {
/// @brief Gets the limit on the queue size
[[nodiscard]] size_t GetSoftMaxSize() const;

/// @brief Gets the approximate size of queue
[[nodiscard]] size_t GetSizeApproximate() const;

/// @cond
[[deprecated("Use SetSoftMaxSize instead")]] void SetMaxLength(size_t length);

[[deprecated("Use GetSoftMaxSize instead")]] size_t GetMaxLength() const;

[[deprecated("Use GetSizeApproximate instead")]] size_t Size() const;
/// @endcond

private:
bool Push(ProducerToken&, T&&, engine::Deadline);
Expand Down Expand Up @@ -172,15 +189,22 @@ typename MpscQueue<T>::Producer MpscQueue<T>::GetProducer() {
producers_count_++;
producer_is_created_and_dead_ = false;
nonempty_event_.Send();
return Producer(this->shared_from_this(), EmplaceEnabler{});
return Producer(this->shared_from_this(), impl::EmplaceEnabler{});
}

template <typename T>
typename MpscQueue<T>::MultiProducer MpscQueue<T>::GetMultiProducer() {
// MultiProducer and Producer are actually the same for MpscQueue, which is an
// implementation detail.
return GetProducer();
}

template <typename T>
typename MpscQueue<T>::Consumer MpscQueue<T>::GetConsumer() {
UINVARIANT(!consumer_is_created_,
"MpscQueue::Consumer must only be obtained a single time");
consumer_is_created_ = true;
return Consumer(this->shared_from_this(), EmplaceEnabler{});
return Consumer(this->shared_from_this(), impl::EmplaceEnabler{});
}

template <typename T>
Expand Down

0 comments on commit 357be6f

Please sign in to comment.