Skip to content
This repository has been archived by the owner on Apr 14, 2022. It is now read-only.

Commit

Permalink
Improve rpl integration.
Browse files Browse the repository at this point in the history
  • Loading branch information
john-preston committed Feb 22, 2019
1 parent 74ddc4d commit 84072fb
Showing 1 changed file with 27 additions and 23 deletions.
50 changes: 27 additions & 23 deletions src/crl/crl_object_on_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@ class weak_on_queue final {
#ifdef CRL_ENABLE_RPL_INTEGRATION
template <
typename Method,
typename Callback,
typename Invoke,
typename Result = decltype(
std::declval<Method>()(std::declval<Type&>()))>
Result producer(Method &&method, Callback &&callback) const;
Result producer(Method &&method, Invoke &&invoke) const;

template <
typename Method,
Expand Down Expand Up @@ -134,10 +134,10 @@ class object_on_queue final {
#ifdef CRL_ENABLE_RPL_INTEGRATION
template <
typename Method,
typename Callback,
typename Invoke,
typename Result = decltype(
std::declval<Method>()(std::declval<Type&>()))>
Result producer(Method &&method, Callback &&callback) const;
Result producer(Method &&method, Invoke &&invoke) const;

template <
typename Method,
Expand Down Expand Up @@ -250,31 +250,42 @@ void weak_on_queue<Type>::destroy(Value &&value) const {

#ifdef CRL_ENABLE_RPL_INTEGRATION
template <typename Type>
template <typename Method, typename Callback, typename Result>
template <typename Method, typename Invoke, typename Result>
Result weak_on_queue<Type>::producer(
Method &&method,
Callback &&callback) const {
Invoke &&invoke) const {
return [
weak = *this,
method = std::forward<Method>(method),
callback = std::forward<Callback>(callback)
invoke = std::forward<Invoke>(invoke)
](auto consumer) mutable {
auto lifetime_on_queue = std::make_shared<rpl::lifetime>();
weak.with([
method = std::move(method),
callback = std::move(callback),
invoke = std::move(invoke),
consumer = std::move(consumer),
lifetime_on_queue
](const Type &that) mutable {
method(
that
) | rpl::start_with_next([
callback = std::move(callback),
consumer = std::move(consumer)
](auto &&value) {
callback(
) | rpl::start_with_next_error_done([=](auto &&value) {
invoke([
consumer,
value = std::forward<decltype(value)>(value)
]() mutable {
consumer.put_next(std::move(value));
});
}, [=](auto &&error) {
invoke([
consumer,
std::forward<decltype(value)>(value));
error = std::forward<decltype(error)>(error)
]() mutable {
consumer.put_error(std::move(error));
});
}, [=] {
invoke([=] {
consumer.put_done();
});
}, *lifetime_on_queue);
});
return rpl::lifetime([
Expand All @@ -289,15 +300,8 @@ Result weak_on_queue<Type>::producer(
template <typename Type>
template <typename Method, typename Result>
Result weak_on_queue<Type>::producer_on_main(Method &&method) const {
return producer(std::forward<Method>(method), [](
const auto &consumer,
auto &&value) {
crl::on_main([
consumer,
event = std::forward<decltype(value)>(value)
]() mutable {
consumer.put_next(std::move(event));
});
return producer(std::forward<Method>(method), [](auto &&callback) {
crl::on_main(std::forward<decltype(callback)>(callback));
});
}
#endif // CRL_ENABLE_RPL_INTEGRATION
Expand Down

0 comments on commit 84072fb

Please sign in to comment.