Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

clean up logging and interfaces

  • Loading branch information...
commit 142fa6ee6bda8adb456e52cec89d7dd02ca522cd 1 parent 21d9eb4
@josephholsten josephholsten authored
View
5 lib/async_queue.coffee
@@ -0,0 +1,5 @@
+config = require './config'
+
+module.exports = switch config.async_queue
+ when 'redis' then require './async_queue/redis'
+ when 'amqp' then require './async_queue/amqp'
View
5 lib/async_queue/redis.coffee
@@ -0,0 +1,5 @@
+# The only async queue in redis is a fanout queue
+RedisFanout = require '../fanout/redis'
+module.exports = class Redis extends RedisFanout
+ _fanout_name: ->
+ @_queue_name()
View
8 lib/config.coffee
@@ -0,0 +1,8 @@
+module.exports =
+ fanout: 'redis'
+ # fanout: 'amqp'
+ async_queue: 'redis'
+ # async_queue: 'amqp'
+ queue: 'redis'
+ # queue: 'amqp'
+ dictionary: 'redis'
View
5 lib/dictionary.coffee
@@ -1,3 +1,4 @@
-Redis = require './dictionary/redis'
+config = require './config'
-module.exports = Redis
+module.exports = switch config.dictionary
+ when 'redis' then require './dictionary/redis'
View
16 lib/dispatcher.coffee
@@ -2,8 +2,8 @@ consts = require './consts'
Doctor = require './doctor'
AuditLog = require './audit_log'
ControlFanout = require './control_fanout'
-RequestFanout = require './request_fanout'
-ResponseFanout = require './response_fanout'
+RequestQueue = require './request_queue'
+ResponseQueue = require './response_queue'
JobQueue = require './job_queue'
Dictionary = require './dictionary'
DependencyCollection = require './dependency_collection'
@@ -24,8 +24,8 @@ class Dispatcher
@_audit_log = new AuditLog stream: options.audit
{db_index} = options
@_control_fanout = new ControlFanout {db_index}
- @_request_fanout = new RequestFanout {db_index}
- @_response_fanout = new ResponseFanout {db_index}
+ @_request_queue = new RequestQueue {db_index}
+ @_response_queue = new ResponseQueue {db_index}
@_job_queue = new JobQueue {db_index}
@_dict = new Dictionary {db_index}
@_dependencies = new DependencyCollection()
@@ -33,8 +33,8 @@ class Dispatcher
# Subscribe to the `requests` and `responses` channels.
listen: ->
- @_request_fanout.listen (source, keys) => @_requested source, keys
- @_response_fanout.listen (str) => @_responded str
+ @_request_queue.listen (source, keys) => @_requested source, keys
+ @_response_queue.listen (str) => @_responded str
# Send quit signals to the work queues.
quit: ->
@@ -42,8 +42,8 @@ class Dispatcher
@_control_fanout.quit()
finish = =>
@_job_queue.clear()
- @_request_fanout.end()
- @_response_fanout.end()
+ @_request_queue.end()
+ @_response_queue.end()
@_control_fanout.end()
@_dict.end()
@_job_queue.end()
View
6 lib/fanout.coffee
@@ -1,3 +1,5 @@
-Redis = require './fanout/redis'
+config = require './config'
-module.exports = Redis
+module.exports = switch config.fanout
+ when 'redis' then require './fanout/redis'
+ when 'amqp' then require './fanout/amqp'
View
6 lib/fanout/redis.coffee
@@ -11,17 +11,17 @@ module.exports = class RedisFanout
@_channel = _(@_fanout_name()).namespace db_index
publish: (msg) ->
- winston.debug "Redis queue: #{@_channel} <<", message: msg
+ winston.debug "Redis fanout: #{@_channel} <<", message: msg
@_db.publish @_channel, msg
listen: (callback) ->
@_db.on 'message', (ch, msg) =>
- winston.debug "Redis queue: #{@_channel} >>", message: msg
+ winston.debug "Redis fanout: #{@_channel} >>", message: msg
callback msg
@_db.subscribe @_channel
end: ->
- winston.debug "Redis queue: #{@_channel} :: DESTROY"
+ winston.debug "Redis fanout: #{@_channel} :: DESTROY"
@_db.end()
db: -> @_db
View
6 lib/queue.coffee
@@ -1,3 +1,5 @@
-Redis = require './queue/redis'
+config = require './config'
-module.exports = Redis
+module.exports = switch config.queue
+ when 'redis' then require './queue/redis'
+ when 'amqp' then require './queue/amqp'
View
19 lib/queue/redis.coffee
@@ -2,19 +2,30 @@ consts = require '../consts'
db = require '../db'
_ = require 'underscore'
require '../util'
+winston = require 'winston'
module.exports = class RedisQueue
constructor: (options) ->
{db_index} = options
@_db = db db_index
- clear: -> @_db.del @_queue_name()
+ clear: ->
+ winston.debug "Redis queue: #{@_queue_name()} :: CLEAR"
+ @_db.del @_queue_name()
- push: (msg) -> @_db.rpush @_queue_name(), msg
+ push: (msg) ->
+ winston.debug "Redis queue: #{@_queue_name()} <<", message: msg
+ @_db.rpush @_queue_name(), msg
- pop: (callback) -> @_db.blpop @_queue_name(), 0, callback
+ pop: (callback) ->
+ queue = @_queue_name()
+ @_db.blpop queue, 0, (err, [key, message]) =>
+ winston.debug "Redis queue: #{queue} >>", message: message
+ callback err, [key, message]
- end: -> @_db.end()
+ end: ->
+ winston.debug "Redis queue: #{@_queue_name()} :: DESTROY"
+ @_db.end()
db: -> @_db
View
9 lib/request_fanout.coffee → lib/request_queue.coffee
@@ -1,11 +1,8 @@
-Fanout = require './fanout'
+AsyncQueue = require './async_queue'
consts = require './consts'
-db = require './db'
-_ = require 'underscore'
-require './util'
-module.exports = class RequestFanout extends Fanout
- _fanout_name: -> 'requests'
+module.exports = class RequestQueue extends AsyncQueue
+ _queue_name: -> 'requests'
listen: (callback) ->
super (str) ->
View
4 lib/response_fanout.coffee
@@ -1,4 +0,0 @@
-Fanout = require './fanout'
-
-module.exports = class ResponseFanout extends Fanout
- _fanout_name: -> 'responses'
View
4 lib/response_queue.coffee
@@ -0,0 +1,4 @@
+AsyncQueue = require './async_queue'
+
+module.exports = class ResponseQueue extends AsyncQueue
+ _queue_name: -> 'responses'
View
12 lib/work_queue.coffee
@@ -1,8 +1,8 @@
Worker = require './worker'
Dictionary = require './dictionary'
ControlFanout = require './control_fanout'
-RequestFanout = require './request_fanout'
-ResponseFanout = require './response_fanout'
+RequestQueue = require './request_queue'
+ResponseQueue = require './response_queue'
JobQueue = require './job_queue'
events = require 'events'
consts = require './consts'
@@ -16,8 +16,8 @@ class WorkQueue extends events.EventEmitter
constructor: (@options) ->
{db_index} = @options
@worker_dict = new Dictionary {db_index}
- @worker_request_fanout = new RequestFanout {db_index}
- @worker_response_fanout = new ResponseFanout {db_index}
+ @worker_request_queue = new RequestQueue {db_index}
+ @worker_response_queue = new ResponseQueue {db_index}
@runners = {}
@mixins = {}
@_job_queue = new JobQueue {db_index}
@@ -102,8 +102,8 @@ class WorkQueue extends events.EventEmitter
@_job_queue.end()
@_control_fanout.end()
@worker_dict.end()
- @worker_request_fanout.end()
- @worker_response_fanout.end()
+ @worker_request_queue.end()
+ @worker_response_queue.end()
@callback?()
# Clean out the sticky cache
View
8 lib/worker.coffee
@@ -19,8 +19,8 @@ class Worker
@args = args # weird bug in coffeescript: wanted @args... in line above
@dict = @queue.worker_dict
@db = @dict.db()
- @_request_fanout = @queue.worker_request_fanout
- @_response_fanout = @queue.worker_response_fanout
+ @_request_queue = @queue.worker_request_queue
+ @_response_queue = @queue.worker_response_queue
@cache = {}
@saved_keys = {}
@cycle = {}
@@ -128,7 +128,7 @@ class Worker
@emitted_key[key] = true
json = value?.toJSON?() ? value
@dict.set key, JSON.stringify(json)
- @_response_fanout.publish key
+ @_response_queue.publish key
# If we've seen this `@for_reals` before, then blow right past it.
# Otherwise, abort the runner function and start over (after checking
@@ -242,7 +242,7 @@ class Worker
# on a resume key. Once we get that response, try again to fetch the
# dependencies (which should all be present).
request_missing: (keys) ->
- @_request_fanout.request_missing @key, keys
+ @_request_queue.request_missing @key, keys
# The dispatcher said to resume, so go look for the missing values again. If
# we're resuming from a cycle failure, go grab the key.
View
11 test/support/redeye_suite.coffee
@@ -30,7 +30,7 @@
dispatcher = require '../../lib/dispatcher'
redeye = require '../../lib/redeye'
consts = require '../../lib/consts'
-RequestFanout = require '../../lib/request_fanout'
+RequestQueue = require '../../lib/request_queue'
AuditListener = require './audit_listener'
db = require '../../lib/db'
_ = require 'underscore'
@@ -41,6 +41,7 @@ db_index = 4
winston.setLevels winston.config.syslog.levels
winston.level = 'info'
+# winston.level = 'debug'
# Test class for replacing a single expresso test.
class RedeyeTest
@@ -52,7 +53,7 @@ class RedeyeTest
@db_index = ++db_index
@db = db @db_index
@audit = new AuditListener
- @_request_fanout = new RequestFanout db_index: @db_index
+ @_request_queue = new RequestQueue db_index: @db_index
@opts = test_mode: true, db_index: @db_index, audit: @audit
@queue = redeye.queue @opts
@add_workers()
@@ -82,19 +83,19 @@ class RedeyeTest
# Forcefully quit the test
die: ->
@dispatcher.quit()
- @assert.ok false, "Timed out, sad panda"
@finish()
+ @assert.ok false, "Timed out, sad panda"
# Terminate the last redis connection, ending the test
finish: ->
clearTimeout @timeout
@db.end()
- @_request_fanout.end()
+ @_request_queue.end()
# Send a request to the correct `requests` channel
request: (args...) ->
@requested = args.join consts.arg_sep
- @_request_fanout.publish @requested
+ @_request_queue.publish @requested
# Set a redis value, but first convert to JSON
set: (args..., value) ->
Please sign in to comment.
Something went wrong with that request. Please try again.