Permalink
Browse files

This will do the right thing, way way messy in the end.

  • Loading branch information...
1 parent 6c65456 commit 3242da87adb1ce609578161e9ae8f349683c8882 @daicoden daicoden committed Nov 10, 2013
Showing with 204 additions and 25 deletions.
  1. +191 −19 assets/javascripts/vendor/peer.js
  2. +13 −6 views/test.jade
View
210 assets/javascripts/vendor/peer.js
@@ -729,7 +729,7 @@ function Reliable(dc, debug) {
if (!(this instanceof Reliable)) return new Reliable(dc);
this._dc = dc;
- util.debug = true;
+ util.debug = false;
this.DATA = 1;
this.ACK = 2;
@@ -741,52 +741,136 @@ function Reliable(dc, debug) {
this.PACKET_COUNT_IN_MESSAGE_FIELD = 4;
this.PACKET_DATA_FIELD = 5;
+ this.PACKETS_TO_RESEND_FIELD = 3;
+
// Messages sent/received so far.
// DATA
- // packetId: { 1, <messageId>, <messagePacketStart>, <messagePacketEnd>, <packetId>, <data> }
+ // packetId: { 1, <messageId>, <packetId>, <firstPacket>, <totalPacketsInMessage>, <data> }
// ACK
- // packetId: { 2, <packetId> }
+ // packetId: { 2, <messageId>, <packetId>, <numberToResend> }
this._outgoing = {};
+ this._ackOutgoing = {};
this._incoming = {};
// Window size.
this._window = 40;
// MTU.
this._mtu = 256;
// Max Chunk Size Without Yielding
- this._maxChunkSize = 10
+ //this._maxChunkSize = 10
// Interval for setInterval. In ms.
this._interval = 0;
+ this._ackInterval = 2;
+
+ // We could be a lot smarter and calculate it but this is here for now
+ this._packetCountToResend = 5;
this._nextPacketId = 0;
this._nextMessageId = 0;
+ this._ackedPacketId = -1;
+ this._ackedMessageId = -1;
+
+ this._receivedPacketId = -1;
+ this._receivedPacketCache = {}
+ this._receivedMessageId = -1;
+ this._resendTimeout = null;
+ this._resendInterval = 100;
+
+ this._ackQueue = []
+ this._ackTimeout = null;
+
+
+ //hacking now
+ this._stuckCount = 0;
+
// Outgoing message queue.
this._sendQueue = [];
this._setupDC();
- this._orderedDeliveryMessageId = 0;
+ this._active = false;
+
util.log('These settings mean the client can lag behind by ' + (60*500)/(Math.pow(2,10)) + "Kb")
};
+Reliable.prototype._deactivate = function() {
+ this._active = false;
+ if (this._resendTimeout) {
+ clearTimeout(this._resendTimeout);
+ this._resendTimeout = false
+ }
+}
+
+Reliable.prototype._activate = function() {
+ this._active = true;
+ var self = this;
+ //this._resendTimeout = setTimeout(function() { self._resend() }, this._resendInterval);
+}
+
+Reliable.prototype._resend = function() {
+ var queue = []
+ var self = this;
+ var message = this._outgoing[this._ackedMessageId];
+ var testPacket = message.packets[0];
+ var wantedPacketNumber = this._ackedPacketId - testPacket[this.FIRST_PACKET_ID_IN_MESSAGE_FIELD];
+ var totalPacketsInMessage = testPacket[this.PACKET_COUNT_IN_MESSAGE_FIELD] - wantedPacketNumber;
+
+ var toTake = 10;
+ if(totalPacketsInMessage >= 0) {
+ var subset = message.packets.slice(wantedPacketNumber, toTake + wantedPacketNumber)
+ for(i in subset) {
+ queue.push(subset[i])
+ }
+ }
+
+ var messageId = this._ackedMessageId;
+ while(queue.length < 10) {
+ messageId += 1
+
+ message = this._outgoing[messageId]
+ if(!message) { break }
+
+ var subset = message.packets.slice(0, 10 - queue.length)
+ for(i in subset) {
+ queue.push(subset[i])
+ }
+ }
+
+ console.log('WeStuck?, message: ' + this._ackedMessageId + ' packet: ' + this._ackedPacketId + ' resending')
+ this._enqueueForDelivery(queue);
+
+}
+
+Reliable.prototype._rescheduleResend = function() {
+ if(this._resendTimeout) {
+ clearTimeout(this._resendInterval)
+ }
+
+ var self = this;
+ //this._resendTimeout = setTimeout(function() { self._resend() }, this._resendInterval);
+}
+
////////////////////////////////////////////////////////////////////////////////
// Application Synchronous Sending
// Send a message reliably.
Reliable.prototype.send = function(msg) {
+ if(!this._active) {
+ this._activate();
+ }
// Determine if chunking is necessary.
var payload = util.pack(msg);
var self = this;
var messageId = this._queuePackets({expectAck: true}, function(messageId, firstPacketId) {
+ if (util.debug) {
+ console.time('SendMessage:' + messageId);
+ }
+
return self._buildPackets(payload, messageId, firstPacketId);
});
- if (util.debug) {
- console.time('SendMessage:' + messageId);
- }
-
// Send prelim window.
this._sendMessage(messageId);
@@ -799,10 +883,11 @@ Reliable.prototype.send = function(msg) {
// outgoing after delivery. This is used for resending missed packets
Reliable.prototype._queuePackets = function(opts, packetProvider) {
var messageId = this._nextMessageId;
- packets = packetProvider(messageId, this._nextPacketId);
+ var packets = packetProvider(messageId, this._nextPacketId);
this._outgoing[messageId] = {packets: packets, expectAck: (opts.expectAck || true)};
this._nextMessageId += 1;
- this._nextPacketId += packets[this.PACKET_COUNT_IN_MESSAGE_FIELD];
+ // Just use first packet for this
+ this._nextPacketId += packets[0][this.PACKET_COUNT_IN_MESSAGE_FIELD];
return messageId;
}
@@ -918,8 +1003,8 @@ Reliable.prototype._handleMessage = function(msg) {
switch (msg[this.MESSAGE_TYPE_FIELD]) {
case this.DATA:
var messageId = msg[this.MESSAGE_ID_FIELD];
- var messagePacketNumber = msg[this.PACKET_ID_FIELD] -
- msg[this.FIRST_PACKET_ID_IN_MESSAGE_FIELD];
+ var packetId = msg[this.PACKET_ID_FIELD];
+ var messagePacketNumber = packetId - msg[this.FIRST_PACKET_ID_IN_MESSAGE_FIELD];
var toReceive = this._incoming[messageId];
if (!toReceive) {
@@ -931,18 +1016,72 @@ Reliable.prototype._handleMessage = function(msg) {
toReceive.packets[messagePacketNumber] = msg;
} else{
// Duplicate
- // TODO figure out what to do about this
+ return;
}
if (toReceive.isReceived) {
+ if (util.debug) {
+ console.time('ReceivedMessage:' + packetId);
+ }
+ console.log('received ' + packetId + ' need ' + (this._receivedPacketId + 1))
+
this._collectDeliverableIds(messageId, function(deliveryId) {
self._assembleAndNotify(deliveryId);
delete self._incoming[deliveryId];
+
+ if (util.debug) {
+ console.timeEnd('ReceivedMessage:' + messageId);
+ }
})
}
+
+ if (this._receivedPacketId == packetId - 1 || this._receivedPacketCache[this._receivedPacketId]) {
+ this._receivedPacketId += 1;
+
+ while(this._receivedPacketCache[this._receivedPacketId]) {
+ delete this._receivedPacketCache[this._receivedPacketId];
+ this._receivedPacketId += 1;
+ }
+
+ this._scheduleAck(true, messageId, packetId);
+ } else {
+ this._receivedPacketCache[packetId] = true
+ this._scheduleAck(false, messageId, packetId);
+ }
+
break;
case this.ACK:
- TODO
+ var receivedMsg = msg[this.MESSAGE_ID_FIELD];
+ var receivedPacket = msg[this.PACKET_ID_FIELD];
+ var packetsToResend = msg[this.PACKETS_TO_RESEND_FIELD];
+
+ if (receivedPacket < this._ackedPacketId) {
+ return; //Old Ack
+ } else if (this._ackedPacketId == receivedPacket) {
+ // Might indicate the client missed a packet, scheduleResend
+ this._stuckCount += 1;
+ //TODO: Make this on a timeout when early errors break
+ if(this._stuckCount >= 20) {
+ this._resend()
+ this._stuckCount = 0
+ }
+ } else {
+ this._stuckCount = 0;
+ }
+ this._ackedPacketId = receivedPacket;
+
+ // There's a bug here - if we stop sending data we will never delete it from _outgoing...
+ // should fix this by adding totalPacketCount to the ack
+ while(this._ackedMessageId < receivedMsg - 1) {
+ delete this._outgoing[this._ackedMessageId];
+ this._ackedMessageId += 1;
+ }
+
+ if (this.receivedPacket == this.nextPacketId) {
+ // We have caught up
+ this._deactivate();
+ }
+
break;
default:
throw "Unknown Message Type: ", msg[this.MESSAGE_TYPE_FIELD]
@@ -960,15 +1099,48 @@ Reliable.prototype._newReceivable = function(packet) {
}
}
+//TODO: these +1 are here to keeep symetry with the receivedPacketId for acks.
+// We can definitly do this better
Reliable.prototype._collectDeliverableIds = function(receivedMessageId, deliverer) {
- while (this._incoming[this._orderedDeliveryMessageId] &&
- this._incoming[this._orderedDeliveryMessageId].isReceived()) {
+ while (this._incoming[this._receivedMessageId + 1] &&
+ this._incoming[this._receivedMessageId + 1].isReceived()) {
//TODO: Check the ordered preference from options
- deliverer(this._orderedDeliveryMessageId);
- this._orderedDeliveryMessageId += 1;
+ deliverer(this._receivedMessageId + 1);
+ this._receivedMessageId += 1;
+ console.log("Processing message " + (this._receivedMessageId) + ' need packet ' + (this._receivedPacketId + 1))
}
}
+Reliable.prototype._scheduleAck = function(success, messageId, packetId) {
+ var self = this;
+// if (success) {
+ // If are always wanting the next packet, but have the message we need
+ this._ackQueue.push([this.ACK, this._receivedMessageId, this._receivedPacketId + 1, 0]);
+// } else {
+// this._ackQueue.push([this.ACK, messageId, packetId, this._packetCountToResend]);
+// }
+
+ if (!this._ackTimeout) {
+ this._ackTimeout = setTimeout(function() {
+ var newQueue = [];
+ for (i in self._ackQueue) {
+ if (self._ackQueue[i][self.PACKET_ID_FIELD] < self.receivedPacketId) {
+ newQueue.push(self._ackQueue[i]);
+ } else {
+ newQueue.push([self.ACK, self._receivedMessageId, self._receivedPacketId, 0]);
+ }
+ }
+
+ for(var i = 0; i < newQueue.length; i += self._window) {
+ self._enqueueForDelivery(newQueue.slice(i, i + self._window));
+ }
+
+ delete self._ackQueue;
+ self._ackQueue = [];
+ self._ackTimeout = null;
+ }, this._ackInterval)
+ }
+}
// Sends ACK N, expecting Nth blob chunk for message ID.
Reliable.prototype._ack = function(id) {
View
19 views/test.jade
@@ -4,14 +4,21 @@ block script
script
S.pair(1, function(socket, master) {
conn = socket.conn
- conn.on('data', function(data) { console.log(data) });
+ count = 0
+ conn.on('data', function(data) {
+ if(data.n != count) { throw "Holy SHit" }
+ count += 1
+ console.log(data)
+ });
if (master) {
- for (var i = 0; i < 100; i++)
+ //for (var i = 0; i < 1; i++)
+ // conn.send({ n: i, payload: Array(250000) });
+ // for (var i = 0; i < 2; i++)
+ // conn.send({ n: i, payload: Array(250000) });
+ console.time("Here we go")
+ for (var i = 0; i < 1000; i++)
conn.send({ n: i, payload: Array(1) });
- for (var i = 0; i < 2; i++)
- conn.send({ n: i, payload: Array(250000) });
- for (var i = 0; i < 100; i++)
- conn.send({ n: i, payload: Array(2) });
+ console.timeEnd("Here we go")
}
});

0 comments on commit 3242da8

Please sign in to comment.