Skip to content

Commit

Permalink
revised tf::Semaphore
Browse files Browse the repository at this point in the history
  • Loading branch information
Tsung-Wei Huang committed Jun 21, 2024
1 parent 5e17292 commit 4dd5d5f
Show file tree
Hide file tree
Showing 12 changed files with 445 additions and 555 deletions.
28 changes: 11 additions & 17 deletions examples/limited_concurrency.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,22 @@ void sl() {

int main() {

  // define a critical region that admits only 1 worker at a time
  tf::Semaphore semaphore(1);

  tf::Taskflow taskflow;
  tf::Executor executor;

  // create 100 tasks that all contend on the same semaphore;
  // acquiring it from the runtime serializes the critical section,
  // so the printed messages never interleave
  for(size_t i=0; i<100; i++) {
    taskflow.emplace([&, i](tf::Runtime& rt){
      rt.acquire(semaphore);
      std::cout << i << "-th " << "message " << "never " << "interleaves with others\n";
      rt.release(semaphore);
    });
  }

  // run the taskflow once and block until it finishes
  executor.run(taskflow).wait();

  return 0;
}
Expand Down
78 changes: 0 additions & 78 deletions taskflow/algorithm/critical.hpp

This file was deleted.

53 changes: 8 additions & 45 deletions taskflow/core/async.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,26 +112,10 @@ template <typename P, typename F, typename... Tasks,
tf::AsyncTask Executor::silent_dependent_async(
P&& params, F&& func, Tasks&&... tasks
){

_increment_topology();

size_t num_dependents = sizeof...(Tasks);

// create a task before scheduling the node to retain a shared ownership first
AsyncTask task(animate(
std::forward<P>(params), nullptr, nullptr, num_dependents,
std::in_place_type_t<Node::DependentAsync>{}, std::forward<F>(func)
));

if constexpr(sizeof...(Tasks) > 0) {
(_process_async_dependent(task._node, tasks, num_dependents), ...);
}

if(num_dependents == 0) {
_schedule_async_task(task._node);
}

return task;
std::array<AsyncTask, sizeof...(Tasks)> array = { std::forward<Tasks>(tasks)... };
return silent_dependent_async(
std::forward<P>(params), std::forward<F>(func), array.begin(), array.end()
);
}

// Function: silent_dependent_async
Expand Down Expand Up @@ -187,31 +171,10 @@ template <typename P, typename F, typename... Tasks,
std::enable_if_t<is_task_params_v<P> && all_same_v<AsyncTask, std::decay_t<Tasks>...>, void>*
>
auto Executor::dependent_async(P&& params, F&& func, Tasks&&... tasks) {

_increment_topology();

using R = std::invoke_result_t<std::decay_t<F>>;

std::packaged_task<R()> p(std::forward<F>(func));
auto fu{p.get_future()};

size_t num_dependents = sizeof...(tasks);

AsyncTask task(animate(
std::forward<P>(params), nullptr, nullptr, num_dependents,
std::in_place_type_t<Node::DependentAsync>{},
[p=make_moc(std::move(p))] () mutable { p.object(); }
));

if constexpr(sizeof...(Tasks) > 0) {
(_process_async_dependent(task._node, tasks, num_dependents), ...);
}

if(num_dependents == 0) {
_schedule_async_task(task._node);
}

return std::make_pair(std::move(task), std::move(fu));
std::array<AsyncTask, sizeof...(Tasks)> array = { std::forward<Tasks>(tasks)... };
return dependent_async(
std::forward<P>(params), std::forward<F>(func), array.begin(), array.end()
);
}

// Function: dependent_async
Expand Down
90 changes: 74 additions & 16 deletions taskflow/core/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1544,16 +1544,6 @@ inline void Executor::_invoke(Worker& worker, Node* node) {
return;
}

// if acquiring semaphore(s) exists, acquire them first
if(node->_semaphores && !node->_semaphores->to_acquire.empty()) {
SmallVector<Node*> nodes;
if(!node->_acquire_all(nodes)) {
_schedule(worker, nodes);
return;
}
node->_state.fetch_or(Node::ACQUIRED, std::memory_order_release);
}

// condition task
//int cond = -1;

Expand Down Expand Up @@ -1616,11 +1606,6 @@ inline void Executor::_invoke(Worker& worker, Node* node) {

//invoke_successors:

// if releasing semaphores exist, release them
if(node->_semaphores && !node->_semaphores->to_release.empty()) {
_schedule(worker, node->_release_all());
}

// Reset the join counter to support the cyclic control flow.
// + We must do this before scheduling the successors to avoid race
// condition on _dependents.
Expand Down Expand Up @@ -2193,7 +2178,7 @@ inline void Executor::_tear_down_topology(Worker& worker, Topology* tpg) {
// case 2: the final run of this topology
else {

// TODO: if the topology is cancelled, need to release all semaphores
// invoke the callback after each run
if(tpg->_call != nullptr) {
tpg->_call();
}
Expand Down Expand Up @@ -2315,6 +2300,74 @@ inline void Runtime::corun_all() {
_parent->_process_exception();
}

// Function: acquire
// Acquires all given semaphores atomically (all-or-nothing) by co-running
// the executor until every try_acquire succeeds in one pass.
template <typename... S,
  std::enable_if_t<all_same_v<Semaphore, std::decay_t<S>...>, void>*
>
void Runtime::acquire(S&&... semaphores) {
  constexpr size_t N = sizeof...(S);
  std::array<Semaphore*, N> sems { std::addressof(semaphores)... };
  _executor._corun_until(_worker, [&](){
    // All-or-nothing try-acquire with rollback. Ideally, we should use a
    // deadlock-avoidance algorithm but in practice the number of semaphores
    // will not be too large and tf::Semaphore does not provide a blocking
    // method, so we are mostly safe here. This mirrors the GCC try_lock
    // implementation:
    // https://github.com/gcc-mirror/gcc/blob/master/libstdc%2B%2B-v3/include/std/mutex
    size_t grabbed = 0;
    while(grabbed < N && sems[grabbed]->try_acquire()) {
      ++grabbed;
    }
    if(grabbed == N) {
      return true;
    }
    // roll back the prefix we acquired so other workers can make progress
    for(size_t k=0; k<grabbed; ++k) {
      sems[k]->release();
    }
    return false;
  });
  // TODO: exception?
}

// Function: acquire
// Acquires every semaphore in the iterator range [begin, end) atomically
// (all-or-nothing) by co-running the executor until one pass succeeds.
template <typename I,
  std::enable_if_t<std::is_same_v<deref_t<I>, Semaphore>, void>*
>
void Runtime::acquire(I begin, I end) {
  _executor._corun_until(_worker, [begin, end](){
    // All-or-nothing try-acquire with rollback. Ideally, we should use a
    // deadlock-avoidance algorithm but in practice the number of semaphores
    // will not be too large and tf::Semaphore does not provide a blocking
    // method, so we are mostly safe here. This mirrors the GCC try_lock
    // implementation:
    // https://github.com/gcc-mirror/gcc/blob/master/libstdc%2B%2B-v3/include/std/mutex
    I cur = begin;
    while(cur != end) {
      if(cur->try_acquire() == false) {
        // roll back the acquired prefix [begin, cur) before retrying
        for(I undo = begin; undo != cur; ++undo) {
          undo->release();
        }
        return false;
      }
      ++cur;
    }
    return true;
  });
  // TODO: exception?
}

// Function: release
// Releases every given semaphore, left to right.
template <typename... S,
  std::enable_if_t<all_same_v<Semaphore, std::decay_t<S>...>, void>*
>
void Runtime::release(S&&... semaphores){
  // array initialization fixes left-to-right order, matching the
  // argument order of the call
  std::array<Semaphore*, sizeof...(S)> sems { std::addressof(semaphores)... };
  for(Semaphore* s : sems) {
    s->release();
  }
}

// Function: release
// Releases every semaphore in the iterator range [begin, end), front to back.
template <typename I,
  std::enable_if_t<std::is_same_v<deref_t<I>, Semaphore>, void>*
>
void Runtime::release(I begin, I end) {
  // begin is a by-value copy, so advancing it is safe
  while(begin != end) {
    begin->release();
    ++begin;
  }
}

// Destructor
inline Runtime::~Runtime() {
_executor._corun_until(_worker, [this] () -> bool {
Expand Down Expand Up @@ -2402,6 +2455,11 @@ auto Runtime::async(P&& params, F&& f) {
return _async(*_executor._this_worker(), std::forward<P>(params), std::forward<F>(f));
}

// ----------------------------------------------------------------------------
// Runtime: Semaphore series
// ----------------------------------------------------------------------------




} // end of namespace tf -----------------------------------------------------
Expand Down
Loading

0 comments on commit 4dd5d5f

Please sign in to comment.