Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EAC-4613 Add observe as alternative to then #19

Merged
merged 2 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

class ThousandEyesFuturesConan(ConanFile):
name = "thousandeyes-futures"
version = "0.8"
version = "0.9"
exports_sources = "include/*", "FindThousandEyesFutures.cmake"
no_copy_source = True
short_paths = True
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2024 ThousandEyes, Inc.
*
* Use of this source code is governed by an MIT-style
* license that can be found in the LICENSE file or at
* https://opensource.org/licenses/MIT.
*
* @author Marcus Nutzinger, https://github.com/manutzin-te
*/

#pragma once

#include <future>
#include <memory>
Copy link
Contributor

@jstinson-te jstinson-te Jan 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: memory is unused but chrono is used, I'd swap this for a chrono include


#include <thousandeyes/futures/TimedWaitable.h>

namespace thousandeyes {
namespace futures {
namespace detail {

template <class TIn, class TFunc>
class ObservedFutureWithContinuation : public TimedWaitable {
public:
ObservedFutureWithContinuation(std::chrono::microseconds waitLimit,
std::future<TIn> f,
TFunc&& cont) :
TimedWaitable(std::move(waitLimit)),
f_(std::move(f)),
cont_(std::forward<TFunc>(cont))
{}

ObservedFutureWithContinuation(const ObservedFutureWithContinuation& o) = delete;
ObservedFutureWithContinuation& operator=(const ObservedFutureWithContinuation& o) = delete;

ObservedFutureWithContinuation(ObservedFutureWithContinuation&& o) = default;
ObservedFutureWithContinuation& operator=(ObservedFutureWithContinuation&& o) = default;

bool timedWait(const std::chrono::microseconds& timeout) override
{
return f_.wait_for(timeout) == std::future_status::ready;
}

void dispatch(std::exception_ptr err) override
{
if (err) {
std::rethrow_exception(err);
}
else {
cont_(std::move(f_));
}
}

private:
std::future<TIn> f_;
TFunc cont_;
};

} // namespace detail
} // namespace futures
} // namespace thousandeyes
117 changes: 117 additions & 0 deletions include/thousandeyes/futures/observe.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright 2024 ThousandEyes, Inc.
*
* Use of this source code is governed by an MIT-style
* license that can be found in the LICENSE file or at
* https://opensource.org/licenses/MIT.
*
* @author Marcus Nutzinger, https://github.com/manutzin-te
*/

#pragma once

#include <future>
#include <memory>

#include <thousandeyes/futures/Default.h>
#include <thousandeyes/futures/detail/ObservedFutureWithContinuation.h>
#include <thousandeyes/futures/Executor.h>

namespace thousandeyes {
namespace futures {

//! \brief Observes the input futures and calls the given continuation function once
//! it becomes ready.
//!
//! \param executor The object that waits for the given future to become ready.
//! \param timeLimit The maximum time to wait for the given future to become ready.
//! \param f The input future to wait and invoke the continuation function on.
//! \param cont The continuation function to invoke on the ready input future.
//!
//! \note Any exceptions from the continuation or from the executor will be thrown
//! on the thread on which the continuation is scheduled.
//!
//! \sa WaitableTimedOutException
template <class TIn, class TFunc>
void observe(std::shared_ptr<Executor> executor,
std::chrono::microseconds timeLimit,
std::future<TIn> f,
TFunc&& cont)
{
executor->watch(std::make_unique<detail::ObservedFutureWithContinuation<TIn, TFunc>>(
std::move(timeLimit),
std::move(f),
std::forward<TFunc>(cont)));
}

//! \brief Observes the input futures and calls the given continuation function once
//! it becomes ready.
//!
//! \param executor The object that waits for the given future to become ready.
//! \param f The input future to wait and invoke the continuation function on.
//! \param cont The continuation function to invoke on the ready input future.
//!
//! \note If the total time for waiting the input future to become ready exceeds
//! a maximum threshold defined by the library (typically 1h), an exception of type
//! WaitableTimedOutException will be thrown on the thread on which the continuation
//! is scheduled.
//!
//! \sa WaitableTimedOutException
template <class TIn, class TFunc>
void observe(std::shared_ptr<Executor> executor, std::future<TIn> f, TFunc&& cont)
{
observe<TIn, TFunc>(std::move(executor),
std::chrono::hours(1),
std::move(f),
std::forward<TFunc>(cont));
}

//! \brief Observes the input futures and calls the given continuation function once
//! it becomes ready.
//!
//! \par This function uses the default Executor object to wait for the given futures
//! to become ready. If there isn't any default Executor object registered, this
//! function's behavior is undefined.
//!
//! \param timeLimit The maximum time to wait for the given future to become ready.
//! \param f The input future to wait and invoke the continuation function on.
//! \param cont The continuation function to invoke on the ready input future.
//!
//! \note If the total time for waiting the input future to become ready exceeds the
//! given timeLimit, an exception of type WaitableTimedOutException will be thrown on
//! the thread on which the continuation is scheduled.
//!
//! \sa Default, WaitableTimedOutException
template <class TIn, class TFunc>
void observe(std::chrono::microseconds timeLimit, std::future<TIn> f, TFunc&& cont)
{
observe<TIn, TFunc>(Default<Executor>(),
std::move(timeLimit),
std::move(f),
std::forward<TFunc>(cont));
}

//! \brief Observes the input futures and calls the given continuation function once
//! it becomes ready.
//!
//! \par This function uses the default Executor object to wait for the given futures
//! to become ready. If there isn't any default Executor object registered, this
//! function's behavior is undefined.
//!
//! \param f The input future to wait and invoke the continuation function on.
//! \param cont The continuation function to invoke on the ready input future.
//!
//! \note If the total time for waiting the input future to become ready exceeds
//! a maximum threshold defined by the library (typically 1h), an exception of
//! type WaitableTimedOutException will be thrown on the thread on which the
//! continuation is scheduled.
//!
//! \sa Default, WaitableTimedOutException
template <class TIn, class TFunc>
void observe(std::future<TIn> f, TFunc&& cont)
{
return observe<TIn, TFunc>(std::chrono::hours(1), std::move(f), std::forward<TFunc>(cont));
}

} // namespace futures
} // namespace thousandeyes
71 changes: 70 additions & 1 deletion tests/defaultexecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <array>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <future>
#include <map>
Expand All @@ -29,11 +30,13 @@

