Skip to content

Commit

Permalink
nasdaq/moldudp64: Sequence number gap handling
Browse files Browse the repository at this point in the history
  • Loading branch information
penberg committed May 23, 2016
1 parent ebbf89c commit 78f976c
Showing 1 changed file with 51 additions and 8 deletions.
59 changes: 51 additions & 8 deletions include/helix/nasdaq/moldudp64.hh
Expand Up @@ -16,15 +16,24 @@
#include "helix/helix.hh"
#include "helix/net.hh"

#include <experimental/optional>

namespace helix {

namespace nasdaq {

enum moldudp64_state {
synchronized,
gap_fill,
};

template<typename Handler>
class moldudp64_session : public session {
Handler _handler;
send_callback _send_cb;
uint64_t _seq_num;
uint64_t _expected_seq_no = 1;
moldudp64_state _state = moldudp64_state::synchronized;
std::experimental::optional<uint64_t> _sync_to_seq_no;
public:
explicit moldudp64_session(void *data);

Expand All @@ -37,12 +46,14 @@ public:
virtual void set_send_callback(send_callback callback) override;

virtual size_t process_packet(const net::packet_view& packet) override;

private:
void retransmit_request(uint64_t seq_no, uint64_t expected_seq_no);
};

template<typename Handler>
moldudp64_session<Handler>::moldudp64_session(void* data)
: session{data}
, _seq_num{1}
{
}

Expand Down Expand Up @@ -78,24 +89,56 @@ size_t moldudp64_session<Handler>::process_packet(const net::packet_view& packet
assert(packet.len() >= sizeof(moldudp64_header));

auto* header = packet.cast<moldudp64_header>();
auto sequence_number = be64toh(header->SequenceNumber);
if (sequence_number != _seq_num) {
throw std::runtime_error(std::string("invalid sequence number: ") + std::to_string(sequence_number) + ", expected: " + std::to_string(_seq_num));
auto recv_seq_no = be64toh(header->SequenceNumber);
if (recv_seq_no < _expected_seq_no) {
return packet.len();
}
if (_expected_seq_no < recv_seq_no) {
_sync_to_seq_no = recv_seq_no;
if (_state == moldudp64_state::synchronized) {
_state = moldudp64_state::gap_fill;
retransmit_request(recv_seq_no, _expected_seq_no);
}
return packet.len();
}
p += sizeof(moldudp64_header);

for (int i = 0; i < be16toh(header->MessageCount); i++) {
auto* msg_block = reinterpret_cast<const moldudp64_message_block*>(p);
p += sizeof(moldudp64_message_block);
auto message_length = be16toh(msg_block->MessageLength);
_handler.process_packet(net::packet_view{p, message_length});
p += message_length;
_seq_num++;
_expected_seq_no++;
}
if (_state == moldudp64_state::gap_fill) {
if (_expected_seq_no >= _sync_to_seq_no.value()) {
_state = moldudp64_state::synchronized;
_sync_to_seq_no = std::experimental::nullopt;
} else {
retransmit_request(recv_seq_no, _expected_seq_no);
}
}

return p - packet.buf();
}

template<typename Handler>
void moldudp64_session<Handler>::retransmit_request(uint64_t seq_no, uint64_t expected_seq_no)
{
if (!bool(_send_cb)) {
throw std::runtime_error(std::string("invalid sequence number: ") + std::to_string(seq_no) + ", expected: " + std::to_string(expected_seq_no));
}
uint64_t message_count = 0xfffe;

moldudp64_request_packet request_packet;
request_packet.SequenceNumber = htobe64(expected_seq_no);
request_packet.MessageCount = htobe16(message_count);

char *base = reinterpret_cast<char*>(&request_packet);
size_t len = sizeof(request_packet);

_send_cb(base, len);
}

}

}

0 comments on commit 78f976c

Please sign in to comment.