Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

log_streaming: make it robust to message drops #2290

Merged
merged 1 commit into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions examples/log_streaming/log_streaming.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,19 @@ using std::this_thread::sleep_for;

void usage(const std::string& bin_name)
{
std::cerr << "Usage : " << bin_name << " <connection_url> [--rm]\n"
std::cerr << "Usage : " << bin_name << " <connection_url> [--drop]\n"
<< '\n'
<< "Connection URL format should be :\n"
<< " For TCP : tcp://[server_host][:server_port]\n"
<< " For UDP : udp://[bind_host][:bind_port]\n"
<< " For Serial : serial:///path/to/serial/dev[:baudrate]\n"
<< "For example, to connect to the simulator use URL: udp://:14540" << std::endl;
<< "For example, to connect to the simulator use URL: udp://:14540" << '\n'
<< "--drop To drop some of the messages" << std::endl;
}

int main(int argc, char** argv)
{
if (argc > 2) {
if (argc < 2) {
usage(argv[0]);
return 1;
}
Expand All @@ -49,6 +50,14 @@ int main(int argc, char** argv)
return 1;
}

// To simulate message drops.
if (argc == 3 && std::string(argv[2]) == "--drop") {
std::cout << "Dropping some messages" << std::endl;
unsigned counter = 0;
mavsdk.intercept_incoming_messages_async(
[&](const mavlink_message_t&) { return counter++ % 10 != 0; });
}

// Create file to log to.
// Get current time
auto now = std::chrono::system_clock::now();
Expand Down
140 changes: 89 additions & 51 deletions src/mavsdk/plugins/log_streaming/log_streaming_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,28 +178,45 @@ void LogStreamingImpl::process_logging_data(const mavlink_message_t& message)
}

std::lock_guard<std::mutex> lock(_mutex);
check_sequence(logging_data.sequence);

if (logging_data.first_message_offset == std::numeric_limits<uint8_t>::max()) {
_ulog_data.insert(
_ulog_data.end(), logging_data.data, logging_data.data + logging_data.length);
} else {
if (logging_data.first_message_offset > sizeof(logging_data.data)) {
LogWarn() << "Invalid first_message_offset";
return;
}

_ulog_data.insert(
_ulog_data.end(),
logging_data.data,
logging_data.data + logging_data.first_message_offset);
process_message();
_ulog_data.clear();

_ulog_data.insert(
_ulog_data.end(),
logging_data.data + logging_data.first_message_offset,
logging_data.data + logging_data.length);
auto drop_state = check_sequence(logging_data.sequence);

switch (drop_state) {
case DropState::Ok:
if (logging_data.first_message_offset == std::numeric_limits<uint8_t>::max()) {
_ulog_data.insert(
_ulog_data.end(), logging_data.data, logging_data.data + logging_data.length);

} else {
if (logging_data.first_message_offset > sizeof(logging_data.data)) {
LogWarn() << "Invalid first_message_offset";
return;
}

_ulog_data.insert(
_ulog_data.end(),
logging_data.data,
logging_data.data + logging_data.first_message_offset);
process_message();

_ulog_data.clear();
_ulog_data.insert(
_ulog_data.end(),
logging_data.data + logging_data.first_message_offset,
logging_data.data + logging_data.length);
}
break;

case DropState::Dropped:
_ulog_data.clear();
_ulog_data.insert(
_ulog_data.end(),
logging_data.data + logging_data.first_message_offset,
logging_data.data + logging_data.length);
break;

case DropState::Duplicate:
// Ignore.
break;
}
}

Expand Down Expand Up @@ -254,46 +271,62 @@ void LogStreamingImpl::process_logging_data_acked(const mavlink_message_t& messa
}

std::lock_guard<std::mutex> lock(_mutex);
check_sequence(logging_data_acked.sequence);

