Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ if (UNIX)
)
find_package(Threads REQUIRED)
target_link_libraries(lslboost PUBLIC Threads::Threads)
target_compile_features(lslboost PUBLIC cxx_std_11)
target_compile_features(lslboost PUBLIC cxx_std_11 cxx_lambda_init_captures)
else () # WIN32
target_sources(lslboost PRIVATE
lslboost/libs/serialization/src/codecvt_null.cpp
Expand Down
19 changes: 8 additions & 11 deletions src/resolve_attempt_udp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,25 +70,23 @@ void resolve_attempt_udp::begin() {
// also initiate the cancel event, if desired
if (cancel_after_ != FOREVER) {
cancel_timer_.expires_after(timeout_sec(cancel_after_));
auto keepalive(shared_from_this());
cancel_timer_.async_wait([keepalive, this](err_t err) {
cancel_timer_.async_wait([shared_this = shared_from_this(), this](err_t err) {
if (!err) do_cancel();
});
}
}

void resolve_attempt_udp::cancel() {
auto keepalive(shared_from_this());
post(io_, [keepalive]() { keepalive->do_cancel(); });
post(io_, [shared_this = shared_from_this()]() { shared_this->do_cancel(); });
}


// === receive loop ===

void resolve_attempt_udp::receive_next_result() {
auto keepalive(shared_from_this());
recv_socket_.async_receive_from(buffer(resultbuf_), remote_endpoint_,
[keepalive, this](err_t err, size_t len) { handle_receive_outcome(err, len); });
[shared_this = shared_from_this()](
err_t err, size_t len) { shared_this->handle_receive_outcome(err, len); });
}

void resolve_attempt_udp::handle_receive_outcome(error_code err, std::size_t len) {
Expand Down Expand Up @@ -153,12 +151,11 @@ void resolve_attempt_udp::send_next_query(endpoint_list::const_iterator next) {
? broadcast_socket_
: (ep.address().is_multicast() ? multicast_socket_ : unicast_socket_);
// and send the query over it
auto keepalive(shared_from_this());
sock.async_send_to(
lslboost::asio::buffer(query_msg_), ep, [keepalive, this, next](err_t err, size_t) {
if (!cancelled_ && err != error::operation_aborted &&
sock.async_send_to(lslboost::asio::buffer(query_msg_), ep,
[shared_this = shared_from_this(), next](err_t err, size_t) {
if (!shared_this->cancelled_ && err != error::operation_aborted &&
err != error::not_connected && err != error::not_socket)
send_next_query(next);
shared_this->send_next_query(next);
});
} else
// otherwise just go directly to the next query
Expand Down
3 changes: 1 addition & 2 deletions src/resolver_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,7 @@ void resolver_impl::resolve_continuous(const std::string &query, double forget_a
// start a wave of resolve packets
next_resolve_wave();
// spawn a thread that runs the IO operations
auto io_keepalive(io_);
background_io_ = std::make_shared<lslboost::thread>([io_keepalive]() { io_keepalive->run(); });
background_io_ = std::make_shared<lslboost::thread>([shared_io = io_]() { shared_io->run(); });
}

std::vector<stream_info_impl> resolver_impl::results(uint32_t max_results) {
Expand Down
63 changes: 32 additions & 31 deletions src/tcp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,7 @@ void tcp_server::end_serving() {
shutdown_ = true;
// issue closure of the server socket; this will result in a cancellation of the associated IO
// operations
auto keepalive(acceptor_);
post(*io_, [keepalive]() { keepalive->close(); });
post(*io_, [shared_acceptor = acceptor_]() { shared_acceptor->close(); });
// issue closure of all active client session sockets; cancels the related outstanding IO jobs
close_inflight_sockets();
// also notify any transfer threads that are blocked waiting for a sample by sending them one (=
Expand All @@ -199,9 +198,10 @@ void tcp_server::accept_next_connection() {
std::shared_ptr<client_session> newsession{
std::make_shared<client_session>(shared_from_this())};
// accept a connection on the session's socket
auto keepalive(shared_from_this());
acceptor_->async_accept(*newsession->socket(),
[keepalive, newsession, this](err_t err) { handle_accept_outcome(newsession, err); });
acceptor_->async_accept(
*newsession->socket(), [shared_this = shared_from_this(), newsession, this](err_t err) {
shared_this->handle_accept_outcome(newsession, err);
});
} catch (std::exception &e) {
LOG_F(ERROR, "Error during tcp_server::accept_next_connection: %s", e.what());
}
Expand Down Expand Up @@ -267,9 +267,10 @@ void client_session::begin_processing() {
serv_->register_inflight_socket(sock_);
registered_ = true;
// read the request line
auto keepalive(shared_from_this());
async_read_until(*sock_, requestbuf_, "\r\n",
[keepalive, this](err_t err, size_t) { handle_read_command_outcome(err); });
async_read_until(
*sock_, requestbuf_, "\r\n", [shared_this = shared_from_this()](err_t err, size_t) {
shared_this->handle_read_command_outcome(err);
});
} catch (std::exception &e) {
LOG_F(ERROR, "Error during client_session::begin_processing: %s", e.what());
}
Expand All @@ -282,29 +283,31 @@ void client_session::handle_read_command_outcome(error_code err) {
std::string method;
getline(requeststream_, method);
method = trim(method);
auto keepalive(shared_from_this());
if (method == "LSL:shortinfo")
// shortinfo request: read the content query string
async_read_until(*sock_, requestbuf_, "\r\n",
[keepalive, this](err_t err, std::size_t) { handle_read_query_outcome(err); });
[shared_this = shared_from_this()](
err_t err, std::size_t) { shared_this->handle_read_query_outcome(err); });
if (method == "LSL:fullinfo")
// fullinfo request: reply right away
async_write(*sock_, lslboost::asio::buffer(serv_->fullinfo_msg_),
[keepalive, this](err_t, std::size_t) { });
[shared_this = shared_from_this()](err_t, std::size_t) {});
if (method == "LSL:streamfeed")
// streamfeed request (1.00): read feed parameters
async_read_until(
*sock_, requestbuf_, "\r\n", [keepalive, this](err_t err, std::size_t) {
handle_read_feedparams(100, "", err);
async_read_until(*sock_, requestbuf_, "\r\n",
[shared_this = shared_from_this()](err_t err, std::size_t) {
shared_this->handle_read_feedparams(100, "", err);
});
if (method.compare(0, 15, "LSL:streamfeed/") == 0) {
// streamfeed request with version: read feed parameters
std::vector<std::string> parts = splitandtrim(method, ' ', true);
int request_protocol_version = std::stoi(parts[0].substr(15));
std::string request_uid = (parts.size() > 1) ? parts[1] : "";
async_read_until(*sock_, requestbuf_, "\r\n\r\n", [=](err_t err, std::size_t) {
keepalive->handle_read_feedparams(request_protocol_version, request_uid, err);
});
async_read_until(*sock_, requestbuf_, "\r\n\r\n",
[shared_this = shared_from_this(),
request_protocol_version = std::stoi(parts[0].substr(15)),
request_uid = (parts.size() > 1) ? parts[1] : ""](err_t err, std::size_t) {
shared_this->handle_read_feedparams(
request_protocol_version, request_uid, err);
});
}
}
} catch (std::exception &e) {
Expand All @@ -321,9 +324,8 @@ void client_session::handle_read_query_outcome(error_code err) {
query = trim(query);
if (serv_->info_->matches_query(query)) {
// matches: reply (otherwise just close the stream)
auto keepalive(shared_from_this());
async_write(*sock_, lslboost::asio::buffer(serv_->shortinfo_msg_),
[keepalive](err_t, std::size_t) {
[shared_this = shared_from_this()](err_t, std::size_t) {
/* keep the client_session alive until the shortinfo is sent completely*/
});
} else
Expand All @@ -336,9 +338,8 @@ void client_session::handle_read_query_outcome(error_code err) {

void client_session::send_status_message(const std::string &str) {
auto msg(std::make_shared<std::string>(str));
auto keepalive(shared_from_this());
async_write(*sock_, lslboost::asio::buffer(*msg),
[msg, keepalive](
[msg, shared_this = shared_from_this()](
err_t, std::size_t) { /* keep objects alive until the message is sent */ });
}

Expand Down Expand Up @@ -487,10 +488,10 @@ void client_session::handle_read_feedparams(
else
*outarch_ << *temp;
// send off the newly created feedheader
auto keepalive(shared_from_this());
async_write(*sock_, feedbuf_.data(), [keepalive, this](err_t err, size_t len) {
handle_send_feedheader_outcome(err, len);
});
async_write(
*sock_, feedbuf_.data(), [shared_this = shared_from_this()](err_t err, size_t len) {
shared_this->handle_send_feedheader_outcome(err, len);
});
DLOG_F(4, "%p sent test pattern samples", this);
}
} catch (std::exception &e) {
Expand Down Expand Up @@ -544,10 +545,10 @@ void client_session::transfer_samples_thread(std::shared_ptr<client_session>) {
// send off the chunk that we aggregated so far
lslboost::unique_lock<lslboost::mutex> lock(completion_mut_);
transfer_completed_ = false;
auto keepalive(shared_from_this());
async_write(*sock_, feedbuf_.data(), [keepalive, this](err_t err, size_t len) {
handle_chunk_transfer_outcome(err, len);
});
async_write(*sock_, feedbuf_.data(),
[shared_this = shared_from_this()](err_t err, size_t len) {
shared_this->handle_chunk_transfer_outcome(err, len);
});
// wait for the completion condition
completion_cond_.wait(lock, [this]() { return transfer_completed_; });
// handle transfer outcome
Expand Down
18 changes: 8 additions & 10 deletions src/udp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ void udp_server::end_serving() {

void udp_server::request_next_packet() {
DLOG_F(5, "udp_server::request_next_packet");
auto keepalive(shared_from_this());
socket_->async_receive_from(lslboost::asio::buffer(buffer_), remote_endpoint_,
[keepalive, this](err_t err, std::size_t len) { handle_receive_outcome(err, len); });
[shared_this = shared_from_this()](
err_t err, std::size_t len) { shared_this->handle_receive_outcome(err, len); });
}

void udp_server::handle_receive_outcome(error_code err, std::size_t len) {
Expand Down Expand Up @@ -129,11 +129,10 @@ void udp_server::handle_receive_outcome(error_code err, std::size_t len) {
udp::endpoint return_endpoint(remote_endpoint_.address(), return_port);
string_p replymsg(
std::make_shared<std::string>((query_id += "\r\n") += shortinfo_msg_));
auto keepalive(shared_from_this());
socket_->async_send_to(lslboost::asio::buffer(*replymsg), return_endpoint,
[keepalive, replymsg, this](err_t err, std::size_t len) {
if (err != error::operation_aborted && err != error::shut_down)
request_next_packet();
[shared_this = shared_from_this(), replymsg](err_t err_, std::size_t) {
if (err_ != error::operation_aborted && err_ != error::shut_down)
shared_this->request_next_packet();
});
return;
} else {
Expand All @@ -152,11 +151,10 @@ void udp_server::handle_receive_outcome(error_code err, std::size_t len) {
reply.precision(16);
reply << " " << wave_id << " " << t0 << " " << t1 << " " << lsl_clock();
string_p replymsg(std::make_shared<std::string>(reply.str()));
auto keepalive(shared_from_this());
socket_->async_send_to(lslboost::asio::buffer(*replymsg), remote_endpoint_,
[keepalive, replymsg, this](err_t err, std::size_t len) {
if (err != error::operation_aborted && err != error::shut_down)
request_next_packet();
[shared_this = shared_from_this(), replymsg](err_t err_, std::size_t) {
if (err_ != error::operation_aborted && err_ != error::shut_down)
shared_this->request_next_packet();
});
return;
} else {
Expand Down