Skip to content

Commit

Permalink
Fixing windows build
Browse files Browse the repository at this point in the history
Ignore-this: 28b40178c845411b3bd12384a7bc21f0

darcs-hash:6150b040343e894462d1179a37176c61240ed946
  • Loading branch information
mempko committed Aug 11, 2015
1 parent 1db7fa6 commit 25292f0
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 56 deletions.
96 changes: 48 additions & 48 deletions src/network/udp_queue.cpp
Expand Up @@ -64,7 +64,7 @@ namespace fire
const size_t CHUNK_BASE = CHUNK_TOTAL_BASE + sizeof(chunk_total_type);
const size_t MESSAGE_BASE = CHUNK_BASE + sizeof(chunk_id_type);

//<mark> <sequence num> <chunk total> <chunk>
//<mark> <sequence num> <message_chunk total> <message_chunk>
const size_t HEADER_SIZE = MESSAGE_BASE;
const size_t UDP_CHuNK_SIZE = UDP_PACKET_SIZE - HEADER_SIZE; //in bytes

Expand Down Expand Up @@ -104,7 +104,7 @@ namespace fire
_writing = false;
}

void udp_connection::init_working(chunk& proto, util::bytes& data)
void udp_connection::init_working(message_chunk& proto, util::bytes& data)
{
REQUIRE_GREATER(proto.total_chunks, 0);

Expand Down Expand Up @@ -136,23 +136,23 @@ namespace fire
return wm.sent.count() == wm.proto.total_chunks;
}

void udp_connection::sent_chunk(const chunk& c)
void udp_connection::sent_chunk(const message_chunk& c)
{
REQUIRE(c.type != chunk::ack);
REQUIRE(c.type != message_chunk::ack);
REQUIRE_FALSE(c.resent);

auto& wm = _out_working[c.sequence];
wm.sent[c.chunk] = 1;
if(wm.queued > 0) wm.queued--;

bool robust = wm.proto.type == chunk::msg;
bool robust = wm.proto.type == message_chunk::msg;
if(robust) wm.in_flight++;
else if(all_sent(wm)) cleanup_message(c.sequence);
}

void udp_connection::validate_chunk(const chunk& c)
void udp_connection::validate_chunk(const message_chunk& c)
{
REQUIRE(c.type == chunk::ack);
REQUIRE(c.type == message_chunk::ack);

auto& w = _out_working;

Expand Down Expand Up @@ -183,14 +183,14 @@ namespace fire
cleanup_message(sequence_n);
}

void udp_connection::send_right_away(chunk& c)
void udp_connection::send_right_away(message_chunk& c)
{
_out_queue.emplace_push(c);
post_send();
}


chunk nth_chunk(size_t n, const chunk& prototype, const util::bytes& data)
message_chunk nth_chunk(size_t n, const message_chunk& prototype, const util::bytes& data)
{
REQUIRE_LESS(n, prototype.total_chunks);

Expand All @@ -200,7 +200,7 @@ namespace fire

CHECK_GREATER(size, 0);

chunk c = prototype;
message_chunk c = prototype;
c.chunk = n;
c.write_size = size;
c.write_data = data.data() + start;
Expand All @@ -211,10 +211,10 @@ namespace fire
return c;
}

