diff --git a/conanfile.py b/conanfile.py index 6b00687..a802f3e 100644 --- a/conanfile.py +++ b/conanfile.py @@ -2,7 +2,7 @@ class ThousandEyesFuturesConan(ConanFile): name = "thousandeyes-futures" - version = "0.8" + version = "0.9" exports_sources = "include/*", "FindThousandEyesFutures.cmake" no_copy_source = True short_paths = True diff --git a/include/thousandeyes/futures/detail/ObservedFutureWithContinuation.h b/include/thousandeyes/futures/detail/ObservedFutureWithContinuation.h new file mode 100644 index 0000000..8b19b67 --- /dev/null +++ b/include/thousandeyes/futures/detail/ObservedFutureWithContinuation.h @@ -0,0 +1,61 @@ +/* + * Copyright 2024 ThousandEyes, Inc. + * + * Use of this source code is governed by an MIT-style + * license that can be found in the LICENSE file or at + * https://opensource.org/licenses/MIT. + * + * @author Marcus Nutzinger, https://github.com/manutzin-te + */ + +#pragma once + +#include +#include + +#include + +namespace thousandeyes { +namespace futures { +namespace detail { + +template +class ObservedFutureWithContinuation : public TimedWaitable { +public: + ObservedFutureWithContinuation(std::chrono::microseconds waitLimit, + std::future f, + TFunc&& cont) : + TimedWaitable(std::move(waitLimit)), + f_(std::move(f)), + cont_(std::forward(cont)) + {} + + ObservedFutureWithContinuation(const ObservedFutureWithContinuation& o) = delete; + ObservedFutureWithContinuation& operator=(const ObservedFutureWithContinuation& o) = delete; + + ObservedFutureWithContinuation(ObservedFutureWithContinuation&& o) = default; + ObservedFutureWithContinuation& operator=(ObservedFutureWithContinuation&& o) = default; + + bool timedWait(const std::chrono::microseconds& timeout) override + { + return f_.wait_for(timeout) == std::future_status::ready; + } + + void dispatch(std::exception_ptr err) override + { + if (err) { + std::rethrow_exception(err); + } + else { + cont_(std::move(f_)); + } + } + +private: + std::future f_; + TFunc cont_; +}; + +} // namespace detail +} // namespace futures +} // namespace thousandeyes diff --git a/include/thousandeyes/futures/observe.h b/include/thousandeyes/futures/observe.h new file mode 100644 index 0000000..78120e6 --- /dev/null +++ b/include/thousandeyes/futures/observe.h @@ -0,0 +1,117 @@ +/* + * Copyright 2024 ThousandEyes, Inc. + * + * Use of this source code is governed by an MIT-style + * license that can be found in the LICENSE file or at + * https://opensource.org/licenses/MIT. + * + * @author Marcus Nutzinger, https://github.com/manutzin-te + */ + +#pragma once + +#include +#include + +#include +#include +#include + +namespace thousandeyes { +namespace futures { + +//! \brief Observes the input futures and calls the given continuation function once +//! it becomes ready. +//! +//! \param executor The object that waits for the given future to become ready. +//! \param timeLimit The maximum time to wait for the given future to become ready. +//! \param f The input future to wait and invoke the continuation function on. +//! \param cont The continuation function to invoke on the ready input future. +//! +//! \note Any exceptions from the continuation or from the executor will be thrown +//! on the thread on which the continuation is scheduled. +//! +//! \sa WaitableTimedOutException +template +void observe(std::shared_ptr executor, + std::chrono::microseconds timeLimit, + std::future f, + TFunc&& cont) +{ + executor->watch(std::make_unique>( + std::move(timeLimit), + std::move(f), + std::forward(cont))); +} + +//! \brief Observes the input futures and calls the given continuation function once +//! it becomes ready. +//! +//! \param executor The object that waits for the given future to become ready. +//! \param f The input future to wait and invoke the continuation function on. +//! \param cont The continuation function to invoke on the ready input future. +//! +//! \note If the total time for waiting the input future to become ready exceeds +//! a maximum threshold defined by the library (typically 1h), an exception of type +//! WaitableTimedOutException will be thrown on the thread on which the continuation +//! is scheduled. +//! +//! \sa WaitableTimedOutException +template +void observe(std::shared_ptr executor, std::future f, TFunc&& cont) +{ + observe(std::move(executor), + std::chrono::hours(1), + std::move(f), + std::forward(cont)); +} + +//! \brief Observes the input futures and calls the given continuation function once +//! it becomes ready. +//! +//! \par This function uses the default Executor object to wait for the given futures +//! to become ready. If there isn't any default Executor object registered, this +//! function's behavior is undefined. +//! +//! \param timeLimit The maximum time to wait for the given future to become ready. +//! \param f The input future to wait and invoke the continuation function on. +//! \param cont The continuation function to invoke on the ready input future. +//! +//! \note If the total time for waiting the input future to become ready exceeds the +//! given timeLimit, an exception of type WaitableTimedOutException will be thrown on +//! the thread on which the continuation is scheduled. +//! +//! \sa Default, WaitableTimedOutException +template +void observe(std::chrono::microseconds timeLimit, std::future f, TFunc&& cont) +{ + observe(Default(), + std::move(timeLimit), + std::move(f), + std::forward(cont)); +} + +//! \brief Observes the input futures and calls the given continuation function once +//! it becomes ready. +//! +//! \par This function uses the default Executor object to wait for the given futures +//! to become ready. If there isn't any default Executor object registered, this +//! function's behavior is undefined. +//! +//! \param f The input future to wait and invoke the continuation function on. +//! \param cont The continuation function to invoke on the ready input future. +//! +//! \note If the total time for waiting the input future to become ready exceeds +//! a maximum threshold defined by the library (typically 1h), an exception of +//! type WaitableTimedOutException will be thrown on the thread on which the +//! continuation is scheduled. +//! +//! \sa Default, WaitableTimedOutException +template +void observe(std::future f, TFunc&& cont) +{ + return observe(std::chrono::hours(1), std::move(f), std::forward(cont)); +} + +} // namespace futures +} // namespace thousandeyes diff --git a/tests/defaultexecutor.cpp b/tests/defaultexecutor.cpp index 374085d..2cb5851 100644 --- a/tests/defaultexecutor.cpp +++ b/tests/defaultexecutor.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -29,11 +30,13 @@ #include #include +#include #include #include using std::array; using std::bind; +using std::condition_variable; using std::exception; using std::function; using std::future; @@ -44,6 +47,7 @@ using std::make_shared; using std::make_tuple; using std::make_unique; using std::map; +using std::move; using std::mt19937; using std::mutex; using std::promise; @@ -55,6 +59,7 @@ using std::thread; using std::to_string; using std::tuple; using std::uniform_int_distribution; +using std::unique_lock; using std::unique_ptr; using std::vector; using std::chrono::duration_cast; @@ -71,6 +76,7 @@ using thousandeyes::futures::DefaultExecutor; using thousandeyes::futures::Executor; using thousandeyes::futures::fromException; using thousandeyes::futures::fromValue; +using thousandeyes::futures::observe; using thousandeyes::futures::then; using thousandeyes::futures::Waitable; using thousandeyes::futures::WaitableWaitException; @@ -78,6 +84,7 @@ using thousandeyes::futures::WaitableWaitException; namespace detail = thousandeyes::futures::detail; using ::testing::_; +using ::testing::UnorderedElementsAre; using ::testing::Range; using ::testing::Return; using ::testing::SaveArg; @@ -167,6 +174,36 @@ TEST_F(DefaultExecutorTest, ThenWithoutException) executor->stop(); } +TEST_F(DefaultExecutorTest, ObserveWithoutException) +{ + auto executor = make_shared(milliseconds(10)); + Default::Setter execSetter(executor); + + mutex mtx; + condition_variable cv; + vector result; + + auto recordResult = [&](int num) { + lock_guard lock(mtx); + + result.push_back(num); + cv.notify_one(); + }; + + observe(getValueAsync(1821), [&](future f) { recordResult(f.get()); }); + + observe(getValueAsync(1822), [&](future f) { recordResult(f.get()); }); + + { + unique_lock lock(mtx); + cv.wait(lock, [&] { return result.size() == 2; }); + } + + EXPECT_THAT(result, UnorderedElementsAre(1821, 1822)); + + executor->stop(); +} + TEST_F(DefaultExecutorTest, ThenWithException) { auto executor = make_shared(milliseconds(10)); @@ -180,6 +217,39 @@ TEST_F(DefaultExecutorTest, ThenWithException) executor->stop(); } +using DefaultExecutorDeathTest = DefaultExecutorTest; + +TEST_F(DefaultExecutorDeathTest, ObserveWithException) +{ + GTEST_FLAG_SET(death_test_style, "threadsafe"); + + auto executor = make_shared(milliseconds(10)); + Default::Setter execSetter(executor); + + mutex mtx; + condition_variable cv; + + // This will never become true, as the continuation will rethrow the exception + // thrown by the future passed to observe(). It only serves as barrier to block + // the execution until observe() throws. + bool success = false; + + EXPECT_DEATH( + { + observe(getExceptionAsync(), [](future f) { + f.get(); + + FAIL() << "As f.get() throws this code should never be executed"; + }); + + unique_lock lock(mtx); + cv.wait(lock, [&]{ return success; }); + }, + _); + + executor->stop(); +} + TEST_F(DefaultExecutorTest, ThenWithVoidInputWithoutException) { auto executor = make_shared(milliseconds(10)); @@ -781,7 +851,6 @@ TEST_F(DefaultExecutorTest, TupleAllWithException) } namespace { - future recFunc1(int count); future recFunc2(future f);