Skip to content

Commit

Permalink
Merge 2c066b6 into e12c969
Browse files Browse the repository at this point in the history
  • Loading branch information
julianoes committed Oct 25, 2017
2 parents e12c969 + 2c066b6 commit 0eb6c08
Show file tree
Hide file tree
Showing 13 changed files with 459 additions and 128 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Expand Up @@ -127,6 +127,7 @@ add_library(dronecore STATIC
core/curl_wrapper.cpp
core/http_loader.cpp
core/timeout_handler.cpp
core/call_every_handler.cpp
${CMAKE_CURRENT_BINARY_DIR}/core/device_plugin_container.cpp
${plugin_source_files}
)
Expand Down Expand Up @@ -185,6 +186,7 @@ if(NOT IOS AND NOT ANDROID)
core/unittests_main.cpp
core/http_loader_test.cpp
core/timeout_handler_test.cpp
core/call_every_handler_test.cpp
${plugin_unittest_source_files}
${unit_tests_src}
)
Expand Down
87 changes: 87 additions & 0 deletions core/call_every_handler.cpp
@@ -0,0 +1,87 @@
#include "call_every_handler.h"

namespace dronecore {

CallEveryHandler::CallEveryHandler()
{
}

CallEveryHandler::~CallEveryHandler()
{
}

void CallEveryHandler::add(std::function<void()> callback, float interval_s, void **cookie)
{
auto new_entry = std::make_shared<Entry>();
new_entry->callback = callback;
new_entry->last_time = steady_time();
new_entry->interval_s = interval_s;

void *new_cookie = static_cast<void *>(new_entry.get());

{
std::lock_guard<std::mutex> lock(_entries_mutex);
_entries.insert(std::pair<void *, std::shared_ptr<Entry>>(new_cookie, new_entry));
}

if (cookie != nullptr) {
*cookie = new_cookie;
}
}

void CallEveryHandler::change(float interval_s, const void *cookie)
{
std::lock_guard<std::mutex> lock(_entries_mutex);

auto it = _entries.find(const_cast<void *>(cookie));
if (it != _entries.end()) {
it->second->interval_s = interval_s;
}
}

void CallEveryHandler::reset(const void *cookie)
{
std::lock_guard<std::mutex> lock(_entries_mutex);

auto it = _entries.find(const_cast<void *>(cookie));
if (it != _entries.end()) {
it->second->last_time = steady_time();
}
}

void CallEveryHandler::remove(const void *cookie)
{
std::lock_guard<std::mutex> lock(_entries_mutex);

auto it = _entries.find(const_cast<void *>(cookie));
if (it != _entries.end()) {
_entries.erase(const_cast<void *>(cookie));
}
}

void CallEveryHandler::run_once()
{
_entries_mutex.lock();

for (auto it = _entries.begin(); it != _entries.end(); ++it) {

if (elapsed_since_s(it->second->last_time) > double(it->second->interval_s)) {

shift_steady_time_by(it->second->last_time, double(it->second->interval_s));

if (it->second->callback) {

// Get a copy for the callback because we unlock.
std::function<void()> callback = it->second->callback;

// Unlock while we callback because it might in turn want to add timeouts.
_entries_mutex.unlock();
callback();
_entries_mutex.lock();
}
}
}
_entries_mutex.unlock();
}

} // namespace dronecore
41 changes: 41 additions & 0 deletions core/call_every_handler.h
@@ -0,0 +1,41 @@
#pragma once

#include <mutex>
#include <memory>
#include <functional>
#include <map>
#include "global_include.h"

namespace dronecore {

class CallEveryHandler
{
public:
CallEveryHandler();
~CallEveryHandler();

// delete copy and move constructors and assign operators
CallEveryHandler(CallEveryHandler const &) = delete; // Copy construct
CallEveryHandler(CallEveryHandler &&) = delete; // Move construct
CallEveryHandler &operator=(CallEveryHandler const &) = delete; // Copy assign
CallEveryHandler &operator=(CallEveryHandler &&) = delete; // Move assign

void add(std::function<void()> callback, float interval_s, void **cookie);
void change(float interval_s, const void *cookie);
void reset(const void *cookie);
void remove(const void *cookie);

void run_once();

private:
struct Entry {
std::function<void()> callback;
dl_time_t last_time;
float interval_s;
};

std::map<void *, std::shared_ptr<Entry>> _entries {};
std::mutex _entries_mutex {};
};

} // namespace dronecore
123 changes: 123 additions & 0 deletions core/call_every_handler_test.cpp
@@ -0,0 +1,123 @@
#include "call_every_handler.h"
#include <gtest/gtest.h>
#include <atomic>
#include "log.h"

using namespace dronecore;

TEST(CallEveryHandler, Single)
{
CallEveryHandler ceh;

int num_called = 0;

void *cookie = nullptr;
ceh.add([&num_called]() { ++num_called; }, 0.1f, &cookie);

for (int i = 0; i < 11; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
ceh.run_once();
}
EXPECT_EQ(num_called, 1);

UNUSED(cookie);
}

