Skip to content

Commit

Permalink
Merge pull request #19 from manutzin-te/EAC-4613
Browse files Browse the repository at this point in the history
EAC-4613 Add observe as alternative to then
  • Loading branch information
manutzin-te committed Jan 30, 2024
2 parents dc16236 + 5c04edc commit 7ae0b00
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 2 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

class ThousandEyesFuturesConan(ConanFile):
name = "thousandeyes-futures"
version = "0.8"
version = "0.9"
exports_sources = "include/*", "FindThousandEyesFutures.cmake"
no_copy_source = True
short_paths = True
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2024 ThousandEyes, Inc.
*
* Use of this source code is governed by an MIT-style
* license that can be found in the LICENSE file or at
* https://opensource.org/licenses/MIT.
*
* @author Marcus Nutzinger, https://github.com/manutzin-te
*/

#pragma once

#include <future>
#include <memory>

#include <thousandeyes/futures/TimedWaitable.h>

namespace thousandeyes {
namespace futures {
namespace detail {

template <class TIn, class TFunc>
class ObservedFutureWithContinuation : public TimedWaitable {
public:
ObservedFutureWithContinuation(std::chrono::microseconds waitLimit,
std::future<TIn> f,
TFunc&& cont) :
TimedWaitable(std::move(waitLimit)),
f_(std::move(f)),
cont_(std::forward<TFunc>(cont))
{}

ObservedFutureWithContinuation(const ObservedFutureWithContinuation& o) = delete;
ObservedFutureWithContinuation& operator=(const ObservedFutureWithContinuation& o) = delete;

ObservedFutureWithContinuation(ObservedFutureWithContinuation&& o) = default;
ObservedFutureWithContinuation& operator=(ObservedFutureWithContinuation&& o) = default;

bool timedWait(const std::chrono::microseconds& timeout) override
{
return f_.wait_for(timeout) == std::future_status::ready;
}

void dispatch(std::exception_ptr err) override
{
if (err) {
std::rethrow_exception(err);
}
else {
cont_(std::move(f_));
}
}

private:
std::future<TIn> f_;
TFunc cont_;
};

} // namespace detail
} // namespace futures
} // namespace thousandeyes
117 changes: 117 additions & 0 deletions include/thousandeyes/futures/observe.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright 2024 ThousandEyes, Inc.
*
* Use of this source code is governed by an MIT-style
* license that can be found in the LICENSE file or at
* https://opensource.org/licenses/MIT.
*
* @author Marcus Nutzinger, https://github.com/manutzin-te
*/

#pragma once

#include <future>
#include <memory>

#include <thousandeyes/futures/Default.h>
#include <thousandeyes/futures/detail/ObservedFutureWithContinuation.h>
#include <thousandeyes/futures/Executor.h>

namespace thousandeyes {
namespace futures {

//! \brief Observes the input futures and calls the given continuation function once
//! it becomes ready.
//!
//! \param executor The object that waits for the given future to become ready.
//! \param timeLimit The maximum time to wait for the given future to become ready.
//! \param f The input future to wait and invoke the continuation function on.
//! \param cont The continuation function to invoke on the ready input future.
//!
//! \note Any exceptions from the continuation or from the executor will be thrown
//! on the thread on which the continuation is scheduled.
//!
//! \sa WaitableTimedOutException
template <class TIn, class TFunc>
void observe(std::shared_ptr<Executor> executor,
std::chrono::microseconds timeLimit,
std::future<TIn> f,
TFunc&& cont)
{
executor->watch(std::make_unique<detail::ObservedFutureWithContinuation<TIn, TFunc>>(
std::move(timeLimit),
std::move(f),
std::forward<TFunc>(cont)));
}

//! \brief Observes the input futures and calls the given continuation function once
//! it becomes ready.
//!
//! \param executor The object that waits for the given future to become ready.
//! \param f The input future to wait and invoke the continuation function on.
//! \param cont The continuation function to invoke on the ready input future.
//!
//! \note If the total time for waiting the input future to become ready exceeds
//! a maximum threshold defined by the library (typically 1h), an exception of type
//! WaitableTimedOutException will be thrown on the thread on which the continuation
//! is scheduled.
//!
//! \sa WaitableTimedOutException
template <class TIn, class TFunc>
void observe(std::shared_ptr<Executor> executor, std::future<TIn> f, TFunc&& cont)
{
observe<TIn, TFunc>(std::move(executor),
std::chrono::hours(1),
std::move(f),
std::forward<TFunc>(cont));
}

//! \brief Observes the input futures and calls the given continuation function once
//! it becomes ready.
//!
//! \par This function uses the default Executor object to wait for the given futures
//! to become ready. If there isn't any default Executor object registered, this
//! function's behavior is undefined.
//!
//! \param timeLimit The maximum time to wait for the given future to become ready.
//! \param f The input future to wait and invoke the continuation function on.
//! \param cont The continuation function to invoke on the ready input future.
//!
//! \note If the total time for waiting the input future to become ready exceeds the
//! given timeLimit, an exception of type WaitableTimedOutException will be thrown on
//! the thread on which the continuation is scheduled.
//!
//! \sa Default, WaitableTimedOutException
template <class TIn, class TFunc>
void observe(std::chrono::microseconds timeLimit, std::future<TIn> f, TFunc&& cont)
{
observe<TIn, TFunc>(Default<Executor>(),
std::move(timeLimit),
std::move(f),
std::forward<TFunc>(cont));
}

//! \brief Observes the input futures and calls the given continuation function once
//! it becomes ready.
//!
//! \par This function uses the default Executor object to wait for the given futures
//! to become ready. If there isn't any default Executor object registered, this
//! function's behavior is undefined.
//!
//! \param f The input future to wait and invoke the continuation function on.
//! \param cont The continuation function to invoke on the ready input future.
//!
//! \note If the total time for waiting the input future to become ready exceeds
//! a maximum threshold defined by the library (typically 1h), an exception of
//! type WaitableTimedOutException will be thrown on the thread on which the
//! continuation is scheduled.
//!
//! \sa Default, WaitableTimedOutException
template <class TIn, class TFunc>
void observe(std::future<TIn> f, TFunc&& cont)
{
return observe<TIn, TFunc>(std::chrono::hours(1), std::move(f), std::forward<TFunc>(cont));
}

} // namespace futures
} // namespace thousandeyes
71 changes: 70 additions & 1 deletion tests/defaultexecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <array>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <future>
#include <map>
Expand All @@ -29,11 +30,13 @@

