Skip to content

Commit

Permalink
Implement scheduler timers using asio deadline timers (#3278)
Browse files Browse the repository at this point in the history
Co-authored-by: cristofer_martins <cristofermartins@hotmail.com>
  • Loading branch information
ranisalt and yamaken93 committed Feb 11, 2021
1 parent 60231e7 commit 0d12ee8
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 109 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -1,5 +1,5 @@
os: linux
dist: bionic
dist: focal
language: cpp
compiler:
- clang
Expand Down
118 changes: 32 additions & 86 deletions src/scheduler.cpp
Expand Up @@ -20,116 +20,62 @@
#include "otpch.h"

#include "scheduler.h"

void Scheduler::threadMain()
{
std::unique_lock<std::mutex> eventLockUnique(eventLock, std::defer_lock);
while (getState() != THREAD_STATE_TERMINATED) {
std::cv_status ret = std::cv_status::no_timeout;

eventLockUnique.lock();
if (eventList.empty()) {
eventSignal.wait(eventLockUnique);
} else {
ret = eventSignal.wait_until(eventLockUnique, eventList.top()->getCycle());
}

// the mutex is locked again now...
if (ret == std::cv_status::timeout && !eventList.empty()) {
// ok we had a timeout, so there has to be an event we have to execute...
SchedulerTask* task = eventList.top();
eventList.pop();

// check if the event was stopped
auto it = eventIds.find(task->getEventId());
if (it == eventIds.end()) {
eventLockUnique.unlock();
delete task;
continue;
}
eventIds.erase(it);
eventLockUnique.unlock();

task->setDontExpire();
g_dispatcher.addTask(task, true);
} else {
eventLockUnique.unlock();
}
}
}
#include <boost/asio/post.hpp>
#include <memory>

uint32_t Scheduler::addEvent(SchedulerTask* task)
{
eventLock.lock();

if (getState() != THREAD_STATE_RUNNING) {
eventLock.unlock();
delete task;
return 0;
}

// check if the event has a valid id
if (task->getEventId() == 0) {
// if not generate one
if (++lastEventId == 0) {
lastEventId = 1;
}

task->setEventId(lastEventId);
task->setEventId(++lastEventId);
}

// insert the event id in the list of active events
uint32_t eventId = task->getEventId();
eventIds.insert(eventId);

// add the event to the queue
eventList.push(task);
auto it = eventIdTimerMap.emplace(task->getEventId(), boost::asio::deadline_timer{io_context});
auto& timer = it.first->second;

// if the list was empty or this event is the top in the list
// we have to signal it
bool do_signal = (task == eventList.top());
timer.expires_from_now(boost::posix_time::milliseconds(task->getDelay()));
timer.async_wait([this, task](const boost::system::error_code& error) {
eventIdTimerMap.erase(task->getEventId());

eventLock.unlock();
if (error == boost::asio::error::operation_aborted || getState() == THREAD_STATE_TERMINATED) {
// the timer has been manually cancelled(timer->cancel()) or Scheduler::shutdown has been called
delete task;
return;
}

if (do_signal) {
eventSignal.notify_one();
}
g_dispatcher.addTask(task, true);
});

return eventId;
return task->getEventId();
}

bool Scheduler::stopEvent(uint32_t eventId)
void Scheduler::stopEvent(uint32_t eventId)
{
if (eventId == 0) {
return false;
}

std::lock_guard<std::mutex> lockClass(eventLock);

// search the event id..
auto it = eventIds.find(eventId);
if (it == eventIds.end()) {
return false;
return;
}

eventIds.erase(it);
return true;
boost::asio::post(io_context, [this, eventId]() {
// search the event id..
auto it = eventIdTimerMap.find(eventId);
if (it != eventIdTimerMap.end()) {
it->second.cancel();
}
});
}

void Scheduler::shutdown()
{
setState(THREAD_STATE_TERMINATED);
eventLock.lock();

//this list should already be empty
while (!eventList.empty()) {
delete eventList.top();
eventList.pop();
}
boost::asio::post(io_context, [this]() {
// cancel all active timers
for (auto& it : eventIdTimerMap) {
it.second.cancel();
}

eventIds.clear();
eventLock.unlock();
eventSignal.notify_one();
io_context.stop();
});
}

SchedulerTask* createSchedulerTask(uint32_t delay, std::function<void (void)> f)
Expand Down
33 changes: 11 additions & 22 deletions src/scheduler.h
Expand Up @@ -21,8 +21,7 @@
#define FS_SCHEDULER_H_2905B3D5EAB34B4BA8830167262D2DC1

#include "tasks.h"
#include <unordered_set>
#include <queue>
#include <unordered_map>

#include "thread_holder_base.h"

Expand All @@ -38,44 +37,34 @@ class SchedulerTask : public Task
return eventId;
}

std::chrono::system_clock::time_point getCycle() const {
return expiration;
uint32_t getDelay() const {
return delay;
}

private:
SchedulerTask(uint32_t delay, std::function<void (void)>&& f) : Task(delay, std::move(f)) {}
SchedulerTask(uint32_t delay, std::function<void (void)>&& f) : Task(std::move(f)), delay(delay) {}

uint32_t eventId = 0;
uint32_t delay = 0;

friend SchedulerTask* createSchedulerTask(uint32_t, std::function<void (void)>);
};

SchedulerTask* createSchedulerTask(uint32_t delay, std::function<void (void)> f);

struct TaskComparator {
bool operator()(const SchedulerTask* lhs, const SchedulerTask* rhs) const {
return lhs->getCycle() > rhs->getCycle();
}
};

class Scheduler : public ThreadHolder<Scheduler>
{
public:
uint32_t addEvent(SchedulerTask* task);
bool stopEvent(uint32_t eventId);
void stopEvent(uint32_t eventId);

void shutdown();

void threadMain();

void threadMain() { io_context.run(); }
private:
std::thread thread;
std::mutex eventLock;
std::condition_variable eventSignal;

uint32_t lastEventId {0};
std::priority_queue<SchedulerTask*, std::deque<SchedulerTask*>, TaskComparator> eventList;
std::unordered_set<uint32_t> eventIds;
std::atomic<uint32_t> lastEventId{0};
std::unordered_map<uint32_t, boost::asio::deadline_timer> eventIdTimerMap;
boost::asio::io_context io_context;
boost::asio::io_context::work work{io_context};
};

extern Scheduler g_scheduler;
Expand Down

0 comments on commit 0d12ee8

Please sign in to comment.