Permalink
Browse files

Add per-host queues.

  • Loading branch information...
1 parent 4dd2e97 commit e095167a79ccc66e8d2b5ed3c54c47efa49c8aeb @mbostock committed May 24, 2011
Showing with 94 additions and 42 deletions.
  1. +18 −42 lib/mappy/cache.js
  2. +76 −0 lib/mappy/queue.js
View
@@ -1,19 +1,15 @@
var http = require("http"),
- url = require("url"),
- util = require("util");
+ util = require("util"),
+ queue = require("./queue");
-var map = {},
+var cache = {},
head = null,
tail = null,
- size = 512,
- n = 0;
-
-var headers = {
- "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_6_7) AppleWebKit/534.36 (KHTML, like Gecko) Chrome/13.0.767.1 Safari/534.36"
-};
+ size = 0,
+ maxSize = 512;
module.exports = function(key, callback) {
- var value = map[key];
+ var value = cache[key];
// If this value is in the cache…
if (value) {
@@ -37,7 +33,7 @@ module.exports = function(key, callback) {
}
// Otherwise, add the value to the cache.
- value = map[key] = {
+ value = cache[key] = {
key: key,
next: head,
previous: null,
@@ -48,44 +44,24 @@ module.exports = function(key, callback) {
if (head) head.previous = value;
else tail = value;
head = value;
- n++;
+ size++;
- util.log(key);
+ // Flush any extra values.
+ flush();
// Load the requested resource!
- var u = url.parse(key);
- http.get({
- host: u.host,
- port: u.port,
- path: u.pathname + (u.search ? "?" + u.search : ""),
- headers: headers
- }, function(response) {
- var body = new Buffer(+response.headers['content-length'] || 2048),
- offset = 0;
- response
- .on("data", function(chunk) {
- var n = chunk.length + offset;
- if (n > body.length) body.copy(body = new Buffer(1 << Math.ceil(Math.log(n) / Math.LN2)));
- offset += chunk.copy(body, offset);
- })
- .on("end", function() {
- var callbacks = value.callbacks;
- delete value.callbacks; // must be deleted before callback!
- value.value = body.slice(0, offset);
- callbacks.forEach(function(callback) { callback(value.value); });
- });
- }).on("error", function(error) {
- callback(null);
+ queue(key, function(buffer) {
+ var callbacks = value.callbacks;
+ delete value.callbacks; // must be deleted before callback!
+ value.value = buffer;
+ callbacks.forEach(function(callback) { callback(buffer); });
});
-
- flush();
};
-// Flush any extra values.
function flush() {
- for (var value = tail; n > size && value; value = value.previous) {
- n--;
- delete map[value.key];
+ for (var value = tail; size > maxSize && value; value = value.previous) {
+ size--;
+ delete cache[value.key];
if (value.next) value.next.previous = value.previous;
else if (tail = value.previous) tail.next = null;
if (value.previous) value.previous.next = value.next;
View
@@ -0,0 +1,76 @@
+var http = require("http"),
+ url = require("url"),
+ util = require("util");
+
+var hosts = {},
+ maxActive = 4, // per host
+ maxAttempts = 4; // per uri
+
+var headers = {
+ "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_6_7) AppleWebKit/534.36 (KHTML, like Gecko) Chrome/13.0.767.1 Safari/534.36"
+};
+
+module.exports = function(uri, callback) {
+ var uuu = url.parse(uri);
+
+ // Retrieve the host-specific queue.
+ var host = hosts[uuu.host] || (hosts[uuu.host] = {
+ active: 0,
+ queued: []
+ });
+
+ // Process the host's queue, perhaps immediately starting our request.
+ load.attempt = 0;
+ host.queued.push(load);
+ process(host);
+
+ // Issue the HTTP request.
+ function load() {
+ http.get({
+ host: uuu.host,
+ port: uuu.port,
+ path: uuu.pathname + (uuu.search ? "?" + uuu.search : ""),
+ headers: headers
+ }, ready).on("error", error);
+ }
+
+ // Handle the HTTP response.
+ function ready(response) {
+ var b = new Buffer(+response.headers["content-length"] || 2048), i = 0;
+ response.on("data", data).on("end", end);
+
+ // Append each body chunk to our buffer.
+ function data(chunk) {
+ var n = chunk.length + i;
+ if (n > b.length) b.copy(b = new Buffer(1 << Math.ceil(Math.log(n) / Math.LN2)));
+ i += chunk.copy(b, i);
+ }
+
+ // Hooray, callback our available data!
+ function end() {
+ util.log(uri);
+ host.active--;
+ callback(b.slice(0, i));
+ process(host);
+ }
+ }
+
+ // Boo, an error occurred. We should retry, maybe.
+ function error(error) {
+ host.active--;
+ if (++load.attempt < maxAttempts) {
+ util.debug("retry " + uuu.host + " #" + load.attempt + ": " + error);
+ host.queued.push(load);
+ } else {
+ util.debug("abort " + uuu.host + ": " + error);
+ callback(null);
+ }
+ process(host);
+ }
+};
+
+function process(host) {
+ if (host.active >= maxActive || !host.queued.length) return;
+ host.active++;
+ host.queued.pop()();
+}

0 comments on commit e095167

Please sign in to comment.