TEST(CallEveryHandler, Multiple)
{
CallEveryHandler ceh;

int num_called = 0;

void *cookie = nullptr;
ceh.add([&num_called]() { ++num_called; }, 0.1f, &cookie);

for (int i = 0; i < 10; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
ceh.run_once();
}
EXPECT_EQ(num_called, 10);

num_called = 0;
ceh.change(0.2f, cookie);

for (int i = 0; i < 20; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
ceh.run_once();
}

EXPECT_EQ(num_called, 10);

num_called = 0;
ceh.remove(cookie);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
ceh.run_once();
EXPECT_EQ(num_called, 0);
}

TEST(CallEveryHandler, InParallel)
{
CallEveryHandler ceh;

int num_called1 = 0;
int num_called2 = 0;

void *cookie1 = nullptr;
void *cookie2 = nullptr;
ceh.add([&num_called1]() { ++num_called1; }, 0.1f, &cookie1);
ceh.add([&num_called2]() { ++num_called2; }, 0.2f, &cookie2);

for (int i = 0; i < 10; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
ceh.run_once();
}

EXPECT_EQ(num_called1, 10);
EXPECT_EQ(num_called2, 5);

num_called1 = 0;
num_called2 = 0;

ceh.change(0.4f, cookie1);
ceh.change(0.1f, cookie2);

for (int i = 0; i < 10; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
ceh.run_once();
}

EXPECT_EQ(num_called1, 2);
EXPECT_EQ(num_called2, 10);
}

TEST(CallEveryHandler, Reset)
{
CallEveryHandler ceh;

int num_called = 0;

void *cookie = nullptr;
ceh.add([&num_called]() { ++num_called; }, 0.1f, &cookie);

for (int i = 0; i < 8; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
ceh.run_once();
if (i == 8) {
}
}
EXPECT_EQ(num_called, 0);

ceh.reset(cookie);

for (int i = 0; i < 8; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
ceh.run_once();
}
EXPECT_EQ(num_called, 0);

for (int i = 0; i < 3; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
ceh.run_once();
}
EXPECT_EQ(num_called, 1);
}
21 changes: 21 additions & 0 deletions core/device_impl.cpp
Expand Up @@ -105,6 +105,26 @@ void DeviceImpl::process_mavlink_message(const mavlink_message_t &message)
}
}

void DeviceImpl::add_call_every(std::function<void()> callback, float interval_s, void **cookie)
{
_call_every_handler.add(callback, interval_s, cookie);
}

void DeviceImpl::change_call_every(float interval_s, const void *cookie)
{
_call_every_handler.change(interval_s, cookie);
}

void DeviceImpl::reset_call_every(const void *cookie)
{
_call_every_handler.reset(cookie);
}

void DeviceImpl::remove_call_every(const void *cookie)
{
_call_every_handler.remove(cookie);
}

void DeviceImpl::process_heartbeat(const mavlink_message_t &message)
{
mavlink_heartbeat_t heartbeat;
Expand Down Expand Up @@ -220,6 +240,7 @@ void DeviceImpl::device_thread(DeviceImpl *self)
last_time = steady_time();
}

self->_call_every_handler.run_once();
self->_timeout_handler.run_once();
self->_params.do_work();
self->_commands.do_work();
Expand Down
9 changes: 7 additions & 2 deletions core/device_impl.h
Expand Up @@ -5,6 +5,7 @@
#include "mavlink_parameters.h"
#include "mavlink_commands.h"
#include "timeout_handler.h"
#include "call_every_handler.h"
#include <cstdint>
#include <functional>
#include <atomic>
Expand Down Expand Up @@ -37,11 +38,14 @@ class DeviceImpl
void register_timeout_handler(std::function<void()> callback,
double duration_s,
void **cookie);

void refresh_timeout_handler(const void *cookie);

void unregister_timeout_handler(const void *cookie);

void add_call_every(std::function<void()> callback, float interval_s, void **cookie);
void change_call_every(float interval_s, const void *cookie);
void reset_call_every(const void *cookie);
void remove_call_every(const void *cookie);

bool send_message(const mavlink_message_t &message);

MavlinkCommands::Result send_command_with_ack(uint16_t command,
Expand Down Expand Up @@ -158,6 +162,7 @@ class DeviceImpl
MavlinkCommands _commands;

TimeoutHandler _timeout_handler {};
CallEveryHandler _call_every_handler {};
};


Expand Down
4 changes: 4 additions & 0 deletions core/global_include.cpp
Expand Up @@ -39,6 +39,10 @@ dl_time_t steady_time_in_future(double duration_s)
return now + std::chrono::milliseconds(int64_t(duration_s * 1e3));
}

void shift_steady_time_by(dl_time_t &time, double offset_s)
{
time += std::chrono::milliseconds(int64_t(offset_s * 1e3));
}

double to_rad_from_deg(double deg)
{
Expand Down
1 change: 1 addition & 0 deletions core/global_include.h
Expand Up @@ -35,6 +35,7 @@ typedef std::chrono::time_point<std::chrono::steady_clock> dl_time_t;

dl_time_t steady_time();
dl_time_t steady_time_in_future(double duration_s);
void shift_steady_time_by(dl_time_t &time, double offset_s);

double elapsed_s();
double elapsed_since_s(const dl_time_t &since);
Expand Down

0 comments on commit 0eb6c08

Please sign in to comment.