Permalink
Browse files

Almost working

  • Loading branch information...
1 parent 62d09bb commit 7bd674dd9b76ca721b2cb4c504e0da16c5f746f2 @leehambley committed Oct 3, 2012
View
@@ -1,2 +1,2 @@
-data/mongodb
+data/mongo
data/redis
View
@@ -71,6 +71,15 @@ The queue classes rely on atomic operations from the underlying data stores,
and as such are relatively simple to implement in Ruby land, the backends
however must support atomic read/write operations.
+## Sharing Queues Between Threads
+
+The queues memoize their internal connection/collection variables, etc - in
+certain race situations these can cause problems, because of that we use the
+`MonitorMixin` from Ruby 1.9 to synchonrize memoization between threads.
+
+Here's an example
+
+
## Collection Naming
### Redis
View
@@ -1,5 +1,5 @@
# Store data in /usr/local/var/mongodb instead of the default /data/db
-dbpath = data/mongodb
+dbpath = data/mongo
# Only accept local connections
bind_ip = 127.0.0.1
@@ -1,3 +1,21 @@
+module PersistentQueueClasses
+
+ module SharedQueueBehaviour
+
+ attr_accessor :options
+
+ def encode_object(object)
+ Base64.encode64(Marshal.dump(object))
+ end
+
+ def decode_object(string)
+ Marshal.load(Base64.decode64(string))
+ end
+
+ end
+
+end
+
require_relative 'persistent-queue-classes/version'
require_relative 'persistent-queue-classes/mongodb/queue'
require_relative 'persistent-queue-classes/redis/queue'
@@ -11,16 +11,80 @@ module MongoDB
class Queue
- attr_reader :options
+ include PersistentQueueClasses::SharedQueueBehaviour
def initialize(options={})
@options = default_options.merge(options)
end
+ def clear
+ connection.drop_database options[:database_name]
+ [@collection, @waiting_collection, @connection, @database].map { |t| t = nil }
+ []
+ end
+
+ def pop
+ increment_waiting do
+ until _r = collection.find_and_modify({ sort: {_id: Mongo::ASCENDING}, remove: true})
+ Thread.stop
+ end
+ decode_object(_r["payload"])
+ end
+ end
+
+ def push(object)
+ collection.insert payload: encode_object(object)
+ end
+
+ def num_waiting
+ waiting_collection.find(_id: 'WAITING').first["value"].to_i
+ end
+
+ def length
+ collection.size
+ end
+ alias :size :length
+
+ def empty?
+ length == 0
+ end
+
private
+ def increment_waiting &block
+ waiting_collection.update({_id: 'WAITING'}, { "$inc" => { "value" => 1 } })
+ yield
+ ensure
+ waiting_collection.update({_id: 'WAITING'}, { "$inc" => { "value" => -1 } })
+ end
+
+ def collection
+ @collection ||= database[options[:queue_collection_name]].tap do |c|
+ c.ensure_index :_id
+ end
+ end
+
+ def waiting_collection
+ @waiting_collection ||= database[options[:waiting_collection_name]].tap do |wc|
+ wc.save({_id: :WAITING, value: 0})
+ end
+ end
+
+ def connection
+ @connection ||= Mongo::Connection.new options
+ end
+
+ def database
+ @database ||= connection[options[:database_name]]
+ end
+
def default_options
- Hash.new
+ {
+ database_name: "persistent-queue-classes:mongodb:#{self.hash.abs}",
+ queue_collection_name: "queue",
+ waiting_collection_name: "waiting",
+ safe: true
+ }
end
end
@@ -12,7 +12,7 @@ module Redis
class Queue
- attr_reader :options, :redis
+ include PersistentQueueClasses::SharedQueueBehaviour
def initialize(options={})
@options = default_options.merge(options)
@@ -31,16 +31,16 @@ def num_waiting
(redis.get(options[:waiting_key_name]) || 0).to_i
end
- def push(object)
- redis.rpush options[:queue_key_name], Base64.encode64(Marshal.dump(object))
+ def push(object, non_blocking=false)
+ redis.rpush options[:queue_key_name], encode_object(object)
end
alias :enq :push
alias :<< :push
def pop
redis.incr options[:waiting_key_name]
key, object = bredis.blpop(options[:queue_key_name])
- Marshal.load(Base64.decode64(object))
+ decode_object(object)
ensure
redis.decr options[:waiting_key_name]
clear if empty?
@@ -56,8 +56,6 @@ def clear
return []
end
- private
-
def redis
@redis ||= begin
::Redis.new(options).tap do |r|
@@ -66,6 +64,8 @@ def redis
end
end
+ private
+
def bredis
@bredis ||= ::Redis.new(options)
end
@@ -13,6 +13,14 @@ def initialize(max, options={})
super default_options.merge(options)
end
+ def push(obj)
+ redis.incr options[:waiting_key_name]
+ Thread.pass until length < max if length == max
+ bredis.rpush options[:queue_key_name], encode_object(obj)
+ ensure
+ redis.decr options[:waiting_key_name]
+ end
+
end
end
@@ -6,7 +6,7 @@ module MongoDB
class QueueTest < MiniTest::Unit::TestCase
- #include QueueTests
+ include QueueTests
private
View
@@ -1,11 +1,37 @@
+module RedisThreadRacingTests
+
+ def test_thread_racing_memoization
+ threads = 25.times.collect do
+ Thread.new(queue) { |q| Thread.current[:redis] = q.redis }
+ end
+ threads.map(&:join)
+ assert_equal 1, threads.collect { |t| t[:redis].object_id }.uniq.size
+ end
+
+end
+
module SizedQueueTests
def test_max_reader
assert_equal 2, queue.max
end
- # test max
- # test assigning max
+ def test_max_writer
+ queue.max = 15
+ assert_equal 15, queue.max
+ end
+
+ def test_push_when_full_will_block
+ queue.max = 1
+ queue.push(:something)
+ t1 = Thread.new(queue) do |q|
+ q.push(:something_else)
+ end
+ Thread.new { sleep 0.1 }.join
+ assert 1, queue.num_waiting
+ ensure
+ t1.kill
+ end
end
@@ -64,20 +90,24 @@ def test_that_pop_should_block_when_the_queue_is_empty_increasing_num_waiting_by
t_pop = Thread.new(queue) do |q|
q.pop
end
- Thread.new { sleep 0.1 }.join
+ Thread.new { sleep 0.5 }.join
assert_equal 1, queue.num_waiting
ensure
t_pop.kill
end
+ def test_pop_no_block
+ skip "Not Implemented Yet"
+ end
+
def test_that_pop_should_block_when_the_queue_is_empty_increasing_num_waiting_by_two
t1_pop = Thread.new(queue) do |q|
q.pop
end
t2_pop = Thread.new(queue) do |q|
q.pop
end
- Thread.new { sleep 0.1 }.join
+ Thread.new { sleep 0.5 }.join
assert_equal 2, queue.num_waiting
ensure
[t1_pop, t2_pop].map(&:kill)
@@ -33,6 +33,7 @@ class QueueTest < MiniTest::Unit::TestCase
include QueueTests
include RedisQueueTests
+ include RedisThreadRacingTests
private
@@ -46,7 +47,7 @@ class SizedQueueTest < MiniTest::Unit::TestCase
include QueueTests
include SizedQueueTests
- include RedisQueueTests
+ include RedisThreadRacingTests
private

0 comments on commit 7bd674d

Please sign in to comment.