Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add send callback feature + change README sample #63

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 0 additions & 4 deletions .gitignore

This file was deleted.

4 changes: 3 additions & 1 deletion README.md
Expand Up @@ -17,7 +17,9 @@ var engine = require('engine.io')
, server = engine.listen(80)

server.on('connection', function (socket) {
socket.send('utf 8 string');
socket.send('utf 8 string', function(transport){
//the message was sent successfully
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we want to show this so prominently. It's kindof an advanced-ish feature

});
});
```

Expand Down
42 changes: 37 additions & 5 deletions lib/socket.js
Expand Up @@ -23,7 +23,8 @@ function Socket (id, server, transport) {
this.upgraded = false;
this.readyState = 'opening';
this.writeBuffer = [];

this.packetSeq = 0;
this.packetsFn = {};
this.setTransport(transport);
this.onOpen();
}
Expand Down Expand Up @@ -95,6 +96,8 @@ Socket.prototype.onPacket = function (packet) {
};

Socket.prototype.onError = function (err) {
this.packetsFn = {};
this.packetSeq = 0;
debug('transport error');
this.onClose('transport error', err);
};
Expand Down Expand Up @@ -132,6 +135,8 @@ Socket.prototype.setTransport = function (transport) {
this.transport.on('packet', this.onPacket.bind(this));
this.transport.on('drain', this.flush.bind(this));
this.transport.once('close', this.onClose.bind(this, 'transport close'));
//this function will manage packet events (also message callbacks)
this.setupSendCallback();
};

/**
Expand Down Expand Up @@ -207,22 +212,43 @@ Socket.prototype.clearTransport = function () {

Socket.prototype.onClose = function (reason, description) {
if ('closed' != this.readyState) {
this.packetsFn = {};
this.packetSeq = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bad indentation

this.clearTransport();
this.readyState = 'closed';
this.emit('close', reason, description);
}
};

/**
* Setup and manage send callback
*
* @api private
*/

Socket.prototype.setupSendCallback = function () {
var self = this;
//the message was sent successfully, execute the callback
this.transport.on("drain", function(seq){
if('function' == typeof self.packetsFn[seq]) {
debug('executing send callback');
self.packetsFn[seq](self.transport);
delete self.packetsFn[seq];
}
});
};

/**
* Sends a message packet.
*
* @param {String} message
* @param {Function} callback
* @return {Socket} for chaining
* @api public
*/

Socket.prototype.send = function (data) {
this.sendPacket('message', data);
Socket.prototype.send = function (data, callback) {
this.sendPacket('message', data, callback);
return this;
};

Expand All @@ -234,10 +260,16 @@ Socket.prototype.send = function (data) {
* @api private
*/

Socket.prototype.sendPacket = function (type, data) {
Socket.prototype.sendPacket = function (type, data, callback) {
if ('closing' != this.readyState) {
debug('sending packet "%s" (%s)', type, data);
this.writeBuffer.push({ type: type, data: data });
this.writeBuffer.push({ seq: this.packetSeq, type: type, data: data });
//add send callback to object
if('undefined' != typeof callback){
this.packetsFn[this.packetSeq] = callback;
}
//increase the packet acks
this.packetSeq++;
this.flush();
}
};
Expand Down
21 changes: 19 additions & 2 deletions lib/transports/polling.js
Expand Up @@ -21,6 +21,8 @@ module.exports = Polling;

function Polling (req) {
Transport.call(this, req);
//save sequence ids state
this.seqIds = [];
};

/**
Expand Down Expand Up @@ -96,7 +98,16 @@ Polling.prototype.onPollRequest = function (req, res) {
req.on('close', onClose);

this.writable = true;
this.emit('drain');
//decide to send empty `drain` or not
if (req.query.sid && this.writable && this.seqIds.length > 0) {
debug("acks received from client, emit drain");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

single quotes

var singleItem = this.seqIds.splice(0,1);
for (var i = 0, l = singleItem[0].length; i < l; ++i) {
this.emit('drain', singleItem[0][i]);
}
} else {
this.emit('drain');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary that the polling transport reports what sequence id it's flushed? If you think about it, the Socket should know this, as the polling buffer won't keep growing, the writeBuffer at the socket level will.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, Actually any transport has two type of drain now, with sequence id and without sequence id. When the drain emits, the listener get one parameter (sequence id) in callback and then we going to search that is there any callback function for this sequence id or not, if yes, we execute it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure but the transports don't really need to know about sequence ids.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean always emit an empty drain event on polling transports? (Please clarify all transports or just polling)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as Socket goes, it doesn't matter if it's polling or websocket, it just needs to wait on the drain event, for which it should know what the sequence id is.

}

// if we're still writable but had a pending close, trigger an empty send
if (this.writable && this.shouldClose) {
Expand Down Expand Up @@ -195,7 +206,13 @@ Polling.prototype.send = function (packets) {
this.shouldClose();
this.shouldClose = null;
}

var pushTo = [];
for (var i = 0, l = packets.length; i < l; ++i) {
if (typeof packets[i].seq != "undefined" && packets[i].type == "message") {
pushTo.push(packets[i].seq);
}
}
if (pushTo.length > 0) this.seqIds.push(pushTo);
this.write(parser.encodePayload(packets));
};

Expand Down
19 changes: 18 additions & 1 deletion lib/transports/websocket.js
Expand Up @@ -67,15 +67,32 @@ WebSocket.prototype.onData = function (data) {
*/

WebSocket.prototype.send = function (packets) {

//save packet item state in array
var packetItems = [];
for (var i = 0, l = packets.length; i < l; i++) {
var data = parser.encodePacket(packets[i]);
debug('writing', data);
this.writable = false;
var self = this;

//TODO: review this part again
packetItems.push(packets[i]);
this.socket.send(data, function (err){
if (err) return self.onError('write error', err.stack);

var packetSeqId = null;
if(packetItems.length > 0) {
packetSeqId = packetItems[0].seq || null;
packetItems.splice(0,1);
}

self.writable = true;
self.emit('drain');
if(packetSeqId != null) {
self.emit('drain', packetSeqId);
} else {
self.emit('drain');
}
});
}
};
Expand Down
139 changes: 139 additions & 0 deletions test/server.js
Expand Up @@ -389,6 +389,8 @@ describe('server', function () {
});
});



Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unneeded \n's

it('should arrive from server to client (multiple)', function (done) {
var engine = listen({ allowUpgrades: false }, function (port) {
var socket = new eioc.Socket('ws://localhost:%d'.s(port))
Expand Down Expand Up @@ -493,6 +495,143 @@ describe('server', function () {
});
});

describe('send', function(){
describe('callback', function(){
it('should execute when message sent (polling)', function (done) {
var engine = listen({ allowUpgrades: false }, function (port) {
var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['polling'] }),
i = 0;
engine.on('connection', function (conn) {
conn.send('a', function(transport) {
i++;
});
});
socket.on('open', function () {
socket.on('message', function (msg) {
i++;
});
});

setTimeout(function(){
expect(i).to.be(2);
done();
},10);
});
});

it('should execute when message sent (websocket)', function (done) {
var engine = listen({ allowUpgrades: false }, function (port) {
var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['websocket'] }),
i = 0;
engine.on('connection', function (conn) {
conn.send('a', function(transport) {
i++;
});
});
socket.on('open', function () {
socket.on('message', function (msg) {
i++;
});
});

setTimeout(function(){
expect(i).to.be(2);
done();
},10);
});
});

it('should execute once for each send', function (done) {
var engine = listen(function (port) {
var socket = new eioc.Socket('ws://localhost:%d'.s(port)),
i = 0,
j = 0;
engine.on('connection', function (conn) {
conn.send('b', function (transport) {
j++;
});

conn.send('a', function (transport) {
i++;
});

});
socket.on('open', function () {
socket.on('message', function (msg) {
if (msg == "a") {
i++;
} else if (msg == "b") {
j++;
}
});
});

setTimeout(function () {
expect(i).to.be(2);
expect(j).to.be(2);
done();
}, 50);
});
});

it('should execute in mutlipart packet', function (done) {
var engine = listen(function (port) {
var socket = new eioc.Socket('ws://localhost:%d'.s(port)),
i = 0;
engine.on('connection', function (conn) {
conn.send('b', function (transport) {
i++;
});

conn.send('a', function(transport) {
i++;
});

});
socket.on('open', function () {
socket.on('message', function (msg) {
i++;
});
});

setTimeout(function () {
expect(i).to.be(4);
done();
}, 50);
});
});

it('should execute in separate message', function (done) {
var engine = listen(function (port) {
var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['websocket'] }),
i = 0;
engine.on('connection', function (conn) {

conn.send('a', function(transport) {
i++;

conn.send('b', function (transport) {
i++;
});
});
});

socket.on('open', function () {
socket.on('message', function (msg) {
i++;
});
});

setTimeout(function () {
expect(i).to.be(4);
done();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indent

}, 10);
});
});

});
});

describe('upgrade', function () {
it('should upgrade', function (done) {
var engine = listen(function (port) {
Expand Down