Skip to content

Commit

Permalink
added debugging, fixed final timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
Levi Wheatcroft committed May 13, 2016
1 parent 1405620 commit a9fd001
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 45 deletions.
24 changes: 11 additions & 13 deletions example.js
Expand Up @@ -10,30 +10,28 @@ let throttle = new Throttle({
// send max 5 requests every `ratePer` ms
rate: 5,
// send max `rate` requests every 10000 ms
ratePer: 10000,
ratePer: 4000,
// max 2 requests should run concurrently
concurrent: 4
})


_.each(_.range(1, 10), function(iteration) {
var width = 100 + iteration
_.times(10, function(idx) {
request
.get('http://placekitten.com/' + width + '/100')
.use(throttle.plugin('test'))
.get('placekitten.com/100/' + (100 + idx))
.use(throttle.plugin('foo'))
.end(function(err, res) {
console.log(err ? err : 'serial ' + iteration)
console.log(err ? err : 'serial ' + idx)
})
console.log('added ' + iteration)
console.log('added ' + idx)
})
_.each(_.range(1, 15), function(iteration) {
var width = 100 + iteration
_.times(10, function(idx) {
request
.get('http://placekitten.com/' + width + '/100')
.get('placekitten.com/100/' + (100 + idx))
.use(throttle.plugin())
.end(function(err, res) {
console.log(err ? err : 'retrieved ' + iteration)
console.log(err ? err : 'res ' + idx)
})
console.log('added ' + iteration)
console.log('added ' + idx)
})


104 changes: 73 additions & 31 deletions index.js
@@ -1,5 +1,17 @@
'use strict'
const _ = require('lodash')
const _ = require('lodash')
const debug = require('debug')

/**
* ## debugging
* For debuging output to stdout, set debug environment variable a la:
* `DEBUG=superagent-throttle:* node myapp.js`
*/
const dbg = {
fn: debug('superagent-throttle:fn'),
req: debug('superagent-throttle:req'),
timeout: debug('superagent-throttle:timeout')
}

let Throttle

Expand All @@ -14,7 +26,7 @@ let defaults = {
// requests per `ratePer` ms
rate: 40,
// ms per `rate` requests
ratePer: 10000,
ratePer: 40000,
// max concurrent requests
concurrent: 20
}
Expand All @@ -27,6 +39,7 @@ let defaults = {
* @param {object} options - key value options
*/
Throttle = function(options) {
dbg.fn('Throttle constructor')
this.set(_.extend(
// instance properties
{
Expand All @@ -39,10 +52,9 @@ Throttle = function(options) {
defaults,
options
))
this.plugin = _.bind(this.plugin, this)
}



/**
* ## set
* update options on instance
Expand All @@ -57,6 +69,7 @@ Throttle = function(options) {
* @returns null
*/
Throttle.prototype.set = function(options, value) {
dbg.fn('set')
if (_.isString(options) && value) {
options = {}
options[options] = value
Expand All @@ -69,18 +82,17 @@ Throttle.prototype.set = function(options, value) {
* ## next
* checks whether instance has available capacity and calls throttle.send()
*
* @method
* @returns {Boolean}
*/
Throttle.prototype.next = function() {
dbg.fn('next')
let throttle = this
// make requestTimes `throttle.rate` long. Oldest request will be 0th index
if (throttle._requestTimes.length > throttle.rate) {
throttle._requestTimes = _.castArray(_.last(
throttle._requestTimes,
throttle.rate
))
}
throttle._requestTimes = _.slice(
throttle._requestTimes,
throttle.rate * -1
)

if (
// paused
!(throttle.active) ||
Expand All @@ -106,29 +118,51 @@ Throttle.prototype.next = function() {

/**
* ## serial
* updates throttle._serials and throttle._isRateBound
*
* serial subthrottles allow some requests to be serialised, whilst maintaining
* their place in the queue. The _serials structure keeps track of what serial
* queues are waiting for a response.
*
* ```
* throttle._serials = {
* 'example.com/end/point': true,
* 'example.com/another': false
* }
* ```
*
* @param {Request} request superagent request
* @param {Boolean} state new state for serial
*/
Throttle.prototype.serial = function(serial, state) {
Throttle.prototype.serial = function(request, state) {
dbg.fn('serial')
let serials = this._serials
let throttle = this
if (_.isObject(serial)) {
serial = serial.serial
}
if (serial === false) {
if (request.serial === false) {
return
}
if (state === undefined) {
return serials[serial]
return serials[request.serial]
}
if (state === false) {
throttle._isSerialBound = false
}
serials[serial] = state

serials[request.serial] = state
}

/**
* ## _isRateBound
* returns true if throttle is bound by rate
*
* @returns {Boolean}
*/
Throttle.prototype._isRateBound = function() {
return ((Date.now() - this._requestTimes[0]) < this.ratePer)
let throttle = this
dbg.fn('isRateBound')
return (
((Date.now() - throttle._requestTimes[0]) < throttle.ratePer) &&
(throttle._buffer.length > 0)
)
}

/**
Expand All @@ -139,15 +173,12 @@ Throttle.prototype._isRateBound = function() {
* available rate)
* - some request has ended (may have available concurrency)
*
* @param {Object} [buffered] key value object
* @param {Request} buffered.request the superagent request
* @param {arguments} buffered.arguments passed to request.end()
* @param {String} uri
* @param {Request} request the superagent request
* @returns null
*/
Throttle.prototype.cycle = function(request) {
dbg.fn('cycle')
let throttle = this

if (request) {
throttle._buffer.push(request)
}
Expand All @@ -156,32 +187,37 @@ Throttle.prototype.cycle = function(request) {
// fire requests
// throttle.next will return false if there's no capacity or throttle is
// drained
while (throttle.next()) {

}
while (throttle.next()) {}

// if bound by rate, set timeout to reassess later.
if (throttle._isRateBound()) {
let timeout = throttle.ratePer - (Date.now() - throttle._requestTimes[0])
dbg.timeout('set for %sms', timeout)
throttle._timeout = setTimeout(function() {
dbg.timeout('callback')
throttle.cycle()
}, throttle.ratePer - (Date.now() - throttle._requestTimes[0]))
}, timeout)
}
}

/**
* ## send
*
*
* @param {Request} request superagent request
* @returns null
*/
Throttle.prototype.send = function(request) {
dbg.fn('send')
let throttle = this
throttle.serial(request, true)
// attend to the throttle once we get a response
request.on('end', function() {
dbg.req('received')
throttle._current -= 1
throttle.serial(this, false)
throttle.cycle()
})
dbg.req('sent (%s queued)', throttle._buffer.length)
request.throttled.call(request)
throttle._requestTimes.push(Date.now())
throttle._current += 1
Expand All @@ -198,13 +234,16 @@ Throttle.prototype.send = function(request) {
* @returns null
*/
Throttle.prototype.plugin = function(serial) {
dbg.fn('plugin')
let throttle = this
return function(request) {
let patch = function(request) {
dbg.fn('plugin anon')
request.throttle = throttle
request.serial = serial || false
// replace request.end
request.throttled = request.end
request.end = function(callback) {
dbg.fn('patched end')
// the only param passed to .end is the callback, so we can just store
// that on the emitter, and don't need to deal any other arguments
// passed in
Expand All @@ -215,6 +254,9 @@ Throttle.prototype.plugin = function(serial) {
}
return request
}
return _.isObject(serial) ? patch(serial) : patch
}



module.exports = Throttle
2 changes: 1 addition & 1 deletion package.json
Expand Up @@ -24,6 +24,7 @@
},
"homepage": "http://leviwheatcroft.github.io/superagent-throttle/docs/index.js.html",
"dependencies": {
"debug": "^2.2.0",
"lodash": "^4.11.1",
"superagent": "^1.8.3"
},
Expand All @@ -32,7 +33,6 @@
"mocha": "^2.4.5",
"mocha-jshint": "^2.3.1",
"node-readme": "^0.1.9",
"superagent-mocker": "^0.4.0",
"unit.js": "^2.0.0"
}
}

0 comments on commit a9fd001

Please sign in to comment.