bool udp_connection::get_next_chunk(working_message& wm, chunk& queued_chunk)
bool udp_connection::get_next_chunk(working_message& wm, message_chunk& queued_chunk)
{
REQUIRE_GREATER(wm.proto.total_chunks, 0);
bool robust = wm.proto.type == chunk::msg;
bool robust = wm.proto.type == message_chunk::msg;

auto in_flight = robust ? wm.in_flight : 0;

Expand Down Expand Up @@ -260,7 +260,7 @@ namespace fire
if(!incr_next_message()) return;
auto end = _next_message;

chunk c;
message_chunk c;
chunk_id_type resend_id;
do
{
Expand All @@ -275,7 +275,7 @@ namespace fire
//check to see if there are resends
else if(r.resends.pop(resend_id))
{
chunk mc = nth_chunk(resend_id, wm.proto, wm.data);
message_chunk mc = nth_chunk(resend_id, wm.proto, wm.data);
mc.resent = true;
_out_queue.emplace_push(mc);
break;
Expand All @@ -296,11 +296,11 @@ namespace fire
return r;
}

chunk create_prototype(sequence_type sequence, const endpoint_message& m)
message_chunk create_prototype(sequence_type sequence, const endpoint_message& m)
{
chunk c;
message_chunk c;
c.valid = true;
c.type = m.robust ? chunk::msg : chunk::qmsg;
c.type = m.robust ? message_chunk::msg : message_chunk::qmsg;
c.host = m.ep.address;
c.port = m.ep.port;
c.sequence = sequence;
Expand All @@ -322,7 +322,7 @@ namespace fire
//update sequence
_sequence++;

chunk proto = create_prototype(_sequence, m);
message_chunk proto = create_prototype(_sequence, m);
init_working(proto, m.data);
}

Expand Down Expand Up @@ -448,16 +448,16 @@ namespace fire
v = (v2 << 8) | v1;
}

void encode_udp_wire(u::bytes& r, const chunk& ch)
void encode_udp_wire(u::bytes& r, const message_chunk& ch)
{
r.resize(HEADER_SIZE + ch.write_size);

//set mark
switch(ch.type)
{
case chunk::msg: r[0] = '!'; break;
case chunk::qmsg: r[0] = '='; break;
case chunk::ack: r[0] = '@'; break;
case message_chunk::msg: r[0] = '!'; break;
case message_chunk::qmsg: r[0] = '='; break;
case message_chunk::ack: r[0] = '@'; break;
default: CHECK(false && "missed case");
}

Expand All @@ -467,28 +467,28 @@ namespace fire
//write total chunks
write_be_u16(r, CHUNK_TOTAL_BASE, ch.total_chunks);

//write chunk number
//write message_chunk number
write_be_u16(r, CHUNK_BASE, ch.chunk);

//write message
if(ch.write_size > 0 && ch.write_data != nullptr)
std::copy(ch.write_data, ch.write_data + ch.write_size, r.begin() + MESSAGE_BASE);
}

chunk decode_udp_wire(const u::bytes& b)
message_chunk decode_udp_wire(const u::bytes& b)
{
REQUIRE_GREATER_EQUAL(b.size(), HEADER_SIZE);

chunk ch;
message_chunk ch;
ch.valid = false;

//read mark
const char mark = b[0];
switch(mark)
{
case '!': ch.type = chunk::msg; break;
case '=': ch.type = chunk::qmsg; break;
case '@': ch.type = chunk::ack; break;
case '!': ch.type = message_chunk::msg; break;
case '=': ch.type = message_chunk::qmsg; break;
case '@': ch.type = message_chunk::ack; break;
default: return ch;
}

Expand All @@ -504,7 +504,7 @@ namespace fire
//total_chunks should be a unsigned short
CHECK_LESS_EQUAL(ch.total_chunks, MAX_CHUNKS);

//read chunk number
//read message_chunk number
if(b.size() < CHUNK_BASE + sizeof(chunk_id_type)) return ch;
read_be_u16(b, CHUNK_BASE, ch.chunk);

Expand Down Expand Up @@ -532,21 +532,21 @@ namespace fire
if(_out_queue.empty()) return;

//encode bytes to wire format
chunk chunk;
bool got = _out_queue.pop(chunk);
message_chunk message_chunk;
bool got = _out_queue.pop(message_chunk);
CHECK(got);

encode_udp_wire(_out_buffer, chunk);
encode_udp_wire(_out_buffer, message_chunk);
_stats.bytes_sent += _out_buffer.size();

//async send chunk
udp::endpoint ep(address::from_string(chunk.host), chunk.port);
//async send message_chunk
udp::endpoint ep(address::from_string(message_chunk.host), message_chunk.port);
_socket->async_send_to(ba::buffer(_out_buffer.data(), _out_buffer.size()), ep,
boost::bind(&udp_connection::handle_write, this, ba::placeholders::error));

//ignore acks or resends
if(!chunk.resent && chunk.type != chunk::ack)
sent_chunk(chunk);
if(!message_chunk.resent && message_chunk.type != message_chunk::ack)
sent_chunk(message_chunk);

}

Expand Down Expand Up @@ -580,9 +580,9 @@ namespace fire
boost::asio::placeholders::bytes_transferred));
}

