Skip to content

Commit

Permalink
Merge pull request #341 from msgmaxim/lns
Browse files Browse the repository at this point in the history
Allow LNS requests for Session clients
  • Loading branch information
msgmaxim committed Mar 30, 2020
2 parents 99cd207 + 2832eb0 commit 9c2fcf3
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 106 deletions.
88 changes: 47 additions & 41 deletions httpserver/http_connection.cpp
Expand Up @@ -35,8 +35,6 @@ namespace http = boost::beast::http; // from <boost/beast/http.hpp>

/// +===========================================

static constexpr auto LOKI_EPHEMKEY_HEADER = "X-Loki-EphemKey";

static constexpr auto LOKI_FILE_SERVER_TARGET_HEADER =
"X-Loki-File-Server-Target";
static constexpr auto LOKI_FILE_SERVER_VERB_HEADER = "X-Loki-File-Server-Verb";
Expand All @@ -63,25 +61,22 @@ std::shared_ptr<request_t> build_post_request(const char* target,
}

void make_http_request(boost::asio::io_context& ioc,
const std::string& sn_address, uint16_t port,
const std::string& address, uint16_t port,
const std::shared_ptr<request_t>& req,
http_callback_t&& cb) {

error_code ec;
tcp::endpoint endpoint;
tcp::resolver resolver(ioc);
#ifdef INTEGRATION_TEST
tcp::resolver::iterator destination =
resolver.resolve("0.0.0.0", "http", ec);
#else

tcp::resolver::iterator destination =
resolver.resolve(sn_address, "http", ec);
#endif
resolver.resolve(address, "http", ec);

if (ec) {
LOKI_LOG(error,
"http: Failed to parse the IP address <{}>. Error code = {}. "
"Message: {}",
sn_address, ec.value(), ec.message());
address, ec.value(), ec.message());
return;
}
while (destination != tcp::resolver::iterator()) {
Expand Down Expand Up @@ -558,25 +553,31 @@ void connection_t::process_onion_req() {
// Need to make sure we are not blocking waiting for the response
delay_response_ = true;

auto on_response = [this](loki::Response res) {
auto on_response = [wself = std::weak_ptr<connection_t>{shared_from_this()}](loki::Response res) {
LOKI_LOG(debug, "Got an onion response as guard node");

auto self = wself.lock();
if (!self) {
LOKI_LOG(debug, "Connection is no longer valid, dropping onion response");
return;
}

if (res.status() == Status::OK) {
response_.result(http::status::ok);
self->response_.result(http::status::ok);

// OK here simply means that the response we got is
// coming from the target node as opposed to any other
// node on the path. The encrypted body will contain
// its own response status.

this->body_stream_ << res.message();
self->body_stream_ << res.message();
} else {
// res.status() is for us, should we only report a generic
// error to indicate onion request failure?
response_.result(static_cast<int>(res.status()));
self->response_.result(static_cast<int>(res.status()));
}

this->write_response();
self->write_response();
};

try {
Expand Down Expand Up @@ -867,22 +868,6 @@ void connection_t::process_swarm_req(boost::string_view target) {
} else if (target == "/swarms/ping_test/v1") {
LOKI_LOG(trace, "Received ping_test");
response_.result(http::status::ok);
} else if (target == "/swarms/proxy_exit") {
LOKI_LOG(debug,
"Processing proxy request: we are the destination node");

const auto it = req.find(LOKI_SENDER_KEY_HEADER);
/// TODO: handle the error better?
if (it != req.end()) {

const std::string key = {it->value().data(), it->value().size()};

auto res = request_handler_.process_proxy_exit(key, req.body());
this->set_response(res);
} else {
LOKI_LOG(debug, "Error: {} header is missing",
LOKI_SENDER_KEY_HEADER);
}
}
}

Expand Down Expand Up @@ -1109,29 +1094,50 @@ void connection_t::process_client_req_rate_limited() {
// to work, spamming us with "retrieve" requests. The workaround for now
// is to delay responding to the request for a few seconds

// Client requests can be asynchronous, so only respond in a callback
this->delay_response_ = true;

// TODO: remove this when we remove long-polling from (most) clients
if (lp_requested) {
LOKI_LOG(debug, "Received a long-polling request");
this->delay_response_ = true;

auto delay_timer = std::make_shared<boost::asio::steady_timer>(ioc_);

delay_timer->expires_after(std::chrono::seconds(2));
delay_timer->async_wait([self = shared_from_this(), delay_timer, plaintext = std::move(plain_text)](const error_code& ec) {

const auto res = self->request_handler_.process_client_req(plaintext);
self->request_handler_.process_client_req(plaintext, [wself = std::weak_ptr<connection_t>{self}](loki::Response res) {

LOKI_LOG(debug, "Respond to a long-polling client");
self->set_response(res);
self->write_response();
auto self = wself.lock();
if (!self) {
LOKI_LOG(debug, "Connection is no longer valid, dropping response");
return;
}

LOKI_LOG(debug, "Respond to a long-polling client");
self->set_response(res);
self->write_response();
});
});


} else {
const auto res = request_handler_.process_client_req(plain_text);
LOKI_LOG(debug, "Respond to a non-long polling client");
this->set_response(res);
}
request_handler_.process_client_req(
plain_text,
[wself = std::weak_ptr<connection_t>{shared_from_this()}](loki::Response res) {

// // A connection could have been destroyed by the deadline timer
auto self = wself.lock();
if (!self) {
LOKI_LOG(debug, "Connection is no longer valid, dropping proxy response");
return;
}

LOKI_LOG(debug, "Respond to a non-long polling client");
self->set_response(res);
self->write_response();
});
}

}

Expand All @@ -1155,7 +1161,7 @@ void connection_t::register_deadline() {
ec.message());
}

LOKI_LOG(debug, "Closing [connection_t] socket due to timeout");
LOKI_LOG(debug, "[{}] Closing [connection_t] socket due to timeout", self->conn_idx);
self->clean_up();
});
}
Expand Down
25 changes: 15 additions & 10 deletions httpserver/lmq_server.cpp
Expand Up @@ -82,20 +82,25 @@ void LokimqServer::handle_sn_proxy_exit(lokimq::Message& message) {
const auto& client_key = message.data[0];
const auto& payload = message.data[1];

auto &reply_tag = message.reply_tag;
auto &origin_pk = message.conn.pubkey();

// TODO: accept string_view?
auto res = request_handler_->process_proxy_exit(std::string(client_key), std::string(payload));
request_handler_->process_proxy_exit(std::string(client_key), std::string(payload), [this, origin_pk, reply_tag](loki::Response res) {

if (res.status() == Status::OK) {
if (res.status() == Status::OK) {

// TODO: we might want to delay reponding in the case of LP,
// unless the proxy delay is long enough
// TODO: we might want to delay reponding in the case of LP,
// unless the proxy delay is long enough

message.send_reply(res.message());
this->lokimq_->send(origin_pk, "REPLY", reply_tag, res.message());

} else {
// TODO: better handle this (unlikely) error
LOKI_LOG(debug, "Error: status is not OK for proxy_exit");
}
} else {
// TODO: better handle this (unlikely) error
LOKI_LOG(debug, "Error: status is not OK for proxy_exit");
}

});

}

Expand All @@ -107,7 +112,7 @@ void LokimqServer::handle_onion_request(lokimq::Message& message) {
auto &origin_pk = message.conn.pubkey();

auto on_response = [this, origin_pk, reply_tag](loki::Response res) mutable {
LOKI_LOG(debug, "on response: {}", to_string(res));
LOKI_LOG(trace, "on response: {}", to_string(res));

std::string status = std::to_string(static_cast<int>(res.status()));

Expand Down
3 changes: 2 additions & 1 deletion httpserver/main.cpp
Expand Up @@ -217,7 +217,8 @@ int main(int argc, char* argv[]) {
ioc, worker_ioc, options.port, lokimq_server, lokid_key_pair,
options.data_dir, lokid_client, options.force_start);

loki::RequestHandler request_handler(ioc, service_node, channel_encryption);
loki::RequestHandler request_handler(ioc, service_node, lokid_client,
channel_encryption);

lokimq_server.init(&service_node, &request_handler,
lokid_key_pair_x25519);
Expand Down

0 comments on commit 9c2fcf3

Please sign in to comment.