Skip to content

Commit

Permalink
Always relay messages in batches
Browse files Browse the repository at this point in the history
  • Loading branch information
msgmaxim committed Oct 2, 2019
1 parent 4df8b0d commit ce63c08
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 40 deletions.
70 changes: 32 additions & 38 deletions httpserver/service_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ ServiceNode::ServiceNode(boost::asio::io_context& ioc,
swarm_update_timer_(ioc), lokid_ping_timer_(ioc),
stats_cleanup_timer_(ioc), pow_update_timer_(worker_ioc),
check_version_timer_(worker_ioc), peer_ping_timer_(ioc),
lokid_key_pair_(lokid_key_pair), lokid_client_(lokid_client),
force_start_(force_start) {
relay_timer_(ioc), lokid_key_pair_(lokid_key_pair),
lokid_client_(lokid_client), force_start_(force_start) {

char buf[64] = {0};
if (char const* dest =
Expand Down Expand Up @@ -487,36 +487,6 @@ void ServiceNode::reset_listeners() {
pk_to_listeners.clear();
}

/// initiate a /swarms/push request
void ServiceNode::push_message(const message_t& msg) {

if (!swarm_)
return;

const auto& others = swarm_->other_nodes();

LOKI_LOG(trace, "push_message to {} other nodes", others.size());

std::string body;
serialize_message(body, msg);

#ifndef DISABLE_SNODE_SIGNATURE
const auto hash = hash_data(body);
const auto signature = generate_signature(hash, lokid_key_pair_);
#endif

auto req = make_push_request(std::move(body));

#ifndef DISABLE_SNODE_SIGNATURE
attach_signature(req, signature);
#endif

for (const auto& address : others) {
/// send a request asynchronously
relay_data_reliable(req, address);
}
}

/// do this asynchronously on a different thread? (on the same thread?)
bool ServiceNode::process_store(const message_t& msg) {

Expand All @@ -529,12 +499,12 @@ bool ServiceNode::process_store(const message_t& msg) {

all_stats_.bump_store_requests();

/// store to the database
save_if_new(msg);
/// store in the database
this->save_if_new(msg);

/// initiate a /swarms/push request;
/// (done asynchronously)
this->push_message(msg);
// Instead of sending the messages immediatly, store them in a buffer
// and periodically send all messages from there as batches
this->relay_buffer_.push_back(msg);

return true;
}
Expand Down Expand Up @@ -630,6 +600,12 @@ void ServiceNode::on_swarm_update(const block_update_t& bu) {
static bool active = false;
if (!active) {
LOKI_LOG(info, "Storage server is now active!");

constexpr std::chrono::milliseconds RELAY_INTERVAL = 200ms;
relay_timer_.expires_after(RELAY_INTERVAL);
relay_timer_.async_wait(
boost::bind(&ServiceNode::relay_buffered_messages, this));

active = true;
}
}
Expand All @@ -652,6 +628,23 @@ void ServiceNode::on_swarm_update(const block_update_t& bu) {
initiate_peer_test();
}

void ServiceNode::relay_buffered_messages() {

// Should we wait for the response first?
constexpr std::chrono::milliseconds RELAY_INTERVAL = 500ms;
relay_timer_.expires_after(RELAY_INTERVAL);
relay_timer_.async_wait(
boost::bind(&ServiceNode::relay_buffered_messages, this));

if (relay_buffer_.empty())
return;

LOKI_LOG(debug, "Relaying {} messages from buffer", relay_buffer_.size());

this->relay_messages(relay_buffer_, swarm_->other_nodes());
relay_buffer_.clear();
}

void ServiceNode::check_version_timer_tick() {

check_version_timer_.expires_after(VERSION_CHECK_INTERVAL);
Expand Down Expand Up @@ -1480,7 +1473,8 @@ void ServiceNode::bootstrap_swarms(
}
}

void ServiceNode::relay_messages(const std::vector<storage::Item>& messages,
template <typename Message>
void ServiceNode::relay_messages(const std::vector<Message>& messages,
const std::vector<sn_record_t>& snodes) const {
std::vector<std::string> data = serialize_messages(messages);

Expand Down
12 changes: 10 additions & 2 deletions httpserver/service_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,19 @@ class ServiceNode {

boost::asio::steady_timer peer_ping_timer_;

/// Used to periodially send messages from relay_buffer_
boost::asio::steady_timer relay_timer_;

/// map pubkeys to a list of connections to be notified
std::unordered_map<pub_key_t, listeners_t> pk_to_listeners;

loki::lokid_key_pair_t lokid_key_pair_;

reachability_records_t reach_records_;

void push_message(const message_t& msg);
/// Container for recently received messages directly from
/// clients;
std::vector<message_t> relay_buffer_;

void save_if_new(const message_t& msg);

Expand Down Expand Up @@ -168,7 +173,8 @@ class ServiceNode {
void relay_data_reliable(const std::shared_ptr<request_t>& req,
const sn_record_t& address) const;

void relay_messages(const std::vector<storage::Item>& messages,
template <typename Message>
void relay_messages(const std::vector<Message>& messages,
const std::vector<sn_record_t>& snodes) const;

/// Request swarm structure from the deamon and reset the timer
Expand All @@ -178,6 +184,8 @@ class ServiceNode {

void ping_peers_tick();

void relay_buffered_messages();

/// Check the latest version from DNS text record
void check_version_timer_tick();
/// Update PoW difficulty from DNS text record
Expand Down

0 comments on commit ce63c08

Please sign in to comment.