Permalink
Browse files

make the db, port and host configurable

  • Loading branch information...
1 parent 484aeaa commit ca29a445620e1def5c3251b590bada98341a88ec @ripienaar committed Jan 14, 2013
Showing with 15 additions and 13 deletions.
  1. +15 −13 connector/redis/redis.rb
@@ -14,11 +14,14 @@ module Connector
# We'd also add a registration plugin for it and a discovery
# plugin which means we can give a very solid fast first-user
# experience using this
+ #
+ # Configure it with:
+ #
+ # plugin.redis.host = localhost
+ # plugin.redis.port = 6379
+ # plugin.redis.db = 1
class Redis<Base
class ThreadsafeQueue
- require 'thread'
- require 'timeout'
-
def initialize
@queue = []
@mutex = Mutex.new
@@ -58,14 +61,19 @@ def initialize
@config = Config.instance
@sources = []
@subscribed = false
+ @host = @config.pluginconf.fetch("redis.host", "localhost")
+ @port = Integer(@config.pluginconf.fetch("redis.port", "6379"))
+ @db = Integer(@config.pluginconf.fetch("redis.db", "1"))
end
def connect
- @receiver_redis = ::Redis.new :thread_safe => false
+ redis_opts = {:host => @host, :port => @port, :db => @db}
+
+ @receiver_redis = ::Redis.new(redis_opts)
@receiver_queue = ThreadsafeQueue.new
@receiver_thread = nil
- @sender_redis = ::Redis.new :thread_safe => false
+ @sender_redis = ::Redis.new(redis_opts)
@sender_queue = ThreadsafeQueue.new
@sender_thread = nil
@@ -93,7 +101,7 @@ def disconnect; end
def receive
msg = @receiver_queue.pop
- return Message.new(msg.body, msg, :headers => msg.headers)
+ Message.new(msg[:body], msg, :headers => msg[:headers])
end
def publish(msg)
@@ -141,14 +149,8 @@ def start_receiver_thread(sources)
on.message do |channel, message|
begin
Log.debug("Got a message on %s: %s" % [channel, message])
- decoded_msg = YAML.load(message)
-
- new_message = OpenStruct.new
- new_message.channel = channel
- new_message.body = decoded_msg[:body]
- new_message.headers = decoded_msg[:headers]
- @receiver_queue << new_message
+ @receiver_queue << YAML.load(message)
rescue => e
Log.warn("Failed to receive from the receiver source: %s: %s" % [e.class, e.to_s])
end

0 comments on commit ca29a44

Please sign in to comment.