Skip to content

Commit

Permalink
DebugServer and limiting changes
Browse files Browse the repository at this point in the history
- Adding remote debugging telnet-able server
- If a token hits the rate limit, blacklist it until the next hour
- If we're rate limited by Google, write a lock file and don't
  restart until lock file is cleared
  • Loading branch information
Mike Krieger committed Feb 17, 2012
1 parent 537dec4 commit 18c6a2d
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 45 deletions.
6 changes: 5 additions & 1 deletion exampleConfig.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@
* password (Google account password)
* source (Google source, like 'com-your-app')
* port (port to bind to)
* address (address to bind to)
* debugServerPort (port to bind for stats / debug server)
* debugServeraddress (address to bind for stats / debug server)
* serverCallbackHost / serverCallbackPort / serverCallbackPath (if specified, will be used
* to send a POST back to a service in order to handle bad tokens)
*
*/

{
port: 8120
port: 8120,
address: "127.0.0.1"
}

198 changes: 154 additions & 44 deletions node2dm.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,21 @@ var dgram = require('dgram')
, querystring = require('querystring')
, emitter = require('events').EventEmitter
, config = require('./config')
, fs = require('fs')
, net = require('net')


function C2DMMessage(deviceToken, collapseKey, notification) {
this.deviceToken = deviceToken;
this.collapseKey = collapseKey;
this.notification = notification;

return this;
}

