diff --git a/.travis.yml b/.travis.yml index 2d5e5b21d2..4142bb7921 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,5 +1,5 @@ os: linux -dist: bionic +dist: focal language: cpp compiler: - clang diff --git a/src/scheduler.cpp b/src/scheduler.cpp index f489f613ff..62c290edd6 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -20,116 +20,62 @@ #include "otpch.h" #include "scheduler.h" - -void Scheduler::threadMain() -{ - std::unique_lock eventLockUnique(eventLock, std::defer_lock); - while (getState() != THREAD_STATE_TERMINATED) { - std::cv_status ret = std::cv_status::no_timeout; - - eventLockUnique.lock(); - if (eventList.empty()) { - eventSignal.wait(eventLockUnique); - } else { - ret = eventSignal.wait_until(eventLockUnique, eventList.top()->getCycle()); - } - - // the mutex is locked again now... - if (ret == std::cv_status::timeout && !eventList.empty()) { - // ok we had a timeout, so there has to be an event we have to execute... - SchedulerTask* task = eventList.top(); - eventList.pop(); - - // check if the event was stopped - auto it = eventIds.find(task->getEventId()); - if (it == eventIds.end()) { - eventLockUnique.unlock(); - delete task; - continue; - } - eventIds.erase(it); - eventLockUnique.unlock(); - - task->setDontExpire(); - g_dispatcher.addTask(task, true); - } else { - eventLockUnique.unlock(); - } - } -} +#include +#include uint32_t Scheduler::addEvent(SchedulerTask* task) { - eventLock.lock(); - - if (getState() != THREAD_STATE_RUNNING) { - eventLock.unlock(); - delete task; - return 0; - } - // check if the event has a valid id if (task->getEventId() == 0) { - // if not generate one - if (++lastEventId == 0) { - lastEventId = 1; - } - - task->setEventId(lastEventId); + task->setEventId(++lastEventId); } // insert the event id in the list of active events - uint32_t eventId = task->getEventId(); - eventIds.insert(eventId); - - // add the event to the queue - eventList.push(task); + auto it = eventIdTimerMap.emplace(task->getEventId(), boost::asio::deadline_timer{io_context}); + auto& timer = it.first->second; - // if the list was empty or this event is the top in the list - // we have to signal it - bool do_signal = (task == eventList.top()); + timer.expires_from_now(boost::posix_time::milliseconds(task->getDelay())); + timer.async_wait([this, task](const boost::system::error_code& error) { + eventIdTimerMap.erase(task->getEventId()); - eventLock.unlock(); + if (error == boost::asio::error::operation_aborted || getState() == THREAD_STATE_TERMINATED) { + // the timer has been manually cancelled(timer->cancel()) or Scheduler::shutdown has been called + delete task; + return; + } - if (do_signal) { - eventSignal.notify_one(); - } + g_dispatcher.addTask(task, true); + }); - return eventId; + return task->getEventId(); } -bool Scheduler::stopEvent(uint32_t eventId) +void Scheduler::stopEvent(uint32_t eventId) { if (eventId == 0) { - return false; - } - - std::lock_guard lockClass(eventLock); - - // search the event id.. - auto it = eventIds.find(eventId); - if (it == eventIds.end()) { - return false; + return; } - eventIds.erase(it); - return true; + boost::asio::post(io_context, [this, eventId]() { + // search the event id.. + auto it = eventIdTimerMap.find(eventId); + if (it != eventIdTimerMap.end()) { + it->second.cancel(); + } + }); } void Scheduler::shutdown() { setState(THREAD_STATE_TERMINATED); - eventLock.lock(); - - //this list should already be empty - while (!eventList.empty()) { - delete eventList.top(); - eventList.pop(); - } + boost::asio::post(io_context, [this]() { + // cancel all active timers + for (auto& it : eventIdTimerMap) { + it.second.cancel(); + } - eventIds.clear(); - eventLock.unlock(); - eventSignal.notify_one(); + io_context.stop(); + }); } SchedulerTask* createSchedulerTask(uint32_t delay, std::function f) diff --git a/src/scheduler.h b/src/scheduler.h index e42f59f9ea..cb3ad780d5 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -21,8 +21,7 @@ #define FS_SCHEDULER_H_2905B3D5EAB34B4BA8830167262D2DC1 #include "tasks.h" -#include -#include +#include #include "thread_holder_base.h" @@ -38,44 +37,34 @@ class SchedulerTask : public Task return eventId; } - std::chrono::system_clock::time_point getCycle() const { - return expiration; + uint32_t getDelay() const { + return delay; } - private: - SchedulerTask(uint32_t delay, std::function&& f) : Task(delay, std::move(f)) {} + SchedulerTask(uint32_t delay, std::function&& f) : Task(std::move(f)), delay(delay) {} uint32_t eventId = 0; + uint32_t delay = 0; friend SchedulerTask* createSchedulerTask(uint32_t, std::function); }; SchedulerTask* createSchedulerTask(uint32_t delay, std::function f); -struct TaskComparator { - bool operator()(const SchedulerTask* lhs, const SchedulerTask* rhs) const { - return lhs->getCycle() > rhs->getCycle(); - } -}; - class Scheduler : public ThreadHolder { public: uint32_t addEvent(SchedulerTask* task); - bool stopEvent(uint32_t eventId); + void stopEvent(uint32_t eventId); void shutdown(); - void threadMain(); - + void threadMain() { io_context.run(); } private: - std::thread thread; - std::mutex eventLock; - std::condition_variable eventSignal; - - uint32_t lastEventId {0}; - std::priority_queue, TaskComparator> eventList; - std::unordered_set eventIds; + std::atomic lastEventId{0}; + std::unordered_map eventIdTimerMap; + boost::asio::io_context io_context; + boost::asio::io_context::work work{io_context}; }; extern Scheduler g_scheduler;