Skip to content

Commit

Permalink
bparser reworking
Browse files Browse the repository at this point in the history
- deque of sent_requests plus modified binary search to find RID's
- bparser callback only exposes message type
- ::respond method implemented in message type exposed in callback
  • Loading branch information
dr7ana committed Sep 22, 2023
1 parent 8e38f5f commit b43aa48
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 111 deletions.
12 changes: 6 additions & 6 deletions include/quic/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,19 @@ namespace oxen::quic

public:
template <typename StreamT = Stream, typename... Args, std::enable_if_t<std::is_base_of_v<Stream, StreamT>, int> = 0>
std::shared_ptr<Stream> queue_stream(Args&&... args)
std::shared_ptr<StreamT> queue_stream(Args&&... args)
{
return queue_stream_impl([&](Connection& c, Endpoint& e) {
return std::static_pointer_cast<StreamT>(queue_stream_impl([&](Connection& c, Endpoint& e) {
return std::make_shared<StreamT>(c, e, std::forward<Args>(args)...);
});
}));
}

template <typename StreamT = Stream, typename... Args, std::enable_if_t<std::is_base_of_v<Stream, StreamT>, int> = 0>
std::shared_ptr<Stream> get_new_stream(Args&&... args)
std::shared_ptr<StreamT> get_new_stream(Args&&... args)
{
return get_new_stream_impl([&](Connection& c, Endpoint& e) {
return std::static_pointer_cast<StreamT>(get_new_stream_impl([&](Connection& c, Endpoint& e) {
return std::make_shared<StreamT>(c, e, std::forward<Args>(args)...);
});
}));
}

template <
Expand Down
138 changes: 86 additions & 52 deletions include/quic/parser.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,57 @@ namespace oxen::quic
// Application error
inline constexpr uint64_t BPARSER_EXCEPTION = (1ULL << 60) + 69;

class bparser;

struct message
{
int64_t req_id;
std::string data;
std::string_view req_type;
std::string_view req_id;
std::string_view endpoint;
std::string_view req_body;
bparser& return_sender;

bool error{false};

message(std::string req, bool is_error = false) : data{std::move(req)}, error{is_error}
message(bparser& bp, std::string req, bool is_error = false) :
data{std::move(req)}, error{is_error}, return_sender{bp}
{
oxenc::bt_list_consumer btlc(data);

req_type = btlc.consume_string_view();
req_id = btlc.consume_string_view();
req_id = btlc.consume_integer<int64_t>();

if (req_type == "Q" || req_type == "C")
endpoint = btlc.consume_string_view();

req_body = btlc.consume_string_view();
}

std::string rid() { return std::string{req_id}; }
void respond(int64_t rid, std::string body, bool error = false);

// To be used to determine if the message was a result of an error as such:
//
// void f(const message& m)
// {
// if (not m.error)
// { // success logic }
// ... // is identical to:
// if (m)
// { // success logic }
// }
operator bool() const { return not error; }

int64_t rid() { return req_id; }
std::string_view view() { return {data}; }
};

