Permalink
Browse files

rewritten api to allow for bounded concurrency

  • Loading branch information...
1 parent 0f9433e commit 2e2006f4fabc9828e2e25281ff0f073758bf3fc8 @vasc committed Jan 8, 2012
Showing with 108 additions and 78 deletions.
  1. +21 −24 lib/fila.coffee
  2. +1 −1 package.json
  3. +86 −53 test/fila-test.coffee
View
@@ -1,29 +1,26 @@
events = require 'events'
-exports.fila = () ->
- this.self = this
- this.ee = new events.EventEmitter()
- this.tasks = []
- this.results = []
- return this
-
-exports.fila.prototype =
- run: (end) ->
- this.end = end
- this.ee.on 'next', (result) =>
- this.results.push result
- this.next()
- this.next()
+class Fila
+ constructor: (concurrency) ->
+
+ @slots = if concurrency == undefined then 1 else concurrency
+ @ee = new events.EventEmitter()
+ @tasks = []
+
+ @ee.on 'over', () =>
+ @slots+=1
+ @next()
- push: (f) ->
- this.tasks.push f
+ enqueue: (f) ->
+ this.tasks.push f
+ @next()
- next: () ->
- if this.tasks.length > 0
- t = this.tasks.shift()
- t (result) => this.ee.emit 'next', result
- else
- this.end(this.results) unless this.end == undefined
-
+ next: () ->
+ ee = this.ee
+ if this.tasks.length > 0 and this.slots > 0
+ this.slots-=1
+ t = this.tasks.shift()
+ t () => ee.emit 'over'
-
+
+module.exports = Fila
View
@@ -2,7 +2,7 @@
"name": "fila",
"description": "Queue for asynchronous tasks",
"keywords": ["queue", "async"],
- "version": "0.0.1",
+ "version": "0.1.0",
"author": "Vasco Fernandes <vasco.box@gmail.com> (http://vascofernandes.com)",
"repository": { "type": "git", "url": "http://github.com/vasc/fila.git" },
"devDependencies": {
View
@@ -5,76 +5,59 @@ fila = require('../lib/fila')
vows.describe('fila').addBatch(
'Any fila':
- topic: new fila.fila()
+ topic: new fila()
'has an empty task queue,': (f) ->
assert.lengthOf f.tasks, 0
- 'a run function': (f) ->
- assert.isFunction f.run
- 'a push function': (f) ->
- assert.isFunction f.push
- 'and a next function': (f) ->
- assert.isFunction f.next
-
- 'An empty fila':
- topic: new fila.fila()
-
- 'when run':
- topic: (f) ->
- f.run (r) =>
- this.callback null, r
- return
-
- 'produces no results': (f) ->
- assert.lengthOf f, 0
-
+ 'and an enqueue function': (f) ->
+ assert.isFunction f.enqueue
'A fila filled with synchronous functions':
topic: () ->
- f = new fila.fila()
- f.push (next) -> next 1
- f.push (next) -> next 2
- f.push (next) -> next 3
- f
-
- 'holds them': (f) ->
- assert.lengthOf f.tasks, 3
- f.tasks[0] (r) -> assert.equal r, 1
- f.tasks[1] (r) -> assert.equal r, 2
- f.tasks[2] (r) -> assert.equal r, 3
+ callback = this.callback
+ f = new fila()
+ results = []
+ f.enqueue (cb) ->
+ results.push 1
+ cb()
+ f.enqueue (cb) ->
+ results.push 2
+ cb()
+ f.enqueue (cb) ->
+ results.push 3
+ callback null, results
+ return
- 'when run':
- topic: (f) ->
- f.run (r) =>
- this.callback null, r
- return
- 'produces the correct results': (r) ->
- assert.include r, 1
- assert.include r, 2
- assert.include r, 3
-
- 'in the correct order': (r) ->
- assert.deepEqual r, [1,2,3]
+ 'produces the correct results': (r) ->
+ assert.include r, 1
+ assert.include r, 2
+ assert.include r, 3
+
+ 'in the correct order': (r) ->
+ assert.deepEqual r, [1,2,3]
- 'A fila filled with assynchronous functions when run':
+ 'A fila filled with assynchronous functions':
topic: () ->
- f = new fila.fila()
- f.push (next) ->
+ f = new fila()
+ callback = this.callback
+ results = []
+ f.enqueue (cb) ->
setTimeout () ->
- next 1
+ results.push 1
+ cb()
, 300
- f.push (next) ->
+ f.enqueue (cb) ->
setTimeout () ->
- next 2
+ results.push 2
+ cb()
, 150
- f.push (next) ->
+ f.enqueue (cb) ->
setTimeout () ->
- next 3
+ results.push 3
+ callback null, results
, 1
- f.run (r) =>
- this.callback null, r
return
'produces the correct results': (r) ->
@@ -85,4 +68,54 @@ vows.describe('fila').addBatch(
'in the correct order': (r) ->
assert.deepEqual r, [1,2,3]
+ 'A fila with concurrent assynchronous functions when run':
+ topic: () ->
+ f = new fila(3)
+ callback = this.callback
+ results = []
+ f.enqueue (cb) ->
+ setTimeout () ->
+ results.push 1
+ cb()
+ , 300
+ f.enqueue (cb) ->
+ setTimeout () ->
+ results.push 2
+ cb()
+ , 150
+ f.enqueue (cb) ->
+ setTimeout () ->
+ results.push 3
+ cb()
+ , 1
+ f.enqueue (cb) ->
+ setTimeout () ->
+ results.push 4
+ callback null, results
+ , 600
+ f.enqueue (cb) ->
+ setTimeout () ->
+ results.push 5
+ cb()
+ , 300
+ f.enqueue (cb) ->
+ setTimeout () ->
+ results.push 6
+ cb()
+ ,1
+
+ return
+
+ 'produces the correct results': (r) ->
+ assert.include r, 1
+ assert.include r, 2
+ assert.include r, 3
+ assert.include r, 4
+ assert.include r, 5
+ assert.include r, 6
+
+ 'in the expected order': (r) ->
+ assert.deepEqual r, [3, 2, 1, 6, 5, 4]
+
+
).export(module)

0 comments on commit 2e2006f

Please sign in to comment.