bool insert_chunk(const chunk& c, working_messages& w, u::bytes& complete_message)
bool insert_chunk(const message_chunk& c, working_messages& w, u::bytes& complete_message)
{
REQUIRE(c.type == chunk::msg || c.type == chunk::qmsg);
REQUIRE(c.type == message_chunk::msg || c.type == message_chunk::qmsg);
if(c.total_chunks == 0) return false;

auto& wm = w[c.sequence];
Expand All @@ -606,7 +606,7 @@ namespace fire
if(c.total_chunks != wm.proto.total_chunks) return false;
if(wm.set[chunk_n]) return false;

//potentially resize if we get the last chunk
//potentially resize if we get the last message_chunk
if(c.chunk == wm.proto.total_chunks - 1)
{
auto extra = UDP_CHuNK_SIZE - c.data.size();
Expand All @@ -615,7 +615,7 @@ namespace fire
CHECK_GREATER_EQUAL(extra, 0);
wm.data.resize(wm.data.size() - extra);
}
//only the last chunk can be less than the UDP_CHUNK_SIZE. Otherwise something is wrong
//only the last message_chunk can be less than the UDP_CHUNK_SIZE. Otherwise something is wrong
else if(c.data.size() != UDP_CHuNK_SIZE) return false;

const size_t insert_spot = c.chunk * UDP_CHuNK_SIZE;
Expand Down Expand Up @@ -651,7 +651,7 @@ namespace fire
_stats.bytes_recv += transferred;

//decode message
chunk c;
message_chunk c;

if(_work_buffer.size() >= HEADER_SIZE)
c = decode_udp_wire(_work_buffer);
Expand All @@ -661,13 +661,13 @@ namespace fire
//add message to in queue if we got complete message
endpoint ep = { UDP, _in_endpoint.address().to_string(), _in_endpoint.port()};

if(c.type != chunk::ack)
if(c.type != message_chunk::ack)
{
const bool robust = c.type == chunk::msg;
const bool robust = c.type == message_chunk::msg;
if(robust)
{
chunk ack;
ack.type = chunk::ack;
message_chunk ack;
ack.type = message_chunk::ack;
ack.host = ep.address;
ack.port = ep.port;
ack.sequence = c.sequence;
Expand All @@ -679,9 +679,9 @@ namespace fire
send_right_away(ack);
}

//insert chunk to message buffer
//insert message_chunk to message buffer
bool inserted = insert_chunk(c, _in_working, _work_buffer);
//chunk is no longer valid after insert_chunk call because a move is done.
//message_chunk is no longer valid after insert_chunk call because a move is done.

if(inserted)
{
Expand Down Expand Up @@ -743,7 +743,7 @@ namespace fire

//check if we should try to resend
bool skip = wm.ticks <= RESEND_TICK_THRESHOLD;
bool robust = wm.proto.type == chunk::msg;
bool robust = wm.proto.type == message_chunk::msg;

//walk working message and resend all chunks that never got
//acked
Expand Down
16 changes: 8 additions & 8 deletions src/network/udp_queue.hpp
Expand Up @@ -58,7 +58,7 @@ namespace fire
using chunk_total_type = uint16_t;
using chunk_id_type = uint16_t;

struct chunk
struct message_chunk
{
bool valid = false;
std::string host;
Expand All @@ -78,7 +78,7 @@ namespace fire

struct working_message
{
chunk proto;
message_chunk proto;
util::bytes data;
boost::dynamic_bitset<> set;
boost::dynamic_bitset<> sent;
Expand Down Expand Up @@ -110,7 +110,7 @@ namespace fire
size_t bytes_recv = 0;
};

using chunk_queue = util::queue<chunk>;
using chunk_queue = util::queue<message_chunk>;

class udp_queue;
class udp_connection
Expand All @@ -134,15 +134,15 @@ namespace fire

private:
void add_to_working_set(endpoint_message m);
void init_working(chunk& proto, util::bytes& data);
void send_right_away(chunk& c);
bool get_next_chunk(working_message&, chunk& queued_chunk);
void init_working(message_chunk& proto, util::bytes& data);
void send_right_away(message_chunk& c);
bool get_next_chunk(working_message&, message_chunk& queued_chunk);
void cleanup_message(sequence_type sequence);
void validate_chunk(const chunk& c);
void validate_chunk(const message_chunk& c);
void queue_resend(message_ring_item&, chunk_id_type c);
void queue_next_chunk();
bool incr_next_message();
void sent_chunk(const chunk& c);
void sent_chunk(const message_chunk& c);
size_t resend(message_ring_item&);
void resend();
void post_send();
Expand Down

0 comments on commit 25292f0

Please sign in to comment.