Skip to content
Browse files

added plugin for mongodb

  • Loading branch information...
1 parent 21657bc commit a609a70f47f23223127ab4b4bfbbf8433938639d @twilson63 committed Jan 18, 2012
Showing with 55 additions and 0 deletions.
  1. +54 −0 src/plugins/mongodb.coffee
  2. +1 −0 src/queue.coffee
View
54 src/plugins/mongodb.coffee
@@ -0,0 +1,54 @@
+mongo = require 'mongoskin'
+_ = require 'underscore'
+
+
+module.exports =
+ name: 'q-mongodb'
+ # establish db connection
+ # ---
+ # param: db - Database Connection URL
+ # param: collection_name - Name of Cloudq Collection (Defaults cloudq.jobs)
+ init: (done) ->
+ # Init MongoDb
+ @db = mongo.db(process.env.MONGOSVR or 'localhost:27017/cloudq')
+ @jobs = @db.collection('cloudq.jobs')
+ done()
+
+ attach: (options) ->
+ # queue job
+ # ---
+ # param: name - Name of Queue
+ # param: job - Job Object
+ # param: cb - callback
+ @queueJob = (name, job, cb) ->
+ _.extend job,
+ queue: name
+ queue_state: @QUEUED
+ inserted_at: new Date()
+ @jobs.insert job, cb
+
+ # reserve job for processing
+ # ---
+ # param: name - Name of Queue
+ # param: cb - Callback
+ @reserveJob = (name, cb) ->
+ @jobs.findAndModify(
+ {queue: name, queue_state: @QUEUED }
+ , [['inserted_at', 'ascending']]
+ , {$set: {queue_state: @RESERVED, updated_at: new Date() }}
+ , {new: true }
+ cb
+ )
+
+ # remove job
+ # ---
+ # param: job_id - id of job to remove
+ # param: cb - callback
+ @removeJob = (job_id, cb) -> @jobs.removeById job_id, cb
+
+ # jobs by queue by state
+ # ---
+ # param: cb - callback
+ @groupJobs = (cb) ->
+ @jobs.group ['queue','queue_state'], {}, {"count":0}, "function(obj,prev){ prev.count++; }", true, cb
+
View
1 src/queue.coffee
@@ -1,6 +1,7 @@
broadway = require 'broadway'
queue = new broadway.App()
queue.use(require(process.env.PLUGIN or './plugins/couchdb'))
+#queue.use(require(process.env.PLUGIN or './plugins/mongodb'))
# # Queue
#

0 comments on commit a609a70

Please sign in to comment.
Something went wrong with that request. Please try again.