From bc6bdae970db54a4639d518a843824f603ab5ed1 Mon Sep 17 00:00:00 2001 From: Marcus Nutzinger Date: Mon, 29 Jan 2024 16:06:49 +0000 Subject: [PATCH 1/2] EAC-4613 Add observe as alternative to then Different from then(), observe() returns void and will call the continuation on the executor thread. Any exceptions thrown by the continuation will therefore affect the executor's dispatch thread. It's intended use case is fire-and-forget futures that we still want to .get() to not silently swallow any exceptions. --- .../detail/ObservedFutureWithContinuation.h | 61 +++++++++ include/thousandeyes/futures/observe.h | 117 ++++++++++++++++++ tests/defaultexecutor.cpp | 71 ++++++++++- 3 files changed, 248 insertions(+), 1 deletion(-) create mode 100644 include/thousandeyes/futures/detail/ObservedFutureWithContinuation.h create mode 100644 include/thousandeyes/futures/observe.h diff --git a/include/thousandeyes/futures/detail/ObservedFutureWithContinuation.h b/include/thousandeyes/futures/detail/ObservedFutureWithContinuation.h new file mode 100644 index 0000000..8b19b67 --- /dev/null +++ b/include/thousandeyes/futures/detail/ObservedFutureWithContinuation.h @@ -0,0 +1,61 @@ +/* + * Copyright 2024 ThousandEyes, Inc. + * + * Use of this source code is governed by an MIT-style + * license that can be found in the LICENSE file or at + * https://opensource.org/licenses/MIT. + * + * @author Marcus Nutzinger, https://github.com/manutzin-te + */ + +#pragma once + +#include +#include + +#include + +namespace thousandeyes { +namespace futures { +namespace detail { + +template +class ObservedFutureWithContinuation : public TimedWaitable { +public: + ObservedFutureWithContinuation(std::chrono::microseconds waitLimit, + std::future f, + TFunc&& cont) : + TimedWaitable(std::move(waitLimit)), + f_(std::move(f)), + cont_(std::forward(cont)) + {} + + ObservedFutureWithContinuation(const ObservedFutureWithContinuation& o) = delete; + ObservedFutureWithContinuation& operator=(const ObservedFutureWithContinuation& o) = delete; + + ObservedFutureWithContinuation(ObservedFutureWithContinuation&& o) = default; + ObservedFutureWithContinuation& operator=(ObservedFutureWithContinuation&& o) = default; + + bool timedWait(const std::chrono::microseconds& timeout) override + { + return f_.wait_for(timeout) == std::future_status::ready; + } + + void dispatch(std::exception_ptr err) override + { + if (err) { + std::rethrow_exception(err); + } + else { + cont_(std::move(f_)); + } + } + +private: + std::future f_; + TFunc cont_; +}; + +} // namespace detail +} // namespace futures +} // namespace thousandeyes diff --git a/include/thousandeyes/futures/observe.h b/include/thousandeyes/futures/observe.h new file mode 100644 index 0000000..78120e6 --- /dev/null +++ b/include/thousandeyes/futures/observe.h @@ -0,0 +1,117 @@ +/* + * Copyright 2024 ThousandEyes, Inc. + * + * Use of this source code is governed by an MIT-style + * license that can be found in the LICENSE file or at + * https://opensource.org/licenses/MIT. + * + * @author Marcus Nutzinger, https://github.com/manutzin-te + */ + +#pragma once + +#include +#include + +#include +#include +#include + +namespace thousandeyes { +namespace futures { + +//! \brief Observes the input futures and calls the given continuation function once +//! it becomes ready. +//! +//! \param executor The object that waits for the given future to become ready. +//! \param timeLimit The maximum time to wait for the given future to become ready. +//! \param f The input future to wait and invoke the continuation function on. +//! \param cont The continuation function to invoke on the ready input future. +//! +//! \note Any exceptions from the continuation or from the executor will be thrown +//! on the thread on which the continuation is scheduled. +//! +//! \sa WaitableTimedOutException +template +void observe(std::shared_ptr executor, + std::chrono::microseconds timeLimit, + std::future f, + TFunc&& cont) +{ + executor->watch(std::make_unique>( + std::move(timeLimit), + std::move(f), + std::forward(cont))); +} + +//! \brief Observes the input futures and calls the given continuation function once +//! it becomes ready. +//! +//! \param executor The object that waits for the given future to become ready. +//! \param f The input future to wait and invoke the continuation function on. +//! \param cont The continuation function to invoke on the ready input future. +//! +//! \note If the total time for waiting the input future to become ready exceeds +//! a maximum threshold defined by the library (typically 1h), an exception of type +//! WaitableTimedOutException will be thrown on the thread on which the continuation +//! is scheduled. +//! +//! \sa WaitableTimedOutException +template +void observe(std::shared_ptr executor, std::future f, TFunc&& cont) +{ + observe(std::move(executor), + std::chrono::hours(1), + std::move(f), + std::forward(cont)); +} + +//! \brief Observes the input futures and calls the given continuation function once +//! it becomes ready. +//! +//! \par This function uses the default Executor object to wait for the given futures +//! to become ready. If there isn't any default Executor object registered, this +//! function's behavior is undefined. +//! +//! \param timeLimit The maximum time to wait for the given future to become ready. +//! \param f The input future to wait and invoke the continuation function on. +//! \param cont The continuation function to invoke on the ready input future. +//! +//! \note If the total time for waiting the input future to become ready exceeds the +//! given timeLimit, an exception of type WaitableTimedOutException will be thrown on +//! the thread on which the continuation is scheduled. +//! +//! \sa Default, WaitableTimedOutException +template +void observe(std::chrono::microseconds timeLimit, std::future f, TFunc&& cont) +{ + observe(Default(), + std::move(timeLimit), + std::move(f), + std::forward(cont)); +} + +//! \brief Observes the input futures and calls the given continuation function once +//! it becomes ready. +//! +//! \par This function uses the default Executor object to wait for the given futures +//! to become ready. If there isn't any default Executor object registered, this +//! function's behavior is undefined. +//! +//! \param f The input future to wait and invoke the continuation function on. +//! \param cont The continuation function to invoke on the ready input future. +//! +//! \note If the total time for waiting the input future to become ready exceeds +//! a maximum threshold defined by the library (typically 1h), an exception of +//! type WaitableTimedOutException will be thrown on the thread on which the +//! continuation is scheduled. +//! +//! \sa Default, WaitableTimedOutException +template +void observe(std::future f, TFunc&& cont) +{ + return observe(std::chrono::hours(1), std::move(f), std::forward(cont)); +} + +} // namespace futures +} // namespace thousandeyes diff --git a/tests/defaultexecutor.cpp b/tests/defaultexecutor.cpp index 374085d..2cb5851 100644 --- a/tests/defaultexecutor.cpp +++ b/tests/defaultexecutor.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -29,11 +30,13 @@ #include #include +#include #include #include using std::array; using std::bind; +using std::condition_variable; using std::exception; using std::function; using std::future; @@ -44,6 +47,7 @@ using std::make_shared; using std::make_tuple; using std::make_unique; using std::map; +using std::move; using std::mt19937; using std::mutex; using std::promise; @@ -55,6 +59,7 @@ using std::thread; using std::to_string; using std::tuple; using std::uniform_int_distribution; +using std::unique_lock; using std::unique_ptr; using std::vector; using std::chrono::duration_cast; @@ -71,6 +76,7 @@ using thousandeyes::futures::DefaultExecutor; using thousandeyes::futures::Executor; using thousandeyes::futures::fromException; using thousandeyes::futures::fromValue; +using thousandeyes::futures::observe; using thousandeyes::futures::then; using thousandeyes::futures::Waitable; using thousandeyes::futures::WaitableWaitException; @@ -78,6 +84,7 @@ using thousandeyes::futures::WaitableWaitException; namespace detail = thousandeyes::futures::detail; using ::testing::_; +using ::testing::UnorderedElementsAre; using ::testing::Range; using ::testing::Return; using ::testing::SaveArg; @@ -167,6 +174,36 @@ TEST_F(DefaultExecutorTest, ThenWithoutException) executor->stop(); } +TEST_F(DefaultExecutorTest, ObserveWithoutException) +{ + auto executor = make_shared(milliseconds(10)); + Default::Setter execSetter(executor); + + mutex mtx; + condition_variable cv; + vector result; + + auto recordResult = [&](int num) { + lock_guard lock(mtx); + + result.push_back(num); + cv.notify_one(); + }; + + observe(getValueAsync(1821), [&](future f) { recordResult(f.get()); }); + + observe(getValueAsync(1822), [&](future f) { recordResult(f.get()); }); + + { + unique_lock lock(mtx); + cv.wait(lock, [&] { return result.size() == 2; }); + } + + EXPECT_THAT(result, UnorderedElementsAre(1821, 1822)); + + executor->stop(); +} + TEST_F(DefaultExecutorTest, ThenWithException) { auto executor = make_shared(milliseconds(10)); @@ -180,6 +217,39 @@ TEST_F(DefaultExecutorTest, ThenWithException) executor->stop(); } +using DefaultExecutorDeathTest = DefaultExecutorTest; + +TEST_F(DefaultExecutorDeathTest, ObserveWithException) +{ + GTEST_FLAG_SET(death_test_style, "threadsafe"); + + auto executor = make_shared(milliseconds(10)); + Default::Setter execSetter(executor); + + mutex mtx; + condition_variable cv; + + // This will never become true, as the continuation will rethrow the exception + // thrown by the future passed to observe(). It only serves as barrier to block + // the execution until observe() throws. + bool success = false; + + EXPECT_DEATH( + { + observe(getExceptionAsync(), [](future f) { + f.get(); + + FAIL() << "As f.get() throws this code should never be executed"; + }); + + unique_lock lock(mtx); + cv.wait(lock, [&]{ return success; }); + }, + _); + + executor->stop(); +} + TEST_F(DefaultExecutorTest, ThenWithVoidInputWithoutException) { auto executor = make_shared(milliseconds(10)); @@ -781,7 +851,6 @@ TEST_F(DefaultExecutorTest, TupleAllWithException) } namespace { - future recFunc1(int count); future recFunc2(future f); From 5c04edc7919c7818a34a67ab4f54cbbbef8201f9 Mon Sep 17 00:00:00 2001 From: Marcus Nutzinger Date: Mon, 29 Jan 2024 16:15:13 +0000 Subject: [PATCH 2/2] EAC-4613 Bump version to 0.9 --- conanfile.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conanfile.py b/conanfile.py index 6b00687..a802f3e 100644 --- a/conanfile.py +++ b/conanfile.py @@ -2,7 +2,7 @@ class ThousandEyesFuturesConan(ConanFile): name = "thousandeyes-futures" - version = "0.8" + version = "0.9" exports_sources = "include/*", "FindThousandEyesFutures.cmake" no_copy_source = True short_paths = True