Skip to content

Commit

Permalink
wpiutil: uv::Async: Keep weak reference to loop
Browse files Browse the repository at this point in the history
Other handles can only be used within the loop itself, but Async is intended
to be used from another thread.  This introduces the possibility of a race
condition between the loop being destroyed and the Async being destroyed.
Change Async to keep a weak reference to a loop and check it before performing
libuv operations.
  • Loading branch information
PeterJohnson committed Oct 16, 2018
1 parent 1a7a0db commit 172e438
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 61 deletions.
15 changes: 11 additions & 4 deletions wpiutil/src/main/native/cpp/uv/Async.cpp
Expand Up @@ -12,14 +12,21 @@
namespace wpi {
namespace uv {

std::shared_ptr<Async<>> Async<>::Create(Loop& loop) {
auto h = std::make_shared<Async>(private_init{});
int err = uv_async_init(loop.GetRaw(), h->GetRaw(), [](uv_async_t* handle) {
Async<>::~Async() noexcept {
if (auto loop = m_loop.lock())
Close();
else
ForceClosed();
}

std::shared_ptr<Async<>> Async<>::Create(const std::shared_ptr<Loop>& loop) {
auto h = std::make_shared<Async>(loop, private_init{});
int err = uv_async_init(loop->GetRaw(), h->GetRaw(), [](uv_async_t* handle) {
Async& h = *static_cast<Async*>(handle->data);
h.wakeup();
});
if (err < 0) {
loop.ReportError(err);
loop->ReportError(err);
return nullptr;
}
h->Keep();
Expand Down
62 changes: 38 additions & 24 deletions wpiutil/src/main/native/include/wpi/uv/Async.h
Expand Up @@ -40,28 +40,22 @@ class Async final : public HandleImpl<Async<T...>, uv_async_t> {
struct private_init {};

public:
explicit Async(const private_init&) {}
~Async() noexcept override = default;
Async(const std::shared_ptr<Loop>& loop, const private_init&)
: m_loop{loop} {}
~Async() noexcept override {
if (auto loop = m_loop.lock())
this->Close();
else
this->ForceClosed();
}

/**
* Create an async handle.
*
* @param loop Loop object where this handle runs.
*/
static std::shared_ptr<Async> Create(Loop& loop) {
auto h = std::make_shared<Async>(private_init{});
int err = uv_async_init(loop.GetRaw(), h->GetRaw(), [](uv_async_t* handle) {
auto& h = *static_cast<Async*>(handle->data);
std::lock_guard<wpi::mutex> lock(h.m_mutex);
for (auto&& v : h.m_data) apply_tuple(h.wakeup, v);
h.m_data.clear();
});
if (err < 0) {
loop.ReportError(err);
return nullptr;
}
h->Keep();
return h;
return Create(loop.shared_from_this());
}

/**
Expand All @@ -70,7 +64,20 @@ class Async final : public HandleImpl<Async<T...>, uv_async_t> {
* @param loop Loop object where this handle runs.
*/
static std::shared_ptr<Async> Create(const std::shared_ptr<Loop>& loop) {
return Create(*loop);
auto h = std::make_shared<Async>(loop, private_init{});
int err =
uv_async_init(loop->GetRaw(), h->GetRaw(), [](uv_async_t* handle) {
auto& h = *static_cast<Async*>(handle->data);
std::lock_guard<wpi::mutex> lock(h.m_mutex);
for (auto&& v : h.m_data) apply_tuple(h.wakeup, v);
h.m_data.clear();
});
if (err < 0) {
loop->ReportError(err);
return nullptr;
}
h->Keep();
return h;
}

/**
Expand All @@ -85,7 +92,7 @@ class Async final : public HandleImpl<Async<T...>, uv_async_t> {
std::lock_guard<wpi::mutex> lock(m_mutex);
m_data.emplace_back(std::forward_as_tuple(std::forward<U>(u)...));
}
this->Invoke(&uv_async_send, this->GetRaw());
if (auto loop = m_loop.lock()) this->Invoke(&uv_async_send, this->GetRaw());
}

/**
Expand All @@ -96,6 +103,7 @@ class Async final : public HandleImpl<Async<T...>, uv_async_t> {
private:
wpi::mutex m_mutex;
std::vector<std::tuple<T...>> m_data;
std::weak_ptr<Loop> m_loop;
};

/**
Expand All @@ -107,37 +115,43 @@ class Async<> final : public HandleImpl<Async<>, uv_async_t> {
struct private_init {};

public:
explicit Async(const private_init&) {}
~Async() noexcept override = default;
Async(const std::shared_ptr<Loop>& loop, const private_init&)
: m_loop(loop) {}
~Async() noexcept override;

/**
* Create an async handle.
*
* @param loop Loop object where this handle runs.
*/
static std::shared_ptr<Async> Create(Loop& loop);
static std::shared_ptr<Async> Create(Loop& loop) {
return Create(loop.shared_from_this());
}

/**
* Create an async handle.
*
* @param loop Loop object where this handle runs.
*/
static std::shared_ptr<Async> Create(const std::shared_ptr<Loop>& loop) {
return Create(*loop);
}
static std::shared_ptr<Async> Create(const std::shared_ptr<Loop>& loop);

/**
* Wakeup the event loop and emit the event.
*
* It’s safe to call this function from any thread.
* An async event will be emitted on the loop thread.
*/
void Send() { Invoke(&uv_async_send, GetRaw()); }
void Send() {
if (auto loop = m_loop.lock()) Invoke(&uv_async_send, GetRaw());
}

/**
* Signal generated (on event loop thread) when the async event occurs.
*/
sig::Signal<> wakeup;

private:
std::weak_ptr<Loop> m_loop;
};

} // namespace uv
Expand Down
74 changes: 42 additions & 32 deletions wpiutil/src/main/native/include/wpi/uv/AsyncFunction.h
Expand Up @@ -40,9 +40,15 @@ class AsyncFunction<R(T...)> final
struct private_init {};

public:
AsyncFunction(std::function<void(promise<R>, T...)> func, const private_init&)
: wakeup{func} {}
~AsyncFunction() noexcept override = default;
AsyncFunction(const std::shared_ptr<Loop>& loop,
std::function<void(promise<R>, T...)> func, const private_init&)
: wakeup{func}, m_loop{loop} {}
~AsyncFunction() noexcept override {
if (auto loop = m_loop.lock())
this->Close();
else
this->ForceClosed();
}

/**
* Create an async handle.
Expand All @@ -55,33 +61,7 @@ class AsyncFunction<R(T...)> final
*/
static std::shared_ptr<AsyncFunction> Create(
Loop& loop, std::function<void(promise<R>, T...)> func = nullptr) {
auto h = std::make_shared<AsyncFunction>(std::move(func), private_init{});
int err = uv_async_init(loop.GetRaw(), h->GetRaw(), [](uv_async_t* handle) {
auto& h = *static_cast<AsyncFunction*>(handle->data);
std::unique_lock<wpi::mutex> lock(h.m_mutex);

if (!h.m_params.empty()) {
// for each set of parameters in the input queue, call the wakeup
// function and put the result in the output queue if the caller is
// waiting for it
for (auto&& v : h.m_params) {
auto p = h.m_promises.CreatePromise(v.first);
if (h.wakeup)
apply_tuple(h.wakeup, std::tuple_cat(std::make_tuple(std::move(p)),
std::move(v.second)));
}
h.m_params.clear();
// wake up any threads that might be waiting for the result
lock.unlock();
h.m_promises.Notify();
}
});
if (err < 0) {
loop.ReportError(err);
return nullptr;
}
h->Keep();
return h;
return Create(loop.shared_from_this(), std::move(func));
}

/**
Expand All @@ -96,7 +76,36 @@ class AsyncFunction<R(T...)> final
static std::shared_ptr<AsyncFunction> Create(
const std::shared_ptr<Loop>& loop,
std::function<void(promise<R>, T...)> func = nullptr) {
return Create(*loop, std::move(func));
auto h =
std::make_shared<AsyncFunction>(loop, std::move(func), private_init{});
int err =
uv_async_init(loop->GetRaw(), h->GetRaw(), [](uv_async_t* handle) {
auto& h = *static_cast<AsyncFunction*>(handle->data);
std::unique_lock<wpi::mutex> lock(h.m_mutex);

if (!h.m_params.empty()) {
// for each set of parameters in the input queue, call the wakeup
// function and put the result in the output queue if the caller is
// waiting for it
for (auto&& v : h.m_params) {
auto p = h.m_promises.CreatePromise(v.first);
if (h.wakeup)
apply_tuple(h.wakeup,
std::tuple_cat(std::make_tuple(std::move(p)),
std::move(v.second)));
}
h.m_params.clear();
// wake up any threads that might be waiting for the result
lock.unlock();
h.m_promises.Notify();
}
});
if (err < 0) {
loop->ReportError(err);
return nullptr;
}
h->Keep();
return h;
}

/**
Expand All @@ -123,7 +132,7 @@ class AsyncFunction<R(T...)> final
}

// signal the loop
this->Invoke(&uv_async_send, this->GetRaw());
if (auto loop = m_loop.lock()) this->Invoke(&uv_async_send, this->GetRaw());

// return future
return m_promises.CreateFuture(req);
Expand All @@ -143,6 +152,7 @@ class AsyncFunction<R(T...)> final
wpi::mutex m_mutex;
std::vector<std::pair<uint64_t, std::tuple<T...>>> m_params;
PromiseFactory<R> m_promises;
std::weak_ptr<Loop> m_loop;
};

} // namespace uv
Expand Down
5 changes: 4 additions & 1 deletion wpiutil/src/main/native/include/wpi/uv/Handle.h
Expand Up @@ -104,7 +104,9 @@ class Handle : public std::enable_shared_from_this<Handle> {
*
* @return True if the handle is closing or closed, false otherwise.
*/
bool IsClosing() const noexcept { return uv_is_closing(m_uv_handle) != 0; }
bool IsClosing() const noexcept {
return m_closed || uv_is_closing(m_uv_handle) != 0;
}

/**
* Request handle to be closed.
Expand Down Expand Up @@ -219,6 +221,7 @@ class Handle : public std::enable_shared_from_this<Handle> {

void Keep() noexcept { m_self = shared_from_this(); }
void Release() noexcept { m_self.reset(); }
void ForceClosed() noexcept { m_closed = true; }

static void AllocBuf(uv_handle_t* handle, size_t size, uv_buf_t* buf);
static void DefaultFreeBuf(Buffer& buf);
Expand Down

0 comments on commit 172e438

Please sign in to comment.