Skip to content

Commit

Permalink
Merge branch 'develop' into Fix134DetachLeak
Browse files Browse the repository at this point in the history
  • Loading branch information
FelixPetriconi committed Jul 9, 2018
2 parents 344ce53 + a970ba4 commit 007846d
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 154 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Expand Up @@ -43,4 +43,5 @@ clion_build/
cmake-*
clion_*


# jupyter backups
.ipynb_checkpoints/
141 changes: 87 additions & 54 deletions stlab/concurrency/channel.hpp
Expand Up @@ -34,6 +34,7 @@ namespace stlab {
/**************************************************************************************************/

inline namespace v1 {

/**************************************************************************************************/

template <typename, typename = void>
Expand Down Expand Up @@ -357,6 +358,8 @@ void await_variant_args(P& process, std::tuple<variant<T, std::exception_ptr>...
await_variant_args_<P, T...>(process, args, std::make_index_sequence<sizeof...(T)>());
}

/**************************************************************************************************/

template <typename T>
stlab::optional<std::exception_ptr> find_argument_error(T& argument) {
stlab::optional<std::exception_ptr> result;
Expand Down Expand Up @@ -403,7 +406,7 @@ struct default_queue_strategy {
/**************************************************************************************************/

template <typename... T>
struct join_queue_strategy {
struct zip_with_queue_strategy {
static const std::size_t Size = sizeof...(T);
static const std::size_t arguments_size = Size;
using value_type = std::tuple<variant<T, std::exception_ptr>...>;
Expand Down Expand Up @@ -446,7 +449,7 @@ struct join_queue_strategy {
/**************************************************************************************************/

template <typename... T>
struct zip_queue_strategy {
struct round_robin_queue_strategy {
static const std::size_t Size = sizeof...(T);
static const std::size_t arguments_size = 1;
using item_t = variant<first_t<T...>, std::exception_ptr>;
Expand Down Expand Up @@ -495,7 +498,7 @@ struct zip_queue_strategy {
/**************************************************************************************************/

template <typename... T>
struct merge_queue_strategy {
struct unordered_queue_strategy {
static const std::size_t Size = sizeof...(T);
static const std::size_t arguments_size = 1;
using item_t = variant<first_t<T...>, std::exception_ptr>;
Expand Down Expand Up @@ -1032,6 +1035,31 @@ struct shared_process

/**************************************************************************************************/

} // namespace detail

struct unordered_t
{
template<typename...R>
using strategy_type = detail::unordered_queue_strategy<detail::receiver_t<R>...>;
};

struct round_robin_t
{
template<typename...R>
using strategy_type = detail::round_robin_queue_strategy<detail::receiver_t<R>...>;
};

struct zip_with_t
{
template<typename...R>
using strategy_type = detail::zip_with_queue_strategy<detail::receiver_t<R>...>;
};


/**************************************************************************************************/

namespace detail {

// This helper class is necessary to encapsulate the following functions, because Clang
// currently has a bug in accepting friend functions with auto return type
struct channel_combiner {
Expand All @@ -1057,60 +1085,43 @@ struct channel_combiner {
std::make_index_sequence<sizeof...(R)>());
}

template <typename S, typename F, typename... R>
static auto join_(S&& s, F&& f, R&&... upstream_receiver) {
using result_t = yield_type<F, receiver_t<R>...>;

auto upstream_receiver_processes = std::make_tuple(upstream_receiver._p...);
auto join_process = std::make_shared<
shared_process<join_queue_strategy<receiver_t<R>...>, F, result_t, receiver_t<R>...>>(
std::move(s), std::forward<F>(f), upstream_receiver._p...);

map_as_sender<decltype(join_process), decltype(upstream_receiver_processes),
receiver_t<R>...>(join_process, upstream_receiver_processes);

return receiver<result_t>(std::move(join_process));
}

template <typename S, typename F, typename... R>
static auto zip_(S&& s, F&& f, R&&... upstream_receiver) {
static_assert(
all_true<std::is_convertible<receiver_t<R>, receiver_t<first_t<R...>>>::value...>{},
"All receiver types must be convertible to the type of the firsts receiver type!");

using result_t = yield_type<F, receiver_t<first_t<R...>>>;

auto upstream_receiver_processes = std::make_tuple(upstream_receiver._p...);
auto zip_process = std::make_shared<
shared_process<zip_queue_strategy<receiver_t<R>...>, F, result_t, receiver_t<R>...>>(
std::move(s), std::forward<F>(f), upstream_receiver._p...);

map_as_sender<decltype(zip_process), decltype(upstream_receiver_processes),
receiver_t<R>...>(zip_process, upstream_receiver_processes);
template <typename M, typename F, typename... R>
struct merge_result
{
using type = yield_type<F, receiver_t<first_t<R...>>>;
};

return receiver<result_t>(std::move(zip_process));
}
template <typename F, typename... R>
struct merge_result<zip_with_t, F, R...>
{
using type = yield_type<F, receiver_t<R>...>;
};

template <typename S, typename F, typename... R>
static auto merge_(S&& s, F&& f, R&&... upstream_receiver) {
static_assert(
all_true<std::is_convertible<receiver_t<R>, receiver_t<first_t<R...>>>::value...>{},
"All receiver types must be convertible to the type of the firsts receiver type!");
template <typename M, typename S, typename F, typename... R>
static auto merge_helper(S&& s, F&& f, R&&... upstream_receiver) {

using result_t = yield_type<F, receiver_t<first_t<R...>>>;
using result_t = typename merge_result<M, F, R...>::type;

auto upstream_receiver_processes = std::make_tuple(upstream_receiver._p...);
auto merge_process = std::make_shared<
shared_process<merge_queue_strategy<receiver_t<R>...>, F, result_t, receiver_t<R>...>>(
std::move(s), std::forward<F>(f), upstream_receiver._p...);
shared_process<typename M::template strategy_type<R...>, F, result_t, receiver_t<R>...>>(
std::forward<S>(s), std::forward<F>(f), upstream_receiver._p...);

map_as_sender<decltype(merge_process), decltype(upstream_receiver_processes),
receiver_t<R>...>(merge_process, upstream_receiver_processes);
receiver_t<R>...>(merge_process, upstream_receiver_processes);

return receiver<result_t>(std::move(merge_process));
}
};

struct zip_helper
{
template <typename... T>
auto operator()(T&&... t) const {
return std::make_tuple(std::forward<T>(t)...);
}
};

/**************************************************************************************************/

} // namespace detail
Expand All @@ -1128,27 +1139,45 @@ auto channel(S s) -> std::pair<sender<T>, receiver<T>> {
/**************************************************************************************************/

template <typename S, typename F, typename... R>
auto join(S s, F f, R&&... upstream_receiver) {
return detail::channel_combiner::join_(std::move(s), std::move(f),
std::forward<R>(upstream_receiver)...);
[[deprecated("Use zip_with")]] auto join(S s, F f, R... upstream_receiver) {
return detail::channel_combiner::merge_helper<zip_with_t, S, F, R...>(
std::move(s), std::move(f), std::forward<R>(upstream_receiver)...);
}

/**************************************************************************************************/

template <typename S, typename F, typename... R>
auto zip(S s, F f, R&&... upstream_receiver) {
return detail::channel_combiner::zip_(std::move(s), std::move(f),
std::forward<R>(upstream_receiver)...);
[[deprecated("Use merge_channel<unordered_t>")]] auto merge(S s, F f, R... upstream_receiver) {
return detail::channel_combiner::merge_helper<unordered_t, S, F, R...>(
std::move(s), std::move(f), std::move(upstream_receiver)...);
}

/**************************************************************************************************/

template <typename M, typename S, typename F, typename... R>
auto merge_channel(S s, F f, R... upstream_receiver) {
return detail::channel_combiner::merge_helper<M, S, F, R...>(
std::move(s), std::move(f), std::move(upstream_receiver)...);
}

/**************************************************************************************************/

template <typename S, typename F, typename... R>
auto merge(S s, F f, R&&... upstream_receiver) {
return detail::channel_combiner::merge_(std::move(s), std::move(f),
std::forward<R>(upstream_receiver)...);
auto zip_with(S s, F f, R... upstream_receiver) {
return detail::channel_combiner::merge_helper<zip_with_t, S, F, R...>(std::move(s), std::move(f),
std::forward<R>(upstream_receiver)...);
}

/**************************************************************************************************/

template <typename S, typename... R>
auto zip(S s, R... r) {
return zip_with(std::move(s), detail::zip_helper{}, std::move(r)...);
}

// template <typename S, typename F, typename... R>
// [[deprecated("Use merge_channel<round_robin_t>")]] auto zip(S s, F f, R... upstream_receiver);

/**************************************************************************************************/

struct buffer_size {
Expand Down Expand Up @@ -1304,7 +1333,7 @@ class receiver {
friend class sender;

template <typename U>
friend class receiver; // huh?
friend class receiver;

template <typename U, typename V>
friend auto channel(V) -> std::pair<sender<U>, receiver<U>>;
Expand Down Expand Up @@ -1428,9 +1457,11 @@ class sender<T, enable_if_copyable<T>> {
void swap(sender& x) noexcept { std::swap(*this, x); }

inline friend void swap(sender& x, sender& y) noexcept { x.swap(y); }

inline friend bool operator==(const sender& x, const sender& y) {
return x._p.lock() == y._p.lock();
};

inline friend bool operator!=(const sender& x, const sender& y) { return !(x == y); };

void close() {
Expand Down Expand Up @@ -1477,9 +1508,11 @@ class sender<T, enable_if_not_copyable<T>> {
void swap(sender& x) noexcept { std::swap(*this, x); }

inline friend void swap(sender& x, sender& y) noexcept { x.swap(y); }

inline friend bool operator==(const sender& x, const sender& y) {
return x._p.lock() == y._p.lock();
};

inline friend bool operator!=(const sender& x, const sender& y) { return !(x == y); };

void close() {
Expand Down
6 changes: 3 additions & 3 deletions test/CMakeLists.txt
Expand Up @@ -2,12 +2,12 @@

add_executable( stlab.test.channel.test
"${CMAKE_CURRENT_SOURCE_DIR}/channel_functor_tests.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/channel_join_tests.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/channel_merge_tests.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/channel_merge_round_robin_tests.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/channel_merge_unordered_tests.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/channel_merge_zip_with_tests.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/channel_process_tests.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/channel_test_helper.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/channel_tests.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/channel_zip_tests.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/tuple_algorithm_test.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/main.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/channel_test_helper.hpp" )
Expand Down

0 comments on commit 007846d

Please sign in to comment.