Skip to content

Commit

Permalink
refactor coroutine cancelled set
Browse files Browse the repository at this point in the history
  • Loading branch information
netcan committed Dec 8, 2021
1 parent 6511af9 commit 23e6a38
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 19 deletions.
32 changes: 22 additions & 10 deletions include/asyncio/event_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include <asyncio/concept/future.h>
#include <asyncio/selector/selector.h>
#include <utility>
#include <set>
#include <unordered_set>
#include <algorithm>
#include <queue>
#include <chrono>
Expand All @@ -17,10 +17,16 @@
ASYNCIO_NS_BEGIN
class EventLoop: private NonCopyable {
using MSDuration = std::chrono::milliseconds;
// handle maybe destroyed, using the increasing id to track the lifetime of handle.
// don't directly using a raw pointer to track coroutine lifetime,
// because a destroyed coroutine may has the same address as a new ready coroutine has created.
struct HandleInfo {
HandleId id;
Handle* handle;
};

public:
EventLoop() {
auto now = std::chrono::system_clock::now();
EventLoop() { auto now = std::chrono::system_clock::now();
start_time_ = duration_cast<MSDuration>(now.time_since_epoch());
}

Expand All @@ -29,6 +35,10 @@ class EventLoop: private NonCopyable {
return duration_cast<MSDuration>(now.time_since_epoch()) - start_time_;
}

HandleId allocate_handle_id() {
return handle_alloc_id_++;
}

bool is_stop() {
return schedule_.empty() && ready_.empty() && selector_.is_stop();
}
Expand All @@ -41,17 +51,18 @@ class EventLoop: private NonCopyable {
template<typename Rep, typename Period>
void call_at(std::chrono::duration<Rep, Period> when, Handle& callback) {
callback.set_state(PromiseState::PENDING);
schedule_.emplace_back(std::make_pair(duration_cast<MSDuration>(when), &callback));
schedule_.emplace_back(std::make_pair(duration_cast<MSDuration>(when),
HandleInfo{callback.get_handle_id(), &callback}));
std::ranges::push_heap(schedule_, std::ranges::greater{}, &TimerHandle::first);
}

void cancel_handle(Handle& handle) {
cancelled_.insert(&handle);
cancelled_.insert(handle.get_handle_id());
}

void call_soon(Handle& callback) {
callback.set_state(PromiseState::PENDING);
ready_.emplace(&callback);
ready_.push({callback.get_handle_id(), &callback});
}

template<concepts::Future Fut>
Expand Down Expand Up @@ -90,14 +101,15 @@ class EventLoop: private NonCopyable {

private:
MSDuration start_time_;
std::queue<Handle*> ready_;
std::set<Handle*> cancelled_;
std::queue<HandleInfo> ready_;
std::unordered_set<HandleId> cancelled_;
Selector selector_;
using TimerHandle = std::pair<MSDuration, Handle*>;
using TimerHandle = std::pair<MSDuration, HandleInfo>;
std::vector<TimerHandle> schedule_; // min time heap
static HandleId handle_alloc_id_;
};

EventLoop& get_event_loop();
ASYNCIO_NS_END

#endif // ASYNCIO_EVENT_LOOP_H
#endif // ASYNCIO_EVENT_LOOP_H
9 changes: 9 additions & 0 deletions include/asyncio/handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ enum class PromiseState: uint8_t {
PENDING,
};

// for cancelled
using HandleId = uint64_t;

struct Handle { // type erase for EventLoop
virtual void run() = 0;
std::string frame_name() const {
Expand All @@ -24,12 +27,18 @@ struct Handle { // type erase for EventLoop
}
virtual void dump_backtrace(size_t depth = 0) const {};
virtual void set_state(PromiseState state) {}
HandleId get_handle_id() { return handle_id_; }
Handle() noexcept;
virtual ~Handle() = default;

private:
virtual const std::source_location& get_frame_info() const {
static const std::source_location frame_info = std::source_location::current();
return frame_info;
}

private:
HandleId handle_id_;
};

ASYNCIO_NS_END
Expand Down
2 changes: 1 addition & 1 deletion include/asyncio/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,4 @@ struct Task: private NonCopyable {
};

ASYNCIO_NS_END
#endif // ASYNCIO_TASK_H
#endif // ASYNCIO_TASK_H
22 changes: 14 additions & 8 deletions src/event_loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ void EventLoop::run_once() {
std::optional<MSDuration> timeout;
// Remove delayed calls that were cancelled from head of queue.
while (! schedule_.empty()) {
auto&& [when, handle] = schedule_[0];
if (auto iter = cancelled_.find(handle); iter != cancelled_.end()) {
auto&& [when, handle_info] = schedule_[0];
if (auto iter = cancelled_.find(handle_info.id); iter != cancelled_.end()) {
ranges::pop_heap(schedule_,std::ranges::greater{}, &TimerHandle::first);
schedule_.pop_back();
cancelled_.erase(iter);
Expand All @@ -43,27 +43,33 @@ void EventLoop::run_once() {
auto event_lists = selector_.select(timeout.has_value() ? timeout->count() : -1);
for (auto&& event: event_lists) {
Handle* continuation_ = reinterpret_cast<Handle*>(event.data);
ready_.emplace(continuation_);
ready_.push({continuation_->get_handle_id(), continuation_});
}

auto end_time = time();
while (! schedule_.empty()) {
auto&& [when, handle] = schedule_[0];
auto&& [when, handle_info] = schedule_[0];
if (when >= end_time) break;
ready_.emplace(handle);
ready_.push(handle_info);
ranges::pop_heap(schedule_,std::ranges::greater{}, &TimerHandle::first);
schedule_.pop_back();
}

for (size_t ntodo = ready_.size(), i = 0; i < ntodo ; ++i ) {
auto handle = ready_.front();
auto [handle_id, handle] = ready_.front();
ready_.pop();
if (auto iter = cancelled_.find(handle); iter != cancelled_.end()) {
if (auto iter = cancelled_.find(handle_id); iter != cancelled_.end()) {
cancelled_.erase(iter);
} else {
handle->run();
}
}
}

ASYNCIO_NS_END
HandleId EventLoop::handle_alloc_id_ = 0;

Handle::Handle() noexcept {
auto& loop = get_event_loop();
handle_id_ = loop.allocate_handle_id();
}
ASYNCIO_NS_END

0 comments on commit 23e6a38

Please sign in to comment.