struct sent_request
{
// parsed request data
int64_t req_id;
std::string data;
std::string req_id;
bparser& return_sender;

// total length of the request; is at the beginning of the request
size_t total_len;
Expand All @@ -59,7 +78,7 @@ namespace oxen::quic

bool is_empty() const { return data.empty() && total_len == 0; }

explicit sent_request(std::string d, std::string rid) : req_id{std::move(rid)}
explicit sent_request(bparser& bp, std::string d, int64_t rid) : req_id{rid}, return_sender{bp}
{
total_len = d.length();
data.reserve(data.length() + total_len);
Expand All @@ -71,7 +90,9 @@ namespace oxen::quic
timeout = req_time + TIMEOUT;
}

message to_message() { return {data}; }
bool is_expired(std::chrono::steady_clock::time_point tp) { return timeout < tp; }

message to_message(bool error = false) { return {return_sender, data, error}; }

std::string_view view() { return {data}; }
std::string&& payload() { return std::move(data); }
Expand All @@ -81,7 +102,7 @@ namespace oxen::quic
{
private:
// outgoing requests awaiting response
std::map<std::chrono::steady_clock::time_point, std::shared_ptr<sent_request>> sent_reqs;
std::deque<std::shared_ptr<sent_request>> sent_reqs;

std::string buf;
std::string size_buf;
Expand All @@ -90,13 +111,8 @@ namespace oxen::quic

std::atomic<int64_t> next_rid{0};

std::function<void(Stream&, message)> recv_callback;

std::function<void(Stream&, uint64_t)> close_callback = [this](Stream& s, uint64_t ec) {
log::debug(bp_cat, "{} called", __PRETTY_FUNCTION__);
sent_reqs.clear();
s.close(io_error{ec});
};
friend class sent_request;
std::function<void(message)> recv_callback;

public:
template <typename... Opt>
Expand All @@ -105,33 +121,53 @@ namespace oxen::quic
((void)handle_bp_opt(std::forward<Opt>(opts)), ...);
}

void request(std::string endpoint, std::string body) override
~bparser() { sent_reqs.clear(); }

void request(std::string endpoint, std::string body)
{
log::debug(bp_cat, "{} called", __PRETTY_FUNCTION__);

auto req = make_request(std::move(endpoint), std::move(body));
send(req->view());

auto& sr = sent_reqs[req->timeout];
sr = std::move(req);
sent_reqs.push_back(std::move(req));
}

void command(std::string endpoint, std::string body) override
void command(std::string endpoint, std::string body)
{
log::debug(bp_cat, "{} called", __PRETTY_FUNCTION__);

auto req = make_command(std::move(endpoint), std::move(body));
send(req->payload());
}

void respond(std::string rid, std::string body, bool error = false) override
void respond(int64_t rid, std::string body, bool error = false)
{
log::debug(bp_cat, "{} called", __PRETTY_FUNCTION__);

auto req = make_response(std::move(rid), std::move(body), error);
auto req = make_response(rid, std::move(body), error);
send(req->payload());
}

void check_timeouts()
{
const auto& now = get_time();

do
{
auto& f = sent_reqs.front();

if (f->is_expired(now))
{
recv_callback(f->to_message(true));
sent_reqs.pop_front();
}
else
return;

} while (not sent_reqs.empty());
}

void receive(bstring_view data) override
{
log::info(bp_cat, "bparser recv data callback called!");
Expand All @@ -153,21 +189,8 @@ namespace oxen::quic
close_callback(*this, app_code);
}

void check_timeouts() override
{
const auto& now = get_time();

for (auto itr = sent_reqs.begin(); itr != sent_reqs.end();)
{
if (itr->first < now)
itr = sent_reqs.erase(itr);
else
return;
}
}

private:
void handle_bp_opt(std::function<void(Stream&, message)> recv_cb)
void handle_bp_opt(std::function<void(message)> recv_cb)
{
log::debug(bp_cat, "Bparser set user-provided recv callback!");
recv_callback = std::move(recv_cb);
Expand All @@ -179,18 +202,22 @@ namespace oxen::quic
close_callback = std::move(close_cb);
}

bool match(std::string_view rid)
bool match(int64_t rid)
{
log::trace(bp_cat, "{} called", __PRETTY_FUNCTION__);

for (auto& r : sent_reqs)
// Iterate using forward iterators, s.t. we go highest (newest) rids to lowest (oldest) rids.
// As a result, our comparator checks if the sent request ID is greater thanthan the target rid
auto itr = std::lower_bound(
sent_reqs.begin(), sent_reqs.end(), rid, [](std::shared_ptr<sent_request> sr, int64_t rid) {
return sr->req_id > rid;
});

if (itr != sent_reqs.end() and itr->get()->req_id == rid)
{
if (r.second->req_id == rid)
{
log::debug(bp_cat, "Successfully matched response to sent request!");
sent_reqs.erase(r.first);
return true;
}
log::debug(bp_cat, "Successfully matched response to sent request!");
sent_reqs.erase(itr);
return true;
}

return false;
Expand All @@ -209,7 +236,7 @@ namespace oxen::quic
}
}

recv_callback(*this, std::move(msg));
recv_callback(std::move(msg));
}