#include <thousandeyes/futures/all.h>
#include <thousandeyes/futures/DefaultExecutor.h>
#include <thousandeyes/futures/observe.h>
#include <thousandeyes/futures/then.h>
#include <thousandeyes/futures/util.h>

using std::array;
using std::bind;
using std::condition_variable;
using std::exception;
using std::function;
using std::future;
Expand All @@ -44,6 +47,7 @@ using std::make_shared;
using std::make_tuple;
using std::make_unique;
using std::map;
using std::move;
using std::mt19937;
using std::mutex;
using std::promise;
Expand All @@ -55,6 +59,7 @@ using std::thread;
using std::to_string;
using std::tuple;
using std::uniform_int_distribution;
using std::unique_lock;
using std::unique_ptr;
using std::vector;
using std::chrono::duration_cast;
Expand All @@ -71,13 +76,15 @@ using thousandeyes::futures::DefaultExecutor;
using thousandeyes::futures::Executor;
using thousandeyes::futures::fromException;
using thousandeyes::futures::fromValue;
using thousandeyes::futures::observe;
using thousandeyes::futures::then;
using thousandeyes::futures::Waitable;
using thousandeyes::futures::WaitableWaitException;

namespace detail = thousandeyes::futures::detail;

using ::testing::_;
using ::testing::UnorderedElementsAre;
using ::testing::Range;
using ::testing::Return;
using ::testing::SaveArg;
Expand Down Expand Up @@ -167,6 +174,36 @@ TEST_F(DefaultExecutorTest, ThenWithoutException)
executor->stop();
}

TEST_F(DefaultExecutorTest, ObserveWithoutException)
{
auto executor = make_shared<DefaultExecutor>(milliseconds(10));
Default<Executor>::Setter execSetter(executor);

mutex mtx;
condition_variable cv;
vector<int> result;

auto recordResult = [&](int num) {
lock_guard<mutex> lock(mtx);

result.push_back(num);
cv.notify_one();
};

observe(getValueAsync(1821), [&](future<int> f) { recordResult(f.get()); });

observe(getValueAsync(1822), [&](future<int> f) { recordResult(f.get()); });

{
unique_lock<mutex> lock(mtx);
cv.wait(lock, [&] { return result.size() == 2; });
}

EXPECT_THAT(result, UnorderedElementsAre(1821, 1822));

executor->stop();
}

TEST_F(DefaultExecutorTest, ThenWithException)
{
auto executor = make_shared<DefaultExecutor>(milliseconds(10));
Expand All @@ -180,6 +217,39 @@ TEST_F(DefaultExecutorTest, ThenWithException)
executor->stop();
}

using DefaultExecutorDeathTest = DefaultExecutorTest;

TEST_F(DefaultExecutorDeathTest, ObserveWithException)
{
GTEST_FLAG_SET(death_test_style, "threadsafe");

auto executor = make_shared<DefaultExecutor>(milliseconds(10));
Default<Executor>::Setter execSetter(executor);

mutex mtx;
condition_variable cv;

// This will never become true, as the continuation will rethrow the exception
// thrown by the future passed to observe(). It only serves as barrier to block
// the execution until observe() throws.
bool success = false;

EXPECT_DEATH(
{
observe(getExceptionAsync<SomeKindOfError>(), [](future<void> f) {
f.get();

FAIL() << "As f.get() throws this code should never be executed";
});

unique_lock<mutex> lock(mtx);
cv.wait(lock, [&]{ return success; });
},
_);

executor->stop();
}

TEST_F(DefaultExecutorTest, ThenWithVoidInputWithoutException)
{
auto executor = make_shared<DefaultExecutor>(milliseconds(10));
Expand Down Expand Up @@ -781,7 +851,6 @@ TEST_F(DefaultExecutorTest, TupleAllWithException)
}

namespace {

future<int> recFunc1(int count);
future<int> recFunc2(future<int> f);

Expand Down

0 comments on commit 7ae0b00

Please sign in to comment.