if (logging_data_acked.first_message_offset == std::numeric_limits<uint8_t>::max()) {
_ulog_data.insert(
_ulog_data.end(),
logging_data_acked.data,
logging_data_acked.data + logging_data_acked.length);
} else {
if (logging_data_acked.first_message_offset > sizeof(logging_data_acked.data)) {
LogWarn() << "Invalid first_message_offset";
return;
}

_ulog_data.insert(
_ulog_data.end(),
logging_data_acked.data,
logging_data_acked.data + logging_data_acked.first_message_offset);
process_message();
_ulog_data.clear();

_ulog_data.insert(
_ulog_data.end(),
logging_data_acked.data + logging_data_acked.first_message_offset,
logging_data_acked.data + logging_data_acked.length);
auto drop_state = check_sequence(logging_data_acked.sequence);

switch (drop_state) {
case DropState::Ok:
if (logging_data_acked.first_message_offset == std::numeric_limits<uint8_t>::max()) {
_ulog_data.insert(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After a dropout you must first wait for a valid logging_data_acked.first_message_offset before inserting in _ulog_data

Copy link
Collaborator Author

@julianoes julianoes May 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. So I need to keep track of the previous state. Or rather set if there was a valid one.

_ulog_data.end(),
logging_data_acked.data,
logging_data_acked.data + logging_data_acked.length);
} else {
if (logging_data_acked.first_message_offset > sizeof(logging_data_acked.data)) {
LogWarn() << "Invalid first_message_offset";
return;
}

_ulog_data.insert(
_ulog_data.end(),
logging_data_acked.data,
logging_data_acked.data + logging_data_acked.first_message_offset);
process_message();
_ulog_data.clear();

_ulog_data.insert(
_ulog_data.end(),
logging_data_acked.data + logging_data_acked.first_message_offset,
logging_data_acked.data + logging_data_acked.length);
}
break;

case DropState::Dropped:
_ulog_data.clear();
_ulog_data.insert(
_ulog_data.end(),
logging_data_acked.data + logging_data_acked.first_message_offset,
logging_data_acked.data + logging_data_acked.length);
Comment on lines +305 to +308
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here you must only insert if there's a valid offset: logging_data_acked.first_message_offset != std::numeric_limits<uint8_t>::max()

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, right!

break;

case DropState::Duplicate:
// Ignore.
break;
}
}

void LogStreamingImpl::check_sequence(uint16_t sequence)
LogStreamingImpl::DropState LogStreamingImpl::check_sequence(uint16_t sequence)
{
// Assume we have lock.

if (!_maybe_current_sequence) {
// This is the first time we use the sequence.
_maybe_current_sequence = sequence;
return;
return DropState::Ok;
}

if (_maybe_current_sequence.value() == sequence) {
// Duplicate
return;
return DropState::Duplicate;
}

if (sequence > _maybe_current_sequence.value()) {
Expand All @@ -303,6 +336,10 @@ void LogStreamingImpl::check_sequence(uint16_t sequence)
if (drop > 0 && _debugging) {
LogDebug() << "Dropped: " << drop << " (no wrap around), overall: " << _drops;
}

_maybe_current_sequence = sequence;
return drop > 0 ? DropState::Dropped : DropState::Ok;

} else {
// Wrap around!
uint16_t drop =
Expand All @@ -311,9 +348,10 @@ void LogStreamingImpl::check_sequence(uint16_t sequence)
if (drop > 0 && _debugging) {
LogDebug() << "Dropped: " << drop << " (with wrap around), overall: " << _drops;
}
}

_maybe_current_sequence = sequence;
_maybe_current_sequence = sequence;
return drop > 0 ? DropState::Dropped : DropState::Ok;
}
}

void LogStreamingImpl::process_message()
Expand Down
8 changes: 7 additions & 1 deletion src/mavsdk/plugins/log_streaming/log_streaming_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,15 @@ class LogStreamingImpl : public PluginImplBase {
void unsubscribe_log_streaming_raw(LogStreaming::LogStreamingRawHandle handle);

private:
enum class DropState {
Ok,
Dropped,
Duplicate,
};

void process_logging_data(const mavlink_message_t& message);
void process_logging_data_acked(const mavlink_message_t& message);
void check_sequence(uint16_t sequence);
DropState check_sequence(uint16_t sequence);
void process_message();

static LogStreaming::Result
Expand Down
Loading