Permalink
Browse files

FIRST

  • Loading branch information...
James Weston James Weston
James Weston authored and James Weston committed Nov 15, 2012
0 parents commit dc083b85842fc499878f7c66211f920a401a8276
Showing with 194 additions and 0 deletions.
  1. +58 −0 README.md
  2. +116 −0 index.js
  3. +20 −0 test.js
@@ -0,0 +1,58 @@
+Queue It
+========
+ Utility to throttle programs which over utilize the spawn function. Can also be used to throttle functions which need it.
+
+Install
+-------
+```
+npm install queueit
+```
+
+Example
+-------
+
+```
+var QueueIt = require('queueit'),
+ q = new QueueIt( {
+ max_num_processes : 5
+ });
+
+q.start();
+
+for (i = 0; i < 1000; i++) {
+ q.push({
+ command: 'curl',
+ arguments: ['google.com'],
+ timeout: 1000,
+ cb : function (err,data) {
+ if (!err && data && data.toString) console.log(data.toString())
+ }
+ });
+}
+```
+
+Using Queue It with a random function
+
+```
+function random(options, cb) {
+ //do something
+ ...
+
+ ///callback
+ cb(err,data);
+}
+
+var QueueIt = require('queueit'),
+ q = new QueueIt( {
+ max_num_processes : 5,
+ func : random
+ });
+
+q.start();
+
+for (i = 0; i < 1000; i++) {
+ //build options
+ q.push(options,function () { //do something
+ });
+}
+```
116 index.js
@@ -0,0 +1,116 @@
+var spawn = require('child_process').spawn,
+ os = require('os');
+
+var MAX_NUM_PROCESSES = 500,
+ WAIT = 10;
+
+module.exports = QueueIt;
+
+function QueueIt(init) {
+ this.MAX_NUM_PROCESSES = (init && init.max_num_processes) ? init.max_num_processes : MAX_NUM_PROCESSES;
+ this.WAIT = (init && init.wait) ? init.wait : WAIT;
+
+ this.queue = [];
+ this.runningProcesses = [];
+ this.active = false;
+
+ this.func = (init && init.func) ? init.func : false;
+ if (this.func && typeof this.func !== "function") {
+ //not sure how I feel about this...
+ throw "QUEUEIT INIT FUNC MUST BE function";
+ }
+
+ self=this;
+
+ this.go = function () {
+ if (self.queue.length > 0 && self.runningProcesses.length < self.MAX_NUM_PROCESSES) {
+ var next = self.queue.shift(),
+ pid = self.runningProcesses.push(0);
+
+ if (self.func) {
+ self.call(pid,next);
+ }
+ else {
+ self.spawn(next.command, next.arguments, next.timeout, next.cb, pid);
+ }
+
+ self.wait();
+ }
+ else {
+ self.wait();
+ }
+ }
+
+ this.wait = function () {
+ if (self.active) setTimeout(self.go,self.WAIT);
+ }
+
+ this.start = function() {
+ var go = (!self.active);
+ self.active = true;
+ if (go) self.go();
+ }
+
+ this.stop = function() {
+ self.active = false;
+ }
+
+ this.push = function () {
+ if (this.func) {
+ self.queue.push(arguments);
+ }
+ else {
+ // arguments[0] = options = {
+ // command,
+ // arguments,
+ // timeout
+ // }
+ if (!arguments[0].timeout) arguments[0].timeout = 0;
+ self.queue.push(arguments[0]);
+ }
+ }
+
+ function dequeue(pid) {
+ if (self.runningProcesses.length !== 0) self.runningProcesses = self.runningProcesses.slice(0,pid-1).concat(self.runningProcesses.slice(pid+1,self.runningProcesses.length));
+ }
+
+ this.call = function (pid,args) {
+ var cb = args[args.length-1],
+ cbcb = function () {
+ // Dequeue then call callback with random length of arguments
+ dequeue(pid);
+ switch (arguments.length) {
+ case 1 : cb(arguments[0]); break;
+ case 2 : cb(arguments[0],arguments[1]); break;
+ case 3 : cb(arguments[0],arguments[1],arguments[2]); break;
+ case 4 : cb(arguments[0],arguments[1],arguments[2],arguments[3]); break;
+ case 5 : cb(arguments[0],arguments[1],arguments[2],arguments[3], arguments[4]); break;
+ }
+ }
+
+ switch (args.length) {
+ case 1 : self.func(cbcb); break;
+ case 2 : self.func(args[0],cbcb); break;
+ case 3 : self.func(args[0],args[1], cbcb); break;
+ case 4 : self.func(args[0],args[1], args[2], cbcb); break;
+ case 5 : self.func(args[0],args[1], args[2], args[3], cbcb); break;
+ }
+ }
+
+ this.spawn = function (command,arguments,timeout,cb,pid) {
+ var proc = spawn(command,arguments);
+ if (timeout > 0) setTimeout(function () { proc.kill('SIGHUP'); }, timeout)
+ proc.stdout.on('data', function (data) {
+ cb(null,data);
+ });
+
+ proc.stderr.on('data', function (data) {
+ cb(data)
+ });
+
+ proc.on('exit', function (code) {
+ dequeue(pid);
+ cb(null);
+ });
+ }
+}
20 test.js
@@ -0,0 +1,20 @@
+var QueueIt = require('./'),
+ q = new QueueIt( {
+ max_num_processes : 10001,
+ wait: 1
+ });
+
+q.start();
+var responses=1;
+
+for (i = 0; i < 100000; i++) {
+ q.push({
+ command: 'ping',
+ arguments: ['-c 1','8.8.8.8'],
+ timeout: 1000,
+ cb : function (err,data) {
+// if (!err && data) { console.log(responses++); }
+// if (err && err.toString) console.log(err.toString());
+ }
+ });
+}

0 comments on commit dc083b8

Please sign in to comment.