Skip to content

Commit

Permalink
Adding close API.
Browse files Browse the repository at this point in the history
  • Loading branch information
mikeal committed Mar 12, 2012
1 parent 5546e4d commit 58526f9
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 41 deletions.
83 changes: 43 additions & 40 deletions index.js
Expand Up @@ -46,83 +46,86 @@ Task.prototype.promise = function (name, cb) {
return self.promises[name]
}


module.exports = function (dburl) {
var f = follow(dburl)
, c = couch(dburl)
, t = new events.EventEmitter()
, checking = false
;
f.include_docs = true
f.on('change', function (change) {
t.emit('tasked.update')
})
function Tasked (dburl) {
var self = this
self.url = dburl
self.follow = follow(dburl)
self.couch = couch(dburl)
self.updating = false

var interval = setInterval(function () {
self.follow.include_docs = true
self.follow.on('change', function (change) {
self.emit('tasked.update')
})

self.interval = setInterval(function () {
// Every minute force re-check
checking = false
t.emit('tasked.update')
self.updating = false
self.emit('tasked.update')
}, 60 * 1000)

t.close = function () {
clearInterval(interval)
}

t.on('tasked.update', function () {
if (!checking) {
c.design('tasked').view('tasks').query({}, function (e, results) {

self.on('tasked.update', function () {
if (!self.updating) {
self.couch.design('tasked').view('tasks').query({}, function (e, results) {
if (e) return console.error(e)
if (results.rows.length) {
results.rows.forEach(function (row) {
t.emit('tasked.check', row.id)
self.emit('tasked.check', row.id)
})
}
checking = false
self.updating = false
})
}
checking = true
self.updating = true
})
t.on('tasked.check', function (id) {

self.on('tasked.check', function (id) {
function check () {
c.get(id, function (e, doc) {
self.couch.get(id, function (e, doc) {
if (e) return console.error(e)
if (doc.state === 'new') {
doc.state = 'processing'
c.post(doc, function (e, info) {
self.couch.post(doc, function (e, info) {
if (e) return // someone else got it first
doc._rev = info.rev
t.emit('tasked.new', doc)
self.emit('tasked.new', doc)
})
}
})
}
if (t.delay) {

if (self.delay) {
setTimeout(check, Math.random() * 111)
} else {
check()
}
})
t.on('tasked.new', function (doc) {

self.on('tasked.new', function (doc) {
var task = new Task(doc, function (e) {
function update (doc) {
doc.state = 'complete'
doc.results = task.results
doc.errors = task.errors
}

c.update(doc._id, update, function (e, info) {
self.couch.update(doc._id, update, function (e, info) {
doc._rev = info.rev
f.emit('complete', doc)
self.follow.emit('complete', doc)
})
})
task.t = t
t.emit(doc.type, task)
task.t = self
self.emit(doc.type, task)
})

return t
}
util.inherits(Tasked, events.EventEmitter)
Tasked.prototype.close = function () {
clearInterval(this.interval)
this.follow.stop()
}

module.exports = function (dburl) {
return new Tasked(dburl)
}

module.exports.provision = function provision (url, cb) {
Expand Down
3 changes: 2 additions & 1 deletion tests/test-double.js
Expand Up @@ -26,7 +26,8 @@ tasked.provision(dburl, function (e) {
counter++
if (counter === 100) {
console.log('All tests passed.')
process.exit()
t1.close()
t2.close()
}
}

Expand Down

0 comments on commit 58526f9

Please sign in to comment.