diff --git a/src/network/udp_queue.cpp b/src/network/udp_queue.cpp index 5786b64a..b2ea14f0 100644 --- a/src/network/udp_queue.cpp +++ b/src/network/udp_queue.cpp @@ -64,7 +64,7 @@ namespace fire const size_t CHUNK_BASE = CHUNK_TOTAL_BASE + sizeof(chunk_total_type); const size_t MESSAGE_BASE = CHUNK_BASE + sizeof(chunk_id_type); - // + // const size_t HEADER_SIZE = MESSAGE_BASE; const size_t UDP_CHuNK_SIZE = UDP_PACKET_SIZE - HEADER_SIZE; //in bytes @@ -104,7 +104,7 @@ namespace fire _writing = false; } - void udp_connection::init_working(chunk& proto, util::bytes& data) + void udp_connection::init_working(message_chunk& proto, util::bytes& data) { REQUIRE_GREATER(proto.total_chunks, 0); @@ -136,23 +136,23 @@ namespace fire return wm.sent.count() == wm.proto.total_chunks; } - void udp_connection::sent_chunk(const chunk& c) + void udp_connection::sent_chunk(const message_chunk& c) { - REQUIRE(c.type != chunk::ack); + REQUIRE(c.type != message_chunk::ack); REQUIRE_FALSE(c.resent); auto& wm = _out_working[c.sequence]; wm.sent[c.chunk] = 1; if(wm.queued > 0) wm.queued--; - bool robust = wm.proto.type == chunk::msg; + bool robust = wm.proto.type == message_chunk::msg; if(robust) wm.in_flight++; else if(all_sent(wm)) cleanup_message(c.sequence); } - void udp_connection::validate_chunk(const chunk& c) + void udp_connection::validate_chunk(const message_chunk& c) { - REQUIRE(c.type == chunk::ack); + REQUIRE(c.type == message_chunk::ack); auto& w = _out_working; @@ -183,14 +183,14 @@ namespace fire cleanup_message(sequence_n); } - void udp_connection::send_right_away(chunk& c) + void udp_connection::send_right_away(message_chunk& c) { _out_queue.emplace_push(c); post_send(); } - chunk nth_chunk(size_t n, const chunk& prototype, const util::bytes& data) + message_chunk nth_chunk(size_t n, const message_chunk& prototype, const util::bytes& data) { REQUIRE_LESS(n, prototype.total_chunks); @@ -200,7 +200,7 @@ namespace fire CHECK_GREATER(size, 0); - chunk c = prototype; + message_chunk c = prototype; c.chunk = n; c.write_size = size; c.write_data = data.data() + start; @@ -211,10 +211,10 @@ namespace fire return c; } - bool udp_connection::get_next_chunk(working_message& wm, chunk& queued_chunk) + bool udp_connection::get_next_chunk(working_message& wm, message_chunk& queued_chunk) { REQUIRE_GREATER(wm.proto.total_chunks, 0); - bool robust = wm.proto.type == chunk::msg; + bool robust = wm.proto.type == message_chunk::msg; auto in_flight = robust ? wm.in_flight : 0; @@ -260,7 +260,7 @@ namespace fire if(!incr_next_message()) return; auto end = _next_message; - chunk c; + message_chunk c; chunk_id_type resend_id; do { @@ -275,7 +275,7 @@ namespace fire //check to see if there are resends else if(r.resends.pop(resend_id)) { - chunk mc = nth_chunk(resend_id, wm.proto, wm.data); + message_chunk mc = nth_chunk(resend_id, wm.proto, wm.data); mc.resent = true; _out_queue.emplace_push(mc); break; @@ -296,11 +296,11 @@ namespace fire return r; } - chunk create_prototype(sequence_type sequence, const endpoint_message& m) + message_chunk create_prototype(sequence_type sequence, const endpoint_message& m) { - chunk c; + message_chunk c; c.valid = true; - c.type = m.robust ? chunk::msg : chunk::qmsg; + c.type = m.robust ? message_chunk::msg : message_chunk::qmsg; c.host = m.ep.address; c.port = m.ep.port; c.sequence = sequence; @@ -322,7 +322,7 @@ namespace fire //update sequence _sequence++; - chunk proto = create_prototype(_sequence, m); + message_chunk proto = create_prototype(_sequence, m); init_working(proto, m.data); } @@ -448,16 +448,16 @@ namespace fire v = (v2 << 8) | v1; } - void encode_udp_wire(u::bytes& r, const chunk& ch) + void encode_udp_wire(u::bytes& r, const message_chunk& ch) { r.resize(HEADER_SIZE + ch.write_size); //set mark switch(ch.type) { - case chunk::msg: r[0] = '!'; break; - case chunk::qmsg: r[0] = '='; break; - case chunk::ack: r[0] = '@'; break; + case message_chunk::msg: r[0] = '!'; break; + case message_chunk::qmsg: r[0] = '='; break; + case message_chunk::ack: r[0] = '@'; break; default: CHECK(false && "missed case"); } @@ -467,7 +467,7 @@ namespace fire //write total chunks write_be_u16(r, CHUNK_TOTAL_BASE, ch.total_chunks); - //write chunk number + //write message_chunk number write_be_u16(r, CHUNK_BASE, ch.chunk); //write message @@ -475,20 +475,20 @@ namespace fire std::copy(ch.write_data, ch.write_data + ch.write_size, r.begin() + MESSAGE_BASE); } - chunk decode_udp_wire(const u::bytes& b) + message_chunk decode_udp_wire(const u::bytes& b) { REQUIRE_GREATER_EQUAL(b.size(), HEADER_SIZE); - chunk ch; + message_chunk ch; ch.valid = false; //read mark const char mark = b[0]; switch(mark) { - case '!': ch.type = chunk::msg; break; - case '=': ch.type = chunk::qmsg; break; - case '@': ch.type = chunk::ack; break; + case '!': ch.type = message_chunk::msg; break; + case '=': ch.type = message_chunk::qmsg; break; + case '@': ch.type = message_chunk::ack; break; default: return ch; } @@ -504,7 +504,7 @@ namespace fire //total_chunks should be a unsigned short CHECK_LESS_EQUAL(ch.total_chunks, MAX_CHUNKS); - //read chunk number + //read message_chunk number if(b.size() < CHUNK_BASE + sizeof(chunk_id_type)) return ch; read_be_u16(b, CHUNK_BASE, ch.chunk); @@ -532,21 +532,21 @@ namespace fire if(_out_queue.empty()) return; //encode bytes to wire format - chunk chunk; - bool got = _out_queue.pop(chunk); + message_chunk message_chunk; + bool got = _out_queue.pop(message_chunk); CHECK(got); - encode_udp_wire(_out_buffer, chunk); + encode_udp_wire(_out_buffer, message_chunk); _stats.bytes_sent += _out_buffer.size(); - //async send chunk - udp::endpoint ep(address::from_string(chunk.host), chunk.port); + //async send message_chunk + udp::endpoint ep(address::from_string(message_chunk.host), message_chunk.port); _socket->async_send_to(ba::buffer(_out_buffer.data(), _out_buffer.size()), ep, boost::bind(&udp_connection::handle_write, this, ba::placeholders::error)); //ignore acks or resends - if(!chunk.resent && chunk.type != chunk::ack) - sent_chunk(chunk); + if(!message_chunk.resent && message_chunk.type != message_chunk::ack) + sent_chunk(message_chunk); } @@ -580,9 +580,9 @@ namespace fire boost::asio::placeholders::bytes_transferred)); } - bool insert_chunk(const chunk& c, working_messages& w, u::bytes& complete_message) + bool insert_chunk(const message_chunk& c, working_messages& w, u::bytes& complete_message) { - REQUIRE(c.type == chunk::msg || c.type == chunk::qmsg); + REQUIRE(c.type == message_chunk::msg || c.type == message_chunk::qmsg); if(c.total_chunks == 0) return false; auto& wm = w[c.sequence]; @@ -606,7 +606,7 @@ namespace fire if(c.total_chunks != wm.proto.total_chunks) return false; if(wm.set[chunk_n]) return false; - //potentially resize if we get the last chunk + //potentially resize if we get the last message_chunk if(c.chunk == wm.proto.total_chunks - 1) { auto extra = UDP_CHuNK_SIZE - c.data.size(); @@ -615,7 +615,7 @@ namespace fire CHECK_GREATER_EQUAL(extra, 0); wm.data.resize(wm.data.size() - extra); } - //only the last chunk can be less than the UDP_CHUNK_SIZE. Otherwise something is wrong + //only the last message_chunk can be less than the UDP_CHUNK_SIZE. Otherwise something is wrong else if(c.data.size() != UDP_CHuNK_SIZE) return false; const size_t insert_spot = c.chunk * UDP_CHuNK_SIZE; @@ -651,7 +651,7 @@ namespace fire _stats.bytes_recv += transferred; //decode message - chunk c; + message_chunk c; if(_work_buffer.size() >= HEADER_SIZE) c = decode_udp_wire(_work_buffer); @@ -661,13 +661,13 @@ namespace fire //add message to in queue if we got complete message endpoint ep = { UDP, _in_endpoint.address().to_string(), _in_endpoint.port()}; - if(c.type != chunk::ack) + if(c.type != message_chunk::ack) { - const bool robust = c.type == chunk::msg; + const bool robust = c.type == message_chunk::msg; if(robust) { - chunk ack; - ack.type = chunk::ack; + message_chunk ack; + ack.type = message_chunk::ack; ack.host = ep.address; ack.port = ep.port; ack.sequence = c.sequence; @@ -679,9 +679,9 @@ namespace fire send_right_away(ack); } - //insert chunk to message buffer + //insert message_chunk to message buffer bool inserted = insert_chunk(c, _in_working, _work_buffer); - //chunk is no longer valid after insert_chunk call because a move is done. + //message_chunk is no longer valid after insert_chunk call because a move is done. if(inserted) { @@ -743,7 +743,7 @@ namespace fire //check if we should try to resend bool skip = wm.ticks <= RESEND_TICK_THRESHOLD; - bool robust = wm.proto.type == chunk::msg; + bool robust = wm.proto.type == message_chunk::msg; //walk working message and resend all chunks that never got //acked diff --git a/src/network/udp_queue.hpp b/src/network/udp_queue.hpp index 180488c4..9b79365f 100644 --- a/src/network/udp_queue.hpp +++ b/src/network/udp_queue.hpp @@ -58,7 +58,7 @@ namespace fire using chunk_total_type = uint16_t; using chunk_id_type = uint16_t; - struct chunk + struct message_chunk { bool valid = false; std::string host; @@ -78,7 +78,7 @@ namespace fire struct working_message { - chunk proto; + message_chunk proto; util::bytes data; boost::dynamic_bitset<> set; boost::dynamic_bitset<> sent; @@ -110,7 +110,7 @@ namespace fire size_t bytes_recv = 0; }; - using chunk_queue = util::queue; + using chunk_queue = util::queue; class udp_queue; class udp_connection @@ -134,15 +134,15 @@ namespace fire private: void add_to_working_set(endpoint_message m); - void init_working(chunk& proto, util::bytes& data); - void send_right_away(chunk& c); - bool get_next_chunk(working_message&, chunk& queued_chunk); + void init_working(message_chunk& proto, util::bytes& data); + void send_right_away(message_chunk& c); + bool get_next_chunk(working_message&, message_chunk& queued_chunk); void cleanup_message(sequence_type sequence); - void validate_chunk(const chunk& c); + void validate_chunk(const message_chunk& c); void queue_resend(message_ring_item&, chunk_id_type c); void queue_next_chunk(); bool incr_next_message(); - void sent_chunk(const chunk& c); + void sent_chunk(const message_chunk& c); size_t resend(message_ring_item&); void resend(); void post_send();