From 484aeaa1f32770ccdfac70495b27867405ecf862 Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Fri, 11 Jan 2013 17:20:22 +0000 Subject: [PATCH] use a actual working queue implementation, tested on 1.8.7 only --- connector/redis/redis.rb | 94 +++++++++++++++++++++++++++++++--------- 1 file changed, 74 insertions(+), 20 deletions(-) diff --git a/connector/redis/redis.rb b/connector/redis/redis.rb index 2d6a51c..133382c 100644 --- a/connector/redis/redis.rb +++ b/connector/redis/redis.rb @@ -15,6 +15,45 @@ module Connector # plugin which means we can give a very solid fast first-user # experience using this class Redis false + @receiver_queue = ThreadsafeQueue.new @receiver_thread = nil - @sender_redis = ::Redis.new - @sender_queue = Queue.new + @sender_redis = ::Redis.new :thread_safe => false + @sender_queue = ThreadsafeQueue.new @sender_thread = nil start_sender_thread @@ -54,10 +93,12 @@ def disconnect; end def receive msg = @receiver_queue.pop - Message.new(msg.body, msg, :headers => msg.headers) + return Message.new(msg.body, msg, :headers => msg.headers) end def publish(msg) + Log.debug("About to publish to the sender queue") + target = {:name => nil, :headers => {}, :name => nil} if msg.type == :direct_request @@ -91,27 +132,36 @@ def publish(msg) def start_receiver_thread(sources) @receiver_thread = Thread.new do - @receiver_redis.subscribe(@sources) do |on| - on.subscribe do |channel, subscriptions| - Log.debug("Subscribed to %s" % channel) - end + begin + @receiver_redis.subscribe(@sources) do |on| + on.subscribe do |channel, subscriptions| + Log.debug("Subscribed to %s" % channel) + end - on.message do |channel, message| - begin - decoded_msg = YAML.load(message) + on.message do |channel, message| + begin + Log.debug("Got a message on %s: %s" % [channel, message]) + decoded_msg = YAML.load(message) - new_message = OpenStruct.new - new_message.channel = channel - new_message.body = decoded_msg[:body] - new_message.headers = decoded_msg[:headers] + new_message = OpenStruct.new + new_message.channel = channel + new_message.body = decoded_msg[:body] + new_message.headers = decoded_msg[:headers] - @receiver_queue << new_message - rescue => e - Log.warn("Failed to receive from the receiver source: %s: %s" % [e.class, e.to_s]) + @receiver_queue << new_message + rescue => e + Log.warn("Failed to receive from the receiver source: %s: %s" % [e.class, e.to_s]) + end end end + rescue Exception => e + Log.warn("The receiver thread lost connection to the Redis server: %s: %s" % [e.class, e.to_s]) + sleep 0.2 + retry end end + + Log.debug("Started receiver_thread %s" % @receiver_thread.inspect) end def start_sender_thread @@ -121,11 +171,15 @@ def start_sender_thread msg = @sender_queue.pop encoded = {:body => msg[:body], :headers => msg[:headers]}.to_yaml @sender_redis.publish(msg[:channel], encoded) - rescue => e + rescue Exception => e Log.warn("Could not publish message to redis: %s: %s" % [e.class, e.to_s]) + sleep 0.2 + retry end end end + + Log.debug("Started sender_thread %s" % @sender_thread.inspect) end end end