
Add async_semaphore #24

Open
lewissbaker opened this issue Jul 21, 2017 · 14 comments

Comments

@lewissbaker
Owner

No description provided.

@schoedl

schoedl commented Feb 12, 2019

We happen to need a semaphore in cppcoro. I looked at it briefly. The main difference from async_mutex is that we need to store a counter instead of simply "unlocked". I see two possibilities: exploit the fact that the first 64K of address space is never valid pointer territory on Windows (other OSes?) and keep a lock-free implementation with the counter limited to 64K, or use a std::variant, but that won't be lock-free. Any preference? Or something else entirely?
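
For concreteness, a minimal sketch of the tagged-pointer idea (hypothetical names, not cppcoro code; it relies on the Windows-specific assumption that addresses below 64 KiB never point at valid objects):

#include <atomic>
#include <cstdint>

struct waiter; // intrusive list node for a suspended coroutine

class semaphore_state
{
	// Values below 64 * 1024 encode the free-resource count; anything else
	// is a waiter* to the head of the wait-list. This assumes the first
	// 64 KiB of address space never contains a valid object pointer
	// (true on Windows; would need checking on other OSes).
	static constexpr std::uintptr_t max_count = 64 * 1024;

	std::atomic<std::uintptr_t> m_state{0};

	static bool is_count(std::uintptr_t s) noexcept { return s < max_count; }
	static waiter* as_waiters(std::uintptr_t s) noexcept
	{
		return reinterpret_cast<waiter*>(s);
	}
};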

@lewissbaker
Owner Author

The implementation would likely be closer to that of async_auto_reset_event, which is essentially a binary semaphore.

This implementation keeps track of the number of calls to 'set' and 'wait' by combining the two 32-bit counts into a single 64-bit integer. Its implementation currently ensures that the 'set' count can never get more than 1 above the 'wait' count. If this is generalised so that the difference can never exceed N, where N is a constructor parameter of the semaphore, then a similar strategy should serve for an async_semaphore class.
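
For reference, a sketch of that packing (the local:: helper names below match the snippet quoted later in this thread, but this code is illustrative, not the actual implementation):

#include <atomic>
#include <cstdint>

namespace local
{
	// High 32 bits: number of set() calls; low 32 bits: number of waiters.
	// Packing both counts into one 64-bit atomic lets a single
	// compare-exchange update them together.
	constexpr std::uint64_t set_increment = std::uint64_t(1) << 32;
	constexpr std::uint64_t waiter_increment = 1;

	constexpr std::uint32_t get_set_count(std::uint64_t state) noexcept
	{
		return static_cast<std::uint32_t>(state >> 32);
	}
	constexpr std::uint32_t get_waiter_count(std::uint64_t state) noexcept
	{
		return static_cast<std::uint32_t>(state);
	}
}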

@schoedl

schoedl commented Feb 13, 2019

I have trouble using either one as a template. Both exploit the fact that they guard a singleton resource, so the releasing of waiters is sequenced, and thus the list of waiters can be copied as a whole from the atomic list to a non-atomic list and worked on there. I don't see how this works for semaphores. Unlocks may arrive at any time, so I need to atomically dequeue a single waiter at a time. AFAIK, a lock-free stack pop already needs a double-width CAS to avoid ABA, and I would actually need a queue for fairness.

The most pedestrian way would be an ordinarily locked state: a variant holding either a queue of waiters or a count of free resources. The mutex could in turn be an async_mutex.

Am I missing something? What do you recommend?

@lewissbaker
Owner Author

The implementation of async_auto_reset_event actually has an implicit mutex/lock built into it.

The first thread to increment either the 'set' or the 'wait' count such that both are non-zero acquires the lock and is responsible for dequeueing waiters and resuming N = min(setCount, waitCount) of them from the list. It then atomically decrements both counts by N; if at least one of the resulting counts is zero, the lock is considered released. Otherwise, some other thread has called set or enqueued more waiters, and the current thread must go around the loop again and dequeue some more waiters.

Once it has successfully released the lock then it can go and resume the list of waiters that it dequeued.
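
A rough sketch of that loop, reusing the local:: helpers sketched above (the actual dequeueing and resumption are elided; this is an outline of the protocol, not the real code):

#include <algorithm>
#include <atomic>
#include <cstdint>

void resume_ready_waiters(std::atomic<std::uint64_t>& state)
{
	// Only the thread that made both counts non-zero enters here;
	// it implicitly holds the "lock".
	std::uint64_t oldState = state.load(std::memory_order_acquire);
	for (;;)
	{
		const std::uint64_t n = std::min<std::uint64_t>(
			local::get_set_count(oldState),
			local::get_waiter_count(oldState));

		// ... dequeue n waiters from the list here ...

		// Atomically consume n 'set's and n 'wait's.
		const std::uint64_t newState =
			oldState - n * local::set_increment - n * local::waiter_increment;
		if (state.compare_exchange_weak(
			oldState, newState,
			std::memory_order_acq_rel, std::memory_order_acquire))
		{
			// If either count dropped to zero the lock is released and the
			// dequeued waiters can now be resumed outside the loop.
			if (local::get_set_count(newState) == 0 ||
				local::get_waiter_count(newState) == 0)
			{
				break;
			}
			// Otherwise more set()/wait() calls raced in; go around again.
			oldState = newState;
		}
		// On CAS failure oldState was reloaded; retry with the fresh value.
	}
	// ... resume the dequeued waiters here ...
}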

@schoedl

schoedl commented Feb 14, 2019

I think I see it.

Something else:

In async_auto_reset_event::set(), you load the initial oldState with std::memory_order_relaxed, but on compare-exchange failure you require std::memory_order_acquire. Is this correct?

	std::uint64_t oldState = m_state.load(std::memory_order_relaxed); // CORRECT?
	do
	{
		if (local::get_set_count(oldState) > local::get_waiter_count(oldState))
		{
			// Already set.
			return;
		}

		// Increment the set-count
	} while (!m_state.compare_exchange_weak(
		oldState,
		oldState + local::set_increment,
		std::memory_order_acq_rel,
		std::memory_order_acquire));

@lewissbaker
Owner Author

Yes. I think the compare_exchange failure case can be changed to relaxed, too.
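
That is, the tail of the loop quoted above could become (untested sketch):

	} while (!m_state.compare_exchange_weak(
		oldState,
		oldState + local::set_increment,
		std::memory_order_acq_rel,
		std::memory_order_relaxed)); // on failure the loop only re-reads
		                             // the state and retries, so no
		                             // acquire ordering is needed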

@schoedl

schoedl commented Feb 22, 2019

I hope what I implemented is what you envisioned. It was very simple; it was more about leaving things out than adding anything:

https://github.com/think-cell/cppcoro/commits/semaphore

Could you review the code?

I wrote a simple test with a thread pool of 100 threads which try to acquire a semaphore of 10 resources. AFAIK, the only suspension point is the acquisition of the semaphore. In the beginning, all 10 resources are acquired. But over time, the maximum number of simultaneously acquired resources goes down, and eventually reaches 1 or 2. I think I understand why this is: whenever there is contention, the acquiring coroutine suspends, and is later resumed not on its original thread but on the thread that just released the semaphore. The original thread that the acquiring coroutine suspended on does not get used anymore, and sits idle forever.

What are your thoughts on this?

cppcoro::async_auto_reset_event semaphore(10);
std::atomic<int> n(0);
cppcoro::static_thread_pool threadPool;

void test() noexcept {

	auto makeTask = []() -> cppcoro::task<>
	{
		co_await threadPool.schedule();
		for(;;) {
			co_await semaphore;
		int m = n.fetch_add(1);
		OutputDebugString(tc::make_c_str<TCHAR>(tc::as_dec(m), _T("\n")));
		_ASSERT(m < 10);
			Sleep(1);
			--n;
			semaphore.set();
		}
	};

	std::vector<cppcoro::task<>> tasks;
	for (int i = 0; i < 100; ++i)
	{
		tasks.push_back(makeTask());
	}

	cppcoro::sync_wait(cppcoro::when_all(std::move(tasks)));
}

Thanks for all your help!

@lewissbaker
Owner Author

lewissbaker commented Feb 23, 2019

The diff looks to be taking the right approach. I've added a few comments to the commit.

With regard to the ever-decreasing number of concurrent operations: the approach I've been taking with similar types like sequence_barrier is to have the 'wait' operation take a scheduler, which is used to schedule the resumption of the coroutine if the operation does not complete synchronously, rather than resuming it inline.

For example, see this method:

template<typename SCHEDULER>
[[nodiscard]]
sequence_barrier_wait_operation<SEQUENCE, TRAITS, SCHEDULER> wait_until_published(
	SEQUENCE targetSequence,
	SCHEDULER& scheduler) const noexcept;

This would ensure that you don't start serialising the producer and consumer by having the consumer coroutine steal the thread of the producer coroutine when the producer calls semaphore.set().

@schoedl

schoedl commented Feb 23, 2019 via email

@lewissbaker
Owner Author

This puts the responsibility for which scheduler to use into the releasing function...

Actually, the responsibility of which scheduler to use is given to the caller of wait().
eg.

task<void> consumer(async_semaphore& sem, static_thread_pool& tp) {
  // Pass thread-pool into the wait().
  // If the wait() does not complete synchronously then it will be resumed on the thread-pool.
  co_await sem.wait(1, tp);
  do_work();

  // This release call will schedule resumption of any waiting coroutines sitting in a wait() call
  // onto the scheduler specified as a parameter to the wait() call.
  sem.release();
}

I think what you're getting at, with regard to acquiring multiple resources and only wanting to reschedule once all of them are acquired, may be addressed by instead having the wait() operation return a bool indicating whether the operation completed synchronously or asynchronously.
eg.

bool completedSynchronously = co_await sem1.wait();
completedSynchronously &= co_await sem2.wait();
completedSynchronously &= co_await sem3.wait();
if (!completedSynchronously) {
  // Reschedule onto the thread-pool once we are about to start work.
  co_await tp.schedule();
}

There are still cases where it makes sense for the coroutine to be rescheduled onto the thread-pool even if it is already executing on the thread-pool, as this can allow a greater degree of parallelism between coroutines. Otherwise you can end up with the situation you described, where the coroutines just run synchronously inside the .release() call that released the resources, blocking the producer that called .release().

@schoedl

schoedl commented Mar 1, 2019

I just checked http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2019/p1135r3.html and could not find a saturating semaphore. You can provide a maximum value for the semaphore count, but not exceeding it is a precondition of release(); there are no saturation semantics. Do you really want saturation semantics in your library?

@lewissbaker
Owner Author

I have a use-case for the current saturating behaviour of async_auto_reset_event.

This is useful for cases where there may be many updates to a shared data-structure, and after each update we want to set a dirty flag (in this case the event) so that the coroutine processing the changes suspends until there is at least one change, is woken up, resets the dirty flag, and then processes all pending changes before suspending again to wait for the dirty flag to be set.
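
A minimal sketch of that pattern (hypothetical names; process_all_pending_changes() stands in for the real work):

#include <cppcoro/async_auto_reset_event.hpp>
#include <cppcoro/task.hpp>

void process_all_pending_changes(); // hypothetical

cppcoro::async_auto_reset_event dirty;

cppcoro::task<> change_processor()
{
	for (;;)
	{
		// Suspends until at least one update has set the event. Resuming
		// also resets it; multiple set() calls before this point coalesce
		// into a single wake-up thanks to the saturating behaviour.
		co_await dirty;
		process_all_pending_changes();
	}
}

// Each producer, after mutating the shared structure:
//   dirty.set();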

But perhaps this is necessarily different from a binary_semaphore, which does not have saturating behaviour.

@schoedl

schoedl commented Mar 1, 2019

I think this does not come up so often because the dirtiness is often part of the data structure itself. As a common example, when adding an item to a queue, you lock the data structure, add the item, remember whether you went from 0 to 1 items, and if so, wake up the consumer after unlocking. The consumer consumes items; it can lock and empty the queue in one go, or release the lock between items. Only if the queue is empty does it go to sleep and then require another wake-up.
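
For illustration, a sketch of that 0 -> 1 wake-up convention (hypothetical types; the Notify callable could be an event, a window message, or anything else):

#include <deque>
#include <mutex>
#include <utility>

template<typename T, typename Notify>
class wakeup_queue
{
	std::mutex m_mutex;
	std::deque<T> m_items;
	Notify m_notify; // wake-up mechanism, supplied by the user

public:
	explicit wakeup_queue(Notify notify) : m_notify(std::move(notify)) {}

	void push(T item)
	{
		bool wasEmpty;
		{
			std::lock_guard lock(m_mutex);
			wasEmpty = m_items.empty();
			m_items.push_back(std::move(item));
		}
		// Wake the consumer only on the 0 -> 1 transition, and only after
		// the lock has been released, to avoid contention.
		if (wasEmpty)
		{
			m_notify();
		}
	}
};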

This separation of dirtiness and wake-up mechanism has the advantage of being flexible about the wake-up method; a window message would work, too, for example.

Also, releasing the consumer while the producer still holds the lock on the data structure may lead to unnecessary contention on the lock: the consumer wakes up, tries to lock, and blocks/suspends waiting for the producer to release the lock. Doing the wake-up outside the lock, on the other hand, may mean the producer's release arrives after someone else has changed the data structure again and released the consumer, and the consumer has consumed the item and is already asleep again; that wake-up would then be unnecessary.

Just my two cents.

@schoedl

schoedl commented Mar 4, 2019

I checked in a new version, with small check-in steps for easier review. I am not sure whether async_auto_reset_event should stay related to async_semaphore, because the latter got a bit more complicated: I needed signed resource counts. I need a retire function that removes resources from the pool without waiting for that to happen, which is implemented by allowing the number of resources to become negative. I did not implement the scheduler extension because I did not need it, and it is something that could probably be done in many places in the library. For naming, I stuck to P1135.
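
A sketch of the retire idea as described (illustrative only; the branch linked earlier is authoritative):

#include <atomic>
#include <cstdint>

class resource_count
{
	// May go negative: retire() removes a resource from the pool even while
	// every resource is checked out; a later release() then absorbs the
	// deficit instead of waking a waiter.
	std::atomic<std::int64_t> m_count;

public:
	explicit resource_count(std::int64_t n) noexcept : m_count(n) {}

	// Remove one resource from the pool without waiting for it to be free.
	void retire() noexcept
	{
		m_count.fetch_sub(1, std::memory_order_acq_rel);
	}

	// Returns true if a resource was acquired immediately; a real
	// implementation would enqueue a waiter when the count is <= 0.
	bool try_acquire() noexcept
	{
		std::int64_t old = m_count.load(std::memory_order_relaxed);
		while (old > 0)
		{
			if (m_count.compare_exchange_weak(
				old, old - 1,
				std::memory_order_acquire, std::memory_order_relaxed))
			{
				return true;
			}
		}
		return false;
	}
};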

Thanks for any suggestions.

Garcia6l20 pushed a commit to Garcia6l20/cppcoro that referenced this issue Jan 5, 2021
remove target_compile_feature(cppcoro PUBLIC cxx_std_20)