Skip to content

Commit

Permalink
Merge pull request #2290 from mavlink/pr-log-streaming-drops
Browse files Browse the repository at this point in the history
log_streaming: make it robust to message drops
  • Loading branch information
julianoes committed May 13, 2024
2 parents dddc6fe + ab853be commit 8932698
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 55 deletions.
15 changes: 12 additions & 3 deletions examples/log_streaming/log_streaming.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,19 @@ using std::this_thread::sleep_for;

void usage(const std::string& bin_name)
{
std::cerr << "Usage : " << bin_name << " <connection_url> [--rm]\n"
std::cerr << "Usage : " << bin_name << " <connection_url> [--drop]\n"
<< '\n'
<< "Connection URL format should be :\n"
<< " For TCP : tcp://[server_host][:server_port]\n"
<< " For UDP : udp://[bind_host][:bind_port]\n"
<< " For Serial : serial:///path/to/serial/dev[:baudrate]\n"
<< "For example, to connect to the simulator use URL: udp://:14540" << std::endl;
<< "For example, to connect to the simulator use URL: udp://:14540" << '\n'
<< "--drop To drop some of the messages" << std::endl;
}

int main(int argc, char** argv)
{
if (argc > 2) {
if (argc < 2) {
usage(argv[0]);
return 1;
}
Expand All @@ -49,6 +50,14 @@ int main(int argc, char** argv)
return 1;
}

// To simulate message drops.
if (argc == 3 && std::string(argv[2]) == "--drop") {
std::cout << "Dropping some messages" << std::endl;
unsigned counter = 0;
mavsdk.intercept_incoming_messages_async(
[&](const mavlink_message_t&) { return counter++ % 10 != 0; });
}

// Create file to log to.
// Get current time
auto now = std::chrono::system_clock::now();
Expand Down
140 changes: 89 additions & 51 deletions src/mavsdk/plugins/log_streaming/log_streaming_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,28 +178,45 @@ void LogStreamingImpl::process_logging_data(const mavlink_message_t& message)
}

std::lock_guard<std::mutex> lock(_mutex);
check_sequence(logging_data.sequence);

if (logging_data.first_message_offset == std::numeric_limits<uint8_t>::max()) {
_ulog_data.insert(
_ulog_data.end(), logging_data.data, logging_data.data + logging_data.length);
} else {
if (logging_data.first_message_offset > sizeof(logging_data.data)) {
LogWarn() << "Invalid first_message_offset";
return;
}

_ulog_data.insert(
_ulog_data.end(),
logging_data.data,
logging_data.data + logging_data.first_message_offset);
process_message();
_ulog_data.clear();

_ulog_data.insert(
_ulog_data.end(),
logging_data.data + logging_data.first_message_offset,
logging_data.data + logging_data.length);
auto drop_state = check_sequence(logging_data.sequence);

switch (drop_state) {
case DropState::Ok:
if (logging_data.first_message_offset == std::numeric_limits<uint8_t>::max()) {
_ulog_data.insert(
_ulog_data.end(), logging_data.data, logging_data.data + logging_data.length);

} else {
if (logging_data.first_message_offset > sizeof(logging_data.data)) {
LogWarn() << "Invalid first_message_offset";
return;
}

_ulog_data.insert(
_ulog_data.end(),
logging_data.data,
logging_data.data + logging_data.first_message_offset);
process_message();

_ulog_data.clear();
_ulog_data.insert(
_ulog_data.end(),
logging_data.data + logging_data.first_message_offset,
logging_data.data + logging_data.length);
}
break;

case DropState::Dropped:
_ulog_data.clear();
_ulog_data.insert(
_ulog_data.end(),
logging_data.data + logging_data.first_message_offset,
logging_data.data + logging_data.length);
break;

case DropState::Duplicate:
// Ignore.
break;
}
}

Expand Down Expand Up @@ -254,46 +271,62 @@ void LogStreamingImpl::process_logging_data_acked(const mavlink_message_t& messa
}

std::lock_guard<std::mutex> lock(_mutex);
check_sequence(logging_data_acked.sequence);

