Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Starting to work on the RedisSizedQueue

  • Loading branch information...
commit a554f2a31e50f0c66cb661daa8996f4eab76ef11 1 parent 9d737a5
@leehambley authored
View
2  .gitignore
@@ -0,0 +1,2 @@
+data/mongodb
+data/redis
View
12 Gemfile
@@ -0,0 +1,12 @@
+source "https://rubygems.org"
+
+gem 'mongo'
+gem 'bson_ext'
+gem 'redis'
+
+group :test do
+ gem 'debugger'
+ gem 'foreman'
+ gem 'minitest'
+ gem 'turn'
+end
View
36 Gemfile.lock
@@ -0,0 +1,36 @@
+GEM
+ remote: https://rubygems.org/
+ specs:
+ ansi (1.4.3)
+ bson (1.7.0)
+ bson_ext (1.7.0)
+ bson (~> 1.7.0)
+ columnize (0.3.6)
+ debugger (1.2.0)
+ columnize (>= 0.3.1)
+ debugger-linecache (~> 1.1.1)
+ debugger-ruby_core_source (~> 1.1.3)
+ debugger-linecache (1.1.2)
+ debugger-ruby_core_source (>= 1.1.1)
+ debugger-ruby_core_source (1.1.3)
+ foreman (0.56.0)
+ thor (>= 0.13.6)
+ minitest (3.4.0)
+ mongo (1.7.0)
+ bson (~> 1.7.0)
+ redis (3.0.1)
+ thor (0.16.0)
+ turn (0.9.6)
+ ansi
+
+PLATFORMS
+ ruby
+
+DEPENDENCIES
+ bson_ext
+ debugger
+ foreman
+ minitest
+ mongo
+ redis
+ turn
View
2  Procfile
@@ -0,0 +1,2 @@
+mongo: mongod run --config config/mongod.conf --rest
+redis: redis-server config/redis.conf
View
59 README.md
@@ -18,6 +18,11 @@ The original use-case was as pluggable "page queue" and "link queue"
implementations for the Anemone web crawler, of course these general purpose
classes can be used for anything where you might use a normal `Queue`.
+## Supported Classes
+
+ * Queue: Redis, MongoDB
+ * SizedQueue: Redis, MongoDB
+
## Requirements
The queues naturally enough need the Ruby gem for your desired backend, for
@@ -36,6 +41,60 @@ For that reason, you should not consider this portable between Ruby versions,
and obviously enough, you won't have a lot of success trying to connect Perl
or Python to the same queue.
+## Blocking
+
+The blocking reads/writes are achieved with `Thread.pass` and `until ...`
+loops, this is quite possibly too naïve to be widely useful, but I haven't
+personally had any problems, and the tests cover a variety of deadlock cases,
+I think it's save.
+
+## Connections
+
+### Redis
+
+Each queue uses two connections, the `redis` ruby gem blocks, so one cannot do
+this:
+
+ redis = Redis.new
+ t1 = Thread.new { redis.blpop "somekey" }
+ t2 = Thread.new { redis.get "someotherkey" }
+
+The whole adapter is blocked by the `blpop` in the first thread, because of
+that the Redis backed queue class will create two connections to the Redis
+server, the second connection is established the first time something should
+be popped from the queue, it is passed the same connection options as the
+first connection.
+
+## Atomicity
+
+The queue classes rely on atomic operations from the underlying data stores,
+and as such are relatively simple to implement in Ruby land, the backends
+however must support atomic read/write operations.
+
+## Collection Naming
+
+### Redis
+
+The collections for the Redis driver are named using the following scheme:
+
+ {
+ queue_key_name: "persistent-queue-classes:redis:queue:#{self.hash.abs}:queue",
+ waiting_key_name: "persistent-queue-classes:redis:queue:#{self.hash.abs}:waiting",
+ }
+
+The `waiting` key is used to store the number of threads waiting on a Queue,
+this is a standard Redis key that is operated upon with `INCR` and `DECR`.
+
+These can be overridden in the options hash passed to the options hash passed
+to `PersistentQueueClasses::Redis::Queue.new`.
+
+**Note:** When the last item is popped off the queue these keys are
+**REMOVED**. This ensures that owing to the slightly strange naming scheme
+(how useful are the object hash IDs, anyway?) that you do not completely
+pollute the Redis keyspace with random queue names.
+
+### MongoDB
+
## Acknowledgements
Props to Scott Reis for his work on Anemone's `queueadapter` branch which gave
View
9 Rakefile
@@ -0,0 +1,9 @@
+require 'rake/testtask'
+
+Rake::TestTask.new do |t|
+ t.libs << "test"
+ t.test_files = FileList['test/*_test.rb']
+ t.verbose = true
+end
+
+task :default => :test
View
5 config/mongod.conf
@@ -0,0 +1,5 @@
+# Store data in /usr/local/var/mongodb instead of the default /data/db
+dbpath = data/mongodb
+
+# Only accept local connections
+bind_ip = 127.0.0.1
View
11 config/redis.conf
@@ -0,0 +1,11 @@
+daemonize no
+port 6379
+bind 127.0.0.1
+logfile stdout
+loglevel warning
+databases 16
+save 900 1
+save 300 10
+save 60 10000
+dbfilename redis.rdb
+dir ./data/redis/
View
4 lib/persistent-queue-classes.rb
@@ -0,0 +1,4 @@
+require_relative 'persistent-queue-classes/version'
+require_relative 'persistent-queue-classes/mongodb/queue'
+require_relative 'persistent-queue-classes/redis/queue'
+require_relative 'persistent-queue-classes/redis/sized_queue'
View
30 lib/persistent-queue-classes/mongodb/queue.rb
@@ -0,0 +1,30 @@
+begin
+ require 'mongo'
+rescue LoadError
+ warn "To use the `PersistentQueueClasses::Mongo::Queue` please ensure the `mongo` and `bson_ext` gems are installed and on the load path."
+ exit 1
+end
+
+module PersistentQueueClasses
+
+ module MongoDB
+
+ class Queue
+
+ attr_reader :options
+
+ def initialize(options={})
+ @options = default_options.merge(options)
+ end
+
+ private
+
+ def default_options
+ Hash.new
+ end
+
+ end
+
+ end
+
+end
View
84 lib/persistent-queue-classes/redis/queue.rb
@@ -0,0 +1,84 @@
+require 'base64'
+begin
+ require 'redis'
+rescue LoadError
+ warn "To use the `PersistentQueueClasses::Redis::Queue` please ensure the `redis` Gem is installed and on the load path."
+ exit 1
+end
+
+module PersistentQueueClasses
+
+ module Redis
+
+ class Queue
+
+ attr_reader :options, :redis
+
+ def initialize(options={})
+ @options = default_options.merge(options)
+ end
+
+ def length
+ redis.llen(options[:queue_key_name]) || 0
+ end
+ alias :size :length
+
+ def empty?
+ length == 0
+ end
+
+ def num_waiting
+ (redis.get(options[:waiting_key_name]) || 0).to_i
+ end
+
+ def push(object)
+ redis.rpush options[:queue_key_name], Base64.encode64(Marshal.dump(object))
+ end
+ alias :enq :push
+ alias :<< :push
+
+ def pop
+ redis.incr options[:waiting_key_name]
+ key, object = bredis.blpop(options[:queue_key_name])
+ Marshal.load(Base64.decode64(object))
+ ensure
+ redis.decr options[:waiting_key_name]
+ clear if empty?
+ end
+ alias :deq :pop
+ alias :shift :pop
+
+ def clear
+ r = redis.multi do
+ redis.del options[:queue_key_name]
+ redis.del options[:waiting_key_name]
+ end
+ return []
+ end
+
+ private
+
+ def redis
+ @redis ||= begin
+ ::Redis.new(options).tap do |r|
+ r.setnx options[:waiting_key_name], 0
+ end
+ end
+ end
+
+ def bredis
+ @bredis ||= ::Redis.new(options)
+ end
+
+ def default_options
+ {
+ queue_key_name: "persistent-queue-classes:redis:queue:#{self.hash.abs}:queue",
+ waiting_key_name: "persistent-queue-classes:redis:queue:#{self.hash.abs}:waiting",
+ }
+ end
+
+ end
+
+ end
+
+end
View
20 lib/persistent-queue-classes/redis/sized_queue.rb
@@ -0,0 +1,20 @@
+require_relative 'queue'
+
+module PersistentQueueClasses
+
+ module Redis
+
+ class SizedQueue < Queue
+
+ attr_accessor :max
+
+ def initialize(max, options={})
+ @max = max
+ super default_options.merge(options)
+ end
+
+ end
+
+ end
+
+end
View
5 lib/persistent-queue-classes/version.rb
@@ -0,0 +1,5 @@
+module PersistentQueueClasses
+
+ VERSION = '1.0.0'
+
+end
View
15 test/helper.rb
@@ -0,0 +1,15 @@
+$:.unshift(File.dirname(__FILE__))
+
+require 'rubygems'
+require 'debugger'
+require 'bundler/setup'
+require 'turn/autorun'
+require 'persistent-queue-classes'
+
+require_relative 'queue_tests'
+
+Thread.abort_on_exception = true
+
+Turn.config.tap do |t|
+ t.ansi = true
+end
View
21 test/mongodb_queue_test.rb
@@ -0,0 +1,21 @@
+require 'helper'
+
+module PersistentQueueClasses
+
+ module MongoDB
+
+ class QueueTest < MiniTest::Unit::TestCase
+
+ #include QueueTests
+
+ private
+
+ def queue
+ Queue.new
+ end
+
+ end
+
+ end
+
+end
View
107 test/queue_tests.rb
@@ -0,0 +1,107 @@
+module SizedQueueTests
+
+ def test_max_reader
+ assert_equal 2, queue.max
+ end
+
+ # test max
+ # test assigning max
+
+end
+
+module QueueTests
+
+ def setup
+ queue.clear
+ end
+
+ def test_that_it_responds_to_the_same_api_as_the_queue_class
+ # Add Things
+ assert queue.respond_to?(:enq)
+ assert queue.respond_to?(:<<)
+ assert queue.respond_to?(:push)
+
+ # Remove Things
+ assert queue.respond_to?(:deq)
+ assert queue.respond_to?(:pop)
+ assert queue.respond_to?(:shift)
+
+ # Clean Up
+ assert queue.respond_to?(:clear)
+ assert queue.respond_to?(:empty?)
+
+ # Size
+ assert queue.respond_to?(:length)
+ assert queue.respond_to?(:size)
+
+ # Threads Waiting
+ assert queue.respond_to?(:num_waiting)
+ end
+
+ def test_that_size_is_zero_when_no_threads_are_waiting
+ assert_equal 0, queue.size
+ end
+
+ def test_that_length_is_zero_when_no_threads_are_waiting
+ assert_equal 0, queue.length
+ end
+
+ def test_that_num_waiting_is_zero_when_no_threads_are_waiting
+ assert_equal 0, queue.num_waiting
+ end
+
+ def test_queues_are_empty_when_new
+ assert queue.empty?
+ end
+
+ def test_that_clear_returns_nil
+ # It's not clear from the Ruby implementation why `clear` returns
+ # an empty array.
+ assert_equal [], queue.clear
+ end
+
+ def test_that_pop_should_block_when_the_queue_is_empty_increasing_num_waiting_by_one
+ t_pop = Thread.new(queue) do |q|
+ q.pop
+ end
+ Thread.new { sleep 0.1 }.join
+ assert_equal 1, queue.num_waiting
+ ensure
+ t_pop.kill
+ end
+
+ def test_that_pop_should_block_when_the_queue_is_empty_increasing_num_waiting_by_two
+ t1_pop = Thread.new(queue) do |q|
+ q.pop
+ end
+ t2_pop = Thread.new(queue) do |q|
+ q.pop
+ end
+ Thread.new { sleep 0.1 }.join
+ assert_equal 2, queue.num_waiting
+ ensure
+ [t1_pop, t2_pop].map(&:kill)
+ end
+
+ def test_fifo
+ queue.push :first
+ queue.push :second
+ assert_equal :first, queue.pop
+ assert_equal :second, queue.pop
+ end
+
+ def test_empty_after_clear
+ queue.push :test
+ refute queue.empty?
+ queue.clear
+ assert queue.empty?
+ end
+
+ def test_all_ruby_types_survive_the_push_and_pop_roundtrip
+ [ 1, 1.5, "Test String", [:mixed, "array"], :symbol, /regex/ ].each do |object|
+ queue.push object
+ assert_equal object, queue.pop
+ end
+ end
+
+end
View
61 test/redis_queue_test.rb
@@ -0,0 +1,61 @@
+require 'helper'
+
+module RedisQueueTests
+
+ def test_when_the_queue_is_emptied_the_redis_keys_are_deleted
+ waiting_key_name = 'test-waiting-key-is-removed'
+ queue_key_name = 'test-queue-key-is-removed'
+
+ q = PersistentQueueClasses::Redis::Queue.new(waiting_key_name: waiting_key_name, queue_key_name: queue_key_name)
+
+ q.push :test
+ assert redis.exists(queue_key_name)
+ assert redis.exists(waiting_key_name)
+
+ q.pop
+ refute redis.exists(queue_key_name)
+ refute redis.exists(waiting_key_name)
+ end
+
+ private
+
+ def redis
+ @_redis ||= ::Redis.new
+ end
+
+end
+
+module PersistentQueueClasses
+
+ module Redis
+
+ class QueueTest < MiniTest::Unit::TestCase
+
+ include QueueTests
+ include RedisQueueTests
+
+ private
+
+ def queue
+ @_queue ||= Queue.new
+ end
+
+ end
+
+ class SizedQueueTest < MiniTest::Unit::TestCase
+
+ include QueueTests
+ include SizedQueueTests
+ include RedisQueueTests
+
+ private
+
+ def queue
+ @_queue ||= SizedQueue.new(2)
+ end
+
+ end
+
+ end
+
+end
Please sign in to comment.
Something went wrong with that request. Please try again.