function C2DMReceiver(config, connection) {

this.server = dgram.createSocket('udp4', function (msg, rinfo) {

var msgParts = msg.toString().match(/^([^:]+):([^:]+):(.*)$/);
util.log(msgParts);
if (!msgParts) {
util.log("Invalid message");
return;
Expand All @@ -28,15 +27,11 @@ function C2DMReceiver(config, connection) {
var collapseKey = msgParts[2];
var notification = msgParts[3];

util.log(token);
util.log(collapseKey);
util.log(notification);
var c2dmMessage = new C2DMMessage(token, collapseKey, notification);
connection.submitMessage(c2dmMessage);
connection.notifyDevice(c2dmMessage);
});
this.server.bind(config.port || 8120);
util.log("server is up");
return this;
}


Expand Down Expand Up @@ -65,7 +60,55 @@ function C2DMConnection(config) {
var retryAfter = 0;
var authInProgress = false;

this.onError = function(err) {
// if we exceed device quota for an ID,
// place token in this group; it will
// get cleared every 60 minutes
this.rateLimitedTokens = {};

// on fail, queue up here
var pendingMessages = [];
var totalMessages = 0;
var totalErrors = 0;
var authTokenTime = null;
var startupTime = Math.round(new Date().getTime() / 1000);

this.requeueMessage = function(message) {
pendingMessages.push(message);
}

this.clearPendingMessages = function() {
var numMessages = pendingMessages.length;
for (var i = 0; i < numMessages; i++) {
var message = pendingMessages.shift();
self.submitMessage(message);
};
}

// clear rate limited every hour
setInterval(function() {
self.rateLimitedTokens = {};
}, 60 * 60 * 1000);

// ensure log-in every 10 seconds
function loginIfNotAuthenticated() {
if (!self.currentAuthorizationToken) {
self.authenticate();
}
}

setInterval(function() {
loginIfNotAuthenticated();
}, 5 * 1000);

this.on('loginComplete', function() {
self.clearPendingMessages();
});

this.on('retryAfterExpired', function() {
self.clearPendingMessages();
});

this.onError = function(message, err) {
var errMessage = err.match(/Error=(.+)$/);
if (!errMessage) {
util.log("Unknown error: " + err);
Expand All @@ -74,11 +117,19 @@ function C2DMConnection(config) {
util.log(googleError);
switch (googleError) {
case "QuotaExceeded":
// back off..
util.log("WARNING: Google Quota Exceeded");
// write a lock file; will require manual intervention
fs.open('./quota.lock', 'w', '0666', function(e, id) {
fs.write(id, 'locked at ' + new Date().toString(), null, 'utf8', function() {
fs.close(id, function() {
process.exit(1);
});
});
});
break;

case "DeviceQuotaExceeded":
// blacklist device
this.rateLimitedTokens[message.deviceToken] = true;
break;

case "InvalidRegistration":
Expand All @@ -90,6 +141,7 @@ function C2DMConnection(config) {
break;

case "MessageTooBig":
util.log("ERROR: message too big");
break;

}
Expand All @@ -98,11 +150,14 @@ function C2DMConnection(config) {

this.sendRequest = function(message) {
if (blockedFromSending) {
setTimeout(function(){
this.submitRequest(message);
}, retryAfter * 1000);
self.requeueMessage(message);
return;
}
if (self.rateLimitedTokens[message.deviceToken]) {
util.log("not sending; this token has been rate limited");
return;
}

var c2dmPostBody = {
registration_id: message.deviceToken,
collapse_key: message.collapseKey,
Expand All @@ -111,36 +166,40 @@ function C2DMConnection(config) {

var stringBody = querystring.stringify(c2dmPostBody);
var requestOptions = {
'host': this.c2dmServerOptions.host,
'path': this.c2dmServerOptions.path,
'host': self.c2dmServerOptions.host,
'path': self.c2dmServerOptions.path,
'method': 'POST',
'headers': {
'Content-Length': stringBody.length,
'Content-Type': 'application/x-www-form-urlencoded'
}
};
requestOptions['headers']['Authorization'] = 'GoogleLogin auth=' + this.currentAuthorizationToken;
requestOptions['headers']['Authorization'] = 'GoogleLogin auth=' + self.currentAuthorizationToken;

var postRequest = https.request(requestOptions, function(response) {
if (response.statusCode == 401) {
// we need to reauthenticate
self.currentAuthorizationToken = null;
// requeue message
self.submitMessage(message);
self.requeueMessage(message);
} else if (response.statusCode == 503) {
retryAfter = parseInt(response.headers['Retry-After'], 10) || 10;
blockedFromSending = true;
self.requeueMessage(message);
setTimeout(function() {
blockedFromSending = false;
blockedFromSending = true;
self.emit('retryAfterExpired');
}, retryAfter * 1000);
} else if (response.statusCode == 200) {
response.setEncoding('utf-8');
var buffer = '';
response.on('data', function(chunk) {
util.log('response: ' + chunk);
var returnedID = chunk.match(/id=/);
buffer += chunk;
});
response.on('end', function(end) {
var returnedID = buffer.match(/id=/);
if (!returnedID) {
self.onError(chunk);
} else {
util.log("message sent successfully!");
self.onError(message, buffer);
}
});
}
Expand All @@ -155,31 +214,23 @@ function C2DMConnection(config) {
}

this.submitMessage = function(message) {
if (this.currentAuthorizationToken && this.currentAuthorizationToken.length) {
this.sendRequest(message);
if (self.currentAuthorizationToken) {
self.sendRequest(message);
} else {
this.authenticate();
this.once('loginComplete', function() {
this.sendRequest(message);
});
self.requeueMessage(message);
}
}

this.notifyDevice = function(message) {
totalMessages++;
self.submitMessage(message);
};

this.authenticate = function() {
if (authInProgress) {
return;
}
if (this.authFails) {
// make this exponential
util.log("Sleeping because of fails: " + this.authFails);
setTimeout(function() {
self.authenticate();
}, this.authFails * 10 * 1000);
return;
}


util.log('auth-ing with google');
authInProgress = true;

var loginBody = {
Expand All @@ -191,7 +242,7 @@ function C2DMConnection(config) {
}
var loginBodyString = querystring.stringify(loginBody);
this.loginOptions['headers']['Content-Length'] = loginBodyString.length;
var loginReq = https.request(this.loginOptions, function(res) {
var loginReq = https.request(self.loginOptions, function(res) {
res.setEncoding('utf-8');
var buffer = '';
res.on('data', function(data) {
Expand All @@ -201,6 +252,7 @@ function C2DMConnection(config) {
var token = buffer.match(/Auth=(.+)[$|\n]/);
if (token) {
self.currentAuthorizationToken = token[1];
authTokenTime = Math.round(new Date().getTime() / 1000);
self.authFails = 0;
self.emit('loginComplete');
} else {
Expand All @@ -217,11 +269,69 @@ function C2DMConnection(config) {
loginReq.write(loginBodyString);
loginReq.end();
};
return this;

this.debugServer = net.createServer(function(stream) {
stream.setEncoding('ascii');

stream.on('data', function(data) {
var commandLine = data.trim().split(" ");
var command = commandLine.shift();
switch (command) {
case "help":
stream.write("Commands: stats authtoken");
break;

case "authtoken":
if (self.currentAuthorizationToken) {
stream.write("token: " + self.currentAuthorizationToken + "\n");
}
break;

case "stats":
var now = Math.round(new Date().getTime() / 1000);
var elapsed = now - startupTime;

var tokenAge = now - authTokenTime;

stream.write("uptime: " + elapsed + "\n");
stream.write("messages_sent: " + totalMessages + "\n");
stream.write("messages_queued: " + pendingMessages.length + "\n");
stream.write("total_errors: " + totalErrors + "\n");
stream.write("rate_limited_tokens: " + Object.keys(self.rateLimitedTokens).length + "\n");
var loggedInStatus = (self.currentAuthorizationToken ? "true" : "false");
stream.write("logged_in_to_c2dm: " + loggedInStatus + "\n");
if (self.currentAuthorizationToken) {
stream.write("token_age: " + tokenAge + "\n");
}

var memoryUsage = process.memoryUsage();
for (var property in memoryUsage) {
stream.write("memory_" + property + ": " + memoryUsage[property] + "\n");
}
break;

default:
stream.write("Invalid command\n");
break;
};
});

});
this.debugServer.listen(config.debugServerPort || 8121);

}

util.inherits(C2DMConnection, emitter);


var connection = new C2DMConnection(config);
var receiver = new C2DMReceiver(config, connection);
// check for a lock file; if it's there,
// don't start until removed
fs.stat('quota.lock', function(err, stats) {
if (!err) {
util.log("Can't start; quota.lock present");
process.exit(1);
}

var connection = new C2DMConnection(config);
var receiver = new C2DMReceiver(config, connection);
});

0 comments on commit 18c6a2d

Please sign in to comment.