if (logging_data_acked.first_message_offset == std::numeric_limits<uint8_t>::max()) {
_ulog_data.insert(
_ulog_data.end(),
logging_data_acked.data,
logging_data_acked.data + logging_data_acked.length);
} else {
if (logging_data_acked.first_message_offset > sizeof(logging_data_acked.data)) {
LogWarn() << "Invalid first_message_offset";
return;
}

_ulog_data.insert(
_ulog_data.end(),
logging_data_acked.data,
logging_data_acked.data + logging_data_acked.first_message_offset);
process_message();
_ulog_data.clear();

_ulog_data.insert(
_ulog_data.end(),
logging_data_acked.data + logging_data_acked.first_message_offset,
logging_data_acked.data + logging_data_acked.length);
auto drop_state = check_sequence(logging_data_acked.sequence);

switch (drop_state) {
case DropState::Ok:
if (logging_data_acked.first_message_offset == std::numeric_limits<uint8_t>::max()) {
_ulog_data.insert(
_ulog_data.end(),
logging_data_acked.data,
logging_data_acked.data + logging_data_acked.length);
} else {
if (logging_data_acked.first_message_offset > sizeof(logging_data_acked.data)) {
LogWarn() << "Invalid first_message_offset";
return;
}

_ulog_data.insert(
_ulog_data.end(),
logging_data_acked.data,
logging_data_acked.data + logging_data_acked.first_message_offset);
process_message();
_ulog_data.clear();

_ulog_data.insert(
_ulog_data.end(),
logging_data_acked.data + logging_data_acked.first_message_offset,
logging_data_acked.data + logging_data_acked.length);
}
break;

case DropState::Dropped:
_ulog_data.clear();
_ulog_data.insert(
_ulog_data.end(),
logging_data_acked.data + logging_data_acked.first_message_offset,
logging_data_acked.data + logging_data_acked.length);
break;

case DropState::Duplicate:
// Ignore.
break;
}
}

void LogStreamingImpl::check_sequence(uint16_t sequence)
LogStreamingImpl::DropState LogStreamingImpl::check_sequence(uint16_t sequence)
{
// Assume we have lock.

if (!_maybe_current_sequence) {
// This is the first time we use the sequence.
_maybe_current_sequence = sequence;
return;
return DropState::Ok;
}

if (_maybe_current_sequence.value() == sequence) {
// Duplicate
return;
return DropState::Duplicate;
}

if (sequence > _maybe_current_sequence.value()) {
Expand All @@ -303,6 +336,10 @@ void LogStreamingImpl::check_sequence(uint16_t sequence)
if (drop > 0 && _debugging) {
LogDebug() << "Dropped: " << drop << " (no wrap around), overall: " << _drops;
}

_maybe_current_sequence = sequence;
return drop > 0 ? DropState::Dropped : DropState::Ok;

} else {
// Wrap around!
uint16_t drop =
Expand All @@ -311,9 +348,10 @@ void LogStreamingImpl::check_sequence(uint16_t sequence)
if (drop > 0 && _debugging) {
LogDebug() << "Dropped: " << drop << " (with wrap around), overall: " << _drops;
}
}

_maybe_current_sequence = sequence;
_maybe_current_sequence = sequence;
return drop > 0 ? DropState::Dropped : DropState::Ok;
}
}

void LogStreamingImpl::process_message()
Expand Down
8 changes: 7 additions & 1 deletion src/mavsdk/plugins/log_streaming/log_streaming_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,15 @@ class LogStreamingImpl : public PluginImplBase {
void unsubscribe_log_streaming_raw(LogStreaming::LogStreamingRawHandle handle);

private:
enum class DropState {
Ok,
Dropped,
Duplicate,
};

void process_logging_data(const mavlink_message_t& message);
void process_logging_data_acked(const mavlink_message_t& message);
void check_sequence(uint16_t sequence);
DropState check_sequence(uint16_t sequence);
void process_message();

static LogStreaming::Result
Expand Down

0 comments on commit 8932698

Please sign in to comment.