Skip to content
This repository has been archived by the owner on Dec 7, 2022. It is now read-only.

Commit

Permalink
use a actual working queue implementation, tested on 1.8.7 only
Browse files Browse the repository at this point in the history
  • Loading branch information
ripienaar committed Jan 11, 2013
1 parent 18af055 commit 484aeaa
Showing 1 changed file with 74 additions and 20 deletions.
94 changes: 74 additions & 20 deletions connector/redis/redis.rb
Expand Up @@ -15,19 +15,58 @@ module Connector
# plugin which means we can give a very solid fast first-user
# experience using this
class Redis<Base
class ThreadsafeQueue
require 'thread'
require 'timeout'

def initialize
@queue = []
@mutex = Mutex.new
@cv = ConditionVariable.new
end

def push(item)
@mutex.synchronize do
@queue.push(item)
end

@cv.signal
end

alias_method :<<, :push

def pop
@mutex.synchronize do
while true
if @queue.empty?
begin
Timeout::timeout(5) do
@cv.wait(@mutex)
end
rescue Timeout::Error
retry
end
else
return @queue.shift
end
end
end
end
end

def initialize
@config = Config.instance
@sources = []
@subscribed = false
end

def connect
@receiver_redis = ::Redis.new
@receiver_queue = Queue.new
@receiver_redis = ::Redis.new :thread_safe => false
@receiver_queue = ThreadsafeQueue.new
@receiver_thread = nil

@sender_redis = ::Redis.new
@sender_queue = Queue.new
@sender_redis = ::Redis.new :thread_safe => false
@sender_queue = ThreadsafeQueue.new
@sender_thread = nil

start_sender_thread
Expand All @@ -54,10 +93,12 @@ def disconnect; end

def receive
msg = @receiver_queue.pop
Message.new(msg.body, msg, :headers => msg.headers)
return Message.new(msg.body, msg, :headers => msg.headers)
end

def publish(msg)
Log.debug("About to publish to the sender queue")

target = {:name => nil, :headers => {}, :name => nil}

if msg.type == :direct_request
Expand Down Expand Up @@ -91,27 +132,36 @@ def publish(msg)

def start_receiver_thread(sources)
@receiver_thread = Thread.new do
@receiver_redis.subscribe(@sources) do |on|
on.subscribe do |channel, subscriptions|
Log.debug("Subscribed to %s" % channel)
end
begin
@receiver_redis.subscribe(@sources) do |on|
on.subscribe do |channel, subscriptions|
Log.debug("Subscribed to %s" % channel)
end

on.message do |channel, message|
begin
decoded_msg = YAML.load(message)
on.message do |channel, message|
begin
Log.debug("Got a message on %s: %s" % [channel, message])
decoded_msg = YAML.load(message)

new_message = OpenStruct.new
new_message.channel = channel
new_message.body = decoded_msg[:body]
new_message.headers = decoded_msg[:headers]
new_message = OpenStruct.new
new_message.channel = channel
new_message.body = decoded_msg[:body]
new_message.headers = decoded_msg[:headers]

@receiver_queue << new_message
rescue => e
Log.warn("Failed to receive from the receiver source: %s: %s" % [e.class, e.to_s])
@receiver_queue << new_message
rescue => e
Log.warn("Failed to receive from the receiver source: %s: %s" % [e.class, e.to_s])
end
end
end
rescue Exception => e
Log.warn("The receiver thread lost connection to the Redis server: %s: %s" % [e.class, e.to_s])
sleep 0.2
retry
end
end

Log.debug("Started receiver_thread %s" % @receiver_thread.inspect)
end

def start_sender_thread
Expand All @@ -121,11 +171,15 @@ def start_sender_thread
msg = @sender_queue.pop
encoded = {:body => msg[:body], :headers => msg[:headers]}.to_yaml
@sender_redis.publish(msg[:channel], encoded)
rescue => e
rescue Exception => e
Log.warn("Could not publish message to redis: %s: %s" % [e.class, e.to_s])
sleep 0.2
retry
end
end
end

Log.debug("Started sender_thread %s" % @sender_thread.inspect)
end
end
end
Expand Down

0 comments on commit 484aeaa

Please sign in to comment.