Skip to content

Commit

Permalink
ftp: enable bulk download
Browse files Browse the repository at this point in the history
Signed-off-by: Julian Oes <julian@oes.ch>
  • Loading branch information
julianoes committed Oct 6, 2023
1 parent 32cfb02 commit c818a31
Show file tree
Hide file tree
Showing 16 changed files with 939 additions and 212 deletions.
2 changes: 1 addition & 1 deletion proto
Submodule proto updated 1 files
+1 −0 protos/ftp/ftp.proto
234 changes: 225 additions & 9 deletions src/mavsdk/core/mavlink_ftp_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ void MavlinkFtpClient::do_work()
work_queue_guard.pop_front();
}
},
[&](DownloadBurstItem& item) {
if (!download_burst_start(*work, item)) {
work_queue_guard.pop_front();
}
},
[&](UploadItem& item) {
if (!upload_start(*work, item)) {
work_queue_guard.pop_front();
Expand Down Expand Up @@ -169,6 +174,34 @@ void MavlinkFtpClient::process_mavlink_ftp_message(const mavlink_message_t& msg)
work_queue_guard.pop_front();
}
},
[&](DownloadBurstItem& item) {
if (payload->opcode == RSP_ACK) {
if (payload->req_opcode == CMD_OPEN_FILE_RO ||
payload->req_opcode == CMD_BURST_READ_FILE) {
// Whenever we do get an ack,
// reset the retry counter.
work->retries = RETRIES;

if (!download_burst_continue(*work, item, payload)) {
stop_timer();
work_queue_guard.pop_front();
}
} else if (payload->req_opcode == CMD_TERMINATE_SESSION) {
stop_timer();
item.ofstream.close();
item.callback(ClientResult::Success, {});
work_queue_guard.pop_front();

} else {
LogWarn() << "Unexpected ack";
}

} else if (payload->opcode == RSP_NAK) {
stop_timer();
item.callback(result_from_nak(payload), {});
work_queue_guard.pop_front();
}
},
[&](UploadItem& item) {
if (payload->opcode == RSP_ACK) {
if (payload->req_opcode == CMD_CREATE_FILE ||
Expand Down Expand Up @@ -326,8 +359,6 @@ bool MavlinkFtpClient::download_start(Work& work, DownloadItem& item)
return false;
}

item.last_progress_percentage = -1;

work.last_opcode = CMD_OPEN_FILE_RO;
work.payload = {};
work.payload.seq_number = work.last_sent_seq_number++;
Expand Down Expand Up @@ -385,10 +416,12 @@ bool MavlinkFtpClient::download_continue(Work& work, DownloadItem& item, Payload
work.payload.session = _session;
work.payload.opcode = work.last_opcode;
work.payload.offset = item.bytes_transferred;

work.payload.size =
std::min(static_cast<size_t>(max_data_length), item.file_size - item.bytes_transferred);
LogWarn() << "Request size: " << std::to_string(work.payload.size) << " of left "
<< int(item.file_size - item.bytes_transferred);

start_timer();
send_mavlink_ftp_message(work.payload);

Expand Down Expand Up @@ -416,6 +449,164 @@ bool MavlinkFtpClient::download_continue(Work& work, DownloadItem& item, Payload
return true;
}

bool MavlinkFtpClient::download_burst_start(Work& work, DownloadBurstItem& item)
{
fs::path local_path = fs::path(item.local_folder) / fs::path(item.remote_path).filename();

LogDebug() << "Trying to open write to local path: " << local_path.string();
item.ofstream.open(local_path, std::fstream::trunc | std::fstream::binary);
if (!item.ofstream) {
LogErr() << "Could not open it!";
item.callback(ClientResult::FileIoError, {});
return false;
}

work.last_opcode = CMD_OPEN_FILE_RO;
work.payload = {};
work.payload.seq_number = work.last_sent_seq_number++;
work.payload.session = 0;
work.payload.opcode = work.last_opcode;
work.payload.offset = 0;
strncpy(
reinterpret_cast<char*>(work.payload.data), item.remote_path.c_str(), max_data_length - 1);
work.payload.size = item.remote_path.length() + 1;

start_timer();
send_mavlink_ftp_message(work.payload);

return true;
}

bool MavlinkFtpClient::download_burst_continue(
Work& work, DownloadBurstItem& item, PayloadHeader* payload)
{
if (payload->req_opcode == CMD_OPEN_FILE_RO) {
const size_t file_size = *(reinterpret_cast<uint32_t*>(payload->data));
item.transferred.resize(file_size, DownloadBurstItem::Transferred::No);

// We just use a vector of 0 and write that to the file to prepare it.
// We then fill in the chunks as they come in.
std::vector<char> empty;
empty.resize(file_size);
item.ofstream.write(empty.data(), empty.size());

if (_debugging) {
LogWarn() << "Burst Download continue, got file size: " << file_size;
}

request_next_burst(work, item);
return true;

} else if (payload->req_opcode == CMD_BURST_READ_FILE) {
if (_debugging) {
LogWarn() << "Burst download continue, at: " << std::to_string(payload->offset)
<< " write: " << std::to_string(payload->size);
}

item.ofstream.seekp(payload->offset);
if (item.ofstream.fail()) {
LogWarn() << "Seek failed";
item.callback(ClientResult::FileIoError, {});
return false;
}

// Write actual data to file.
item.ofstream.write(reinterpret_cast<const char*>(payload->data), payload->size);
if (!item.ofstream) {
LogWarn() << "Write failed";
item.callback(ClientResult::FileIoError, {});
return false;
}

// Keep track of what was written.
for (size_t i = payload->offset; i < payload->offset + payload->size; ++i) {
item.transferred[i] = DownloadBurstItem::Transferred::Yes;
}

const size_t bytes_transferred = std::count(
item.transferred.begin(), item.transferred.end(), DownloadBurstItem::Transferred::Yes);

if (bytes_transferred == item.transferred.size()) {
if (_debugging) {
LogDebug() << "Burst complete";
}

// Final step
work.last_opcode = CMD_TERMINATE_SESSION;

work.payload = {};
work.payload.seq_number = work.last_sent_seq_number++;
work.payload.session = _session;

work.payload.opcode = work.last_opcode;
work.payload.offset = 0;
work.payload.size = 0;

start_timer();
send_mavlink_ftp_message(work.payload);

return true;

} else {
item.callback(
ClientResult::Next,
ProgressData{
static_cast<uint32_t>(bytes_transferred),
static_cast<uint32_t>(item.transferred.size())});

if (payload->burst_complete) {
// The burst is supposedly complete but we still need data, so request next burst.
request_next_burst(work, item);

} else {
// There might be more coming, just wait for now.
start_timer();
}

return true;
}

} else {
LogErr() << "Unexpected req_opcode";
return false;
}
}

void MavlinkFtpClient::request_next_burst(Work& work, DownloadBurstItem& item)
{
const auto first_missing = std::find(
item.transferred.begin(), item.transferred.end(), DownloadBurstItem::Transferred::No);
if (first_missing == item.transferred.end()) {
LogErr() << "Nothing missing, this doesn't make sense.";
return;
}

const auto last_missing_plus_one =
std::find(first_missing, item.transferred.end(), DownloadBurstItem::Transferred::Yes);

const size_t offset = std::distance(item.transferred.begin(), first_missing);
const uint32_t size =
static_cast<uint32_t>(std::distance(first_missing, last_missing_plus_one));

if (size == 0) {
LogErr() << "SIZE IS 0";
return;
}

work.last_opcode = CMD_BURST_READ_FILE;
work.payload = {};
work.payload.seq_number = work.last_sent_seq_number++;
work.payload.session = _session;
work.payload.opcode = work.last_opcode;
work.payload.offset = offset;

work.payload.size = 4;
std::memcpy(&work.payload.data, &size, 4);

start_timer();
send_mavlink_ftp_message(work.payload);
}

bool MavlinkFtpClient::upload_start(Work& work, UploadItem& item)
{
std::error_code ec;
Expand Down Expand Up @@ -752,15 +943,27 @@ MavlinkFtpClient::ClientResult MavlinkFtpClient::translate(ServerResult result)
}

void MavlinkFtpClient::download_async(
const std::string& remote_path, const std::string& local_folder, DownloadCallback callback)
const std::string& remote_path,
const std::string& local_folder,
bool use_burst,
DownloadCallback callback)
{
auto item = DownloadItem{};
item.remote_path = remote_path;
item.local_folder = local_folder;
item.callback = callback;
auto new_work = Work{std::move(item)};
if (use_burst) {
auto item = DownloadBurstItem{};
item.remote_path = remote_path;
item.local_folder = local_folder;
item.callback = callback;
auto new_work = Work{std::move(item)};
_work_queue.push_back(std::make_shared<Work>(std::move(new_work)));

_work_queue.push_back(std::make_shared<Work>(std::move(new_work)));
} else {
auto item = DownloadItem{};
item.remote_path = remote_path;
item.local_folder = local_folder;
item.callback = callback;
auto new_work = Work{std::move(item)};
_work_queue.push_back(std::make_shared<Work>(std::move(new_work)));
}
}

void MavlinkFtpClient::upload_async(
Expand Down Expand Up @@ -964,6 +1167,19 @@ void MavlinkFtpClient::timeout()
start_timer();
send_mavlink_ftp_message(work->payload);
},
[&](DownloadBurstItem& item) {
if (--work->retries == 0) {
item.callback(ClientResult::Timeout, {});
work_queue_guard.pop_front();
return;
}
if (_debugging) {
LogDebug() << "Retries left: " << work->retries;
}

start_timer();
send_mavlink_ftp_message(work->payload);
},
[&](UploadItem& item) {
if (--work->retries == 0) {
item.callback(ClientResult::Timeout, {});
Expand Down
19 changes: 19 additions & 0 deletions src/mavsdk/core/mavlink_ftp_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class MavlinkFtpClient {
void download_async(
const std::string& remote_file_path,
const std::string& local_folder,
bool use_burst,
DownloadCallback callback);
void upload_async(
const std::string& local_file_path,
Expand Down Expand Up @@ -153,6 +154,19 @@ class MavlinkFtpClient {
int last_progress_percentage{-1};
};

struct DownloadBurstItem {
std::string remote_path{};
std::string local_folder{};
DownloadCallback callback{};
std::ofstream ofstream{};
enum class Transferred {
No,
Yes,
};
std::vector<Transferred> transferred{};
int last_progress_percentage{-1};
};

struct UploadItem {
std::string local_file_path{};
std::string remote_folder{};
Expand Down Expand Up @@ -200,6 +214,7 @@ class MavlinkFtpClient {

using Item = std::variant<
DownloadItem,
DownloadBurstItem,
UploadItem,
RemoveItem,
RenameItem,
Expand Down Expand Up @@ -264,6 +279,10 @@ class MavlinkFtpClient {
bool download_start(Work& work, DownloadItem& item);
bool download_continue(Work& work, DownloadItem& item, PayloadHeader* payload);

bool download_burst_start(Work& work, DownloadBurstItem& item);
bool download_burst_continue(Work& work, DownloadBurstItem& item, PayloadHeader* payload);
void request_next_burst(Work& work, DownloadBurstItem& item);

bool upload_start(Work& work, UploadItem& item);
bool upload_continue(Work& work, UploadItem& item);

Expand Down

0 comments on commit c818a31

Please sign in to comment.