void process_incoming(std::string_view req)
Expand Down Expand Up @@ -250,7 +277,7 @@ namespace oxen::quic
if (req.size() >= current_len)
{
buf += req.substr(0, current_len);
handle_input(message{std::move(buf)});
handle_input(message{*this, std::move(buf)});
req.remove_prefix(current_len);

current_len = 0;
Expand All @@ -268,7 +295,7 @@ namespace oxen::quic
{
buf += req.substr(0, r_size);
req.remove_prefix(r_size);
handle_input(message{std::move(buf)});
handle_input(message{*this, std::move(buf)});
current_len = 0;
continue;
}
Expand All @@ -281,7 +308,7 @@ namespace oxen::quic
std::shared_ptr<sent_request> make_request(std::string endpoint, std::string body)
{
oxenc::bt_list_producer btlp;
std::string rid = std::to_string(++next_rid);
auto rid = ++next_rid;

try
{
Expand All @@ -290,7 +317,7 @@ namespace oxen::quic
btlp.append(endpoint);
btlp.append(body);

auto req = std::make_shared<sent_request>(std::move(btlp).str(), rid);
auto req = std::make_shared<sent_request>(*this, std::move(btlp).str(), rid);
return req;
}
catch (...)
Expand All @@ -304,7 +331,7 @@ namespace oxen::quic
std::shared_ptr<sent_request> make_command(std::string endpoint, std::string body)
{
oxenc::bt_list_producer btlp;
std::string rid = std::to_string(++next_rid);
auto rid = ++next_rid;

try
{
Expand All @@ -313,7 +340,7 @@ namespace oxen::quic
btlp.append(endpoint);
btlp.append(body);

auto req = std::make_shared<sent_request>(std::move(btlp).str(), rid);
auto req = std::make_shared<sent_request>(*this, std::move(btlp).str(), rid);
return req;
}
catch (...)
Expand All @@ -324,7 +351,7 @@ namespace oxen::quic
return nullptr;
}

std::shared_ptr<sent_request> make_response(std::string rid, std::string body, bool error = false)
std::shared_ptr<sent_request> make_response(int64_t rid, std::string body, bool error = false)
{
oxenc::bt_list_producer btlp;

Expand All @@ -334,7 +361,7 @@ namespace oxen::quic
btlp.append(rid);
btlp.append(body);

auto req = std::make_shared<sent_request>(std::move(btlp).str(), rid);
auto req = std::make_shared<sent_request>(*this, std::move(btlp).str(), rid);
return req;
}
catch (...)
Expand Down Expand Up @@ -377,4 +404,11 @@ namespace oxen::quic
return pos + 1;
}
};

inline void message::respond(int64_t rid, std::string body, bool error)
{
log::debug(bp_cat, "{} called", __PRETTY_FUNCTION__);

return_sender.respond(rid, std::move(body), error);
}
} // namespace oxen::quic
24 changes: 0 additions & 24 deletions include/quic/stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,30 +250,6 @@ namespace oxen::quic
}

public:
virtual void command(std::string, std::string)
{
log::warning(log_cat, "{} called", __PRETTY_FUNCTION__);
throw std::runtime_error{"Stream objects should not be queried for RPC commands!"};
}

virtual void request(std::string, std::string)
{
log::warning(log_cat, "{} called", __PRETTY_FUNCTION__);
throw std::runtime_error{"Stream objects should not be queried for RPC requests!"};
}

virtual void respond(std::string, std::string, bool = false)
{
log::warning(log_cat, "{} called", __PRETTY_FUNCTION__);
throw std::runtime_error{"Stream objects should not respond to RPC requests!"};
}

virtual void check_timeouts()
{
log::warning(log_cat, "{} called", __PRETTY_FUNCTION__);
throw std::runtime_error{"Stream objects do not have RPC requests to check timeouts!"};
}

/// Sends data in chunks: `next_chunk` is some callable (e.g. lambda) that will be called
/// with a const reference to the stream instance as needed to obtain the next chunk of data
/// until it returns an empty container, at which point `done(stream)` will be called.
Expand Down
4 changes: 2 additions & 2 deletions src/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@ namespace oxen::quic

if (auto itr = stream_queue.find(id); itr != stream_queue.end())
{
log::critical(log_cat, "Taking ready stream from on deck and assigning stream ID {}!", id);
log::debug(log_cat, "Taking ready stream from on deck and assigning stream ID {}!", id);

auto& s = itr->second;
s->set_ready();
Expand Down Expand Up @@ -910,7 +910,7 @@ namespace oxen::quic
return 0;
}

log::critical(log_cat, "Stream (ID: {}) received data: {}", id, buffer_printer{data});
log::debug(log_cat, "Stream (ID: {}) received data: {}", id, buffer_printer{data});

bool good = false;
try
Expand Down

0 comments on commit b43aa48

Please sign in to comment.