#include <thousandeyes/futures/all.h>
#include <thousandeyes/futures/DefaultExecutor.h>
#include <thousandeyes/futures/observe.h>
#include <thousandeyes/futures/then.h>
#include <thousandeyes/futures/util.h>

using std::array;
using std::bind;
using std::condition_variable;
using std::exception;
using std::function;
using std::future;
Expand All @@ -44,6 +47,7 @@ using std::make_shared;
using std::make_tuple;
using std::make_unique;
using std::map;
using std::move;
using std::mt19937;
using std::mutex;
using std::promise;
Expand All @@ -55,6 +59,7 @@ using std::thread;
using std::to_string;
using std::tuple;
using std::uniform_int_distribution;
using std::unique_lock;
using std::unique_ptr;
using std::vector;
using std::chrono::duration_cast;
Expand All @@ -71,13 +76,15 @@ using thousandeyes::futures::DefaultExecutor;
using thousandeyes::futures::Executor;
using thousandeyes::futures::fromException;
using thousandeyes::futures::fromValue;
using thousandeyes::futures::observe;
using thousandeyes::futures::then;
using thousandeyes::futures::Waitable;
using thousandeyes::futures::WaitableWaitException;

namespace detail = thousandeyes::futures::detail;

using ::testing::_;
using ::testing::UnorderedElementsAre;
using ::testing::Range;
using ::testing::Return;
using ::testing::SaveArg;
Expand Down Expand Up @@ -167,6 +174,36 @@ TEST_F(DefaultExecutorTest, ThenWithoutException)
executor->stop();
}

TEST_F(DefaultExecutorTest, ObserveWithoutException)
{
auto executor = make_shared<DefaultExecutor>(milliseconds(10));
Default<Executor>::Setter execSetter(executor);

mutex mtx;
condition_variable cv;
vector<int> result;

auto recordResult = [&](int num) {
lock_guard<mutex> lock(mtx);

result.push_back(num);
cv.notify_one();
};

observe(getValueAsync(1821), [&](future<int> f) { recordResult(f.get()); });

observe(getValueAsync(1822), [&](future<int> f) { recordResult(f.get()); });

{
unique_lock<mutex> lock(mtx);
cv.wait(lock, [&] { return result.size() == 2; });
}

EXPECT_THAT(result, UnorderedElementsAre(1821, 1822));

executor->stop();
}

TEST_F(DefaultExecutorTest, ThenWithException)
{
auto executor = make_shared<DefaultExecutor>(milliseconds(10));
Expand All @@ -180,6 +217,39 @@ TEST_F(DefaultExecutorTest, ThenWithException)
executor->stop();
}

using DefaultExecutorDeathTest = DefaultExecutorTest;

TEST_F(DefaultExecutorDeathTest, ObserveWithException)
{
GTEST_FLAG_SET(death_test_style, "threadsafe");

auto executor = make_shared<DefaultExecutor>(milliseconds(10));
Default<Executor>::Setter execSetter(executor);

mutex mtx;
condition_variable cv;

// This will never become true, as the continuation will rethrow the exception
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a EXPECT_FALSE(success); to the end of the test to make this expectation checked?

// thrown by the future passed to observe(). It only serves as barrier to block
// the execution until observe() throws.
bool success = false;

EXPECT_DEATH(
{
observe(getExceptionAsync<SomeKindOfError>(), [](future<void> f) {
f.get();

FAIL() << "As f.get() throws this code should never be executed";
});

unique_lock<mutex> lock(mtx);
cv.wait(lock, [&]{ return success; });
},
_);

executor->stop();
}

TEST_F(DefaultExecutorTest, ThenWithVoidInputWithoutException)
{
auto executor = make_shared<DefaultExecutor>(milliseconds(10));
Expand Down Expand Up @@ -781,7 +851,6 @@ TEST_F(DefaultExecutorTest, TupleAllWithException)
}

namespace {

future<int> recFunc1(int count);
future<int> recFunc2(future<int> f);

Expand Down