Skip to content

Commit

Permalink
Fix the future chaining
Browse files Browse the repository at this point in the history
  • Loading branch information
palikar committed Jul 28, 2020
1 parent 6d02d9d commit f94ea79
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 14 deletions.
8 changes: 4 additions & 4 deletions src/alisp/src/async/asyncs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,17 +287,15 @@ void AsyncS::submit_future(uint32_t t_id, ALObjectPtr t_value, bool t_good)

Future::resolve(t_id);

m_eval->futures_cv.notify_all();

if (t_good and pfunction(fut.success_callback))
{
submit_callback(fut.success_callback, make_list(t_value), [&, next = fut.next_in_line](auto res) {
if (next > 0)
{

if (pint(res) and future_registry.belong(object_to_resource(res)))
auto other = object_to_resource(res);
if (pint(res) and future_registry.belong(other))
{
auto other = object_to_resource(res);
Future::merge(other, next);
return;
}
Expand All @@ -317,6 +315,8 @@ void AsyncS::submit_future(uint32_t t_id, ALObjectPtr t_value, bool t_good)
fut.internal(fut.value);
}

m_eval->futures_cv.notify_all();

init();
}

Expand Down
14 changes: 9 additions & 5 deletions src/alisp/src/async/future.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ void Future::dispose_future(uint32_t t_id)

void Future::resolve(uint32_t t_id)
{
if (!future_registry.belong(t_id))
{
return;
}

if (!is_truthy(future_registry[t_id].resolved))
{
Expand All @@ -80,14 +76,22 @@ ALObjectPtr Future::future_resolved(uint32_t t_id)

void Future::merge(uint32_t t_next, uint32_t t_current)
{
std::lock_guard<std::mutex> lock(Future::future_mutex);

if (!future_registry.belong(t_next) or !future_registry.belong(t_current))
{
return;
}

future_registry[t_next].value = future_registry[t_current].value;
future_registry[t_next].success_state = future_registry[t_current].resolved;
future_registry[t_next].success_callback = future_registry[t_current].success_callback;
future_registry[t_next].reject_callback = future_registry[t_current].reject_callback;
future_registry[t_next].internal = future_registry[t_current].internal;
future_registry[t_next].next_in_line = future_registry[t_current].next_in_line;

Future::dispose_future(t_current);
--m_pending_futures;
future_registry.destroy_resource(t_current);
}

Future &Future::future(uint32_t t_id)
Expand Down
11 changes: 6 additions & 5 deletions src/alisp/src/modules/async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,12 @@ struct async_start
future.success_callback = arg_eval(eval, obj, 1);
}

eval->async().submit_callback(action, nullptr, [&async = eval->async(), future = future_id](auto value) {
async.submit_future(future, value);
});
eval->async().submit_callback(
action, nullptr, [&async = eval->async(), future = future_id, env = env](auto value) {
async.submit_future(future, value);
env->defer_callback([id = future]() { async::Future::dispose_future(id); });
});

env->defer_callback([id = future_id]() { async::Future::dispose_future(id); });

return resource_to_object(future_id);
}
Expand Down Expand Up @@ -87,7 +88,7 @@ struct async_then
{
static inline const std::string name{ "async-then" };

static inline const std::string doc{ R"((async-ready FUTURE))" };
static inline const std::string doc{ R"((async-ready FUTURE FUNCTION [FUNCTION]))" };

static inline const Signature signature{ Int{}, Function{}, Optional{}, Function{} };

Expand Down

0 comments on commit f94ea79

Please sign in to comment.