Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
tree: 7bd674dd9b
Fetching contributors…

Octocat-spinner-32-eaf2f5

Cannot retrieve contributors at this time

file 85 lines (66 sloc) 1.806 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
require 'base64'
begin
  require 'redis'
rescue LoadError
  warn "To use the `PersistentQueueClasses::Redis::Queue` please ensure the `redis` Gem is installed and on the load path."
  exit 1
end

module PersistentQueueClasses

  module Redis

    # A queue whose entries are stored in a Redis list. Its public method
    # names (+push+/+enq+/+<<+, +pop+/+deq+/+shift+, +length+/+size+,
    # +empty?+, +num_waiting+, +clear+) mirror those of Ruby's core ::Queue.
    #
    # Two Redis keys are used: one list holding the encoded entries and one
    # integer counter tracking how many callers are currently inside #pop.
    #
    # NOTE(review): +options+, +encode_object+ and +decode_object+ are called
    # here but not defined in this class; presumably they are provided by
    # SharedQueueBehaviour -- confirm against that module.
    class Queue

      include PersistentQueueClasses::SharedQueueBehaviour

      # @param options [Hash] overrides merged on top of #default_options.
      #   The merged hash is also what gets passed to ::Redis.new, so it may
      #   carry connection settings in addition to :queue_key_name and
      #   :waiting_key_name.
      def initialize(options = {})
        @options = default_options.merge(options)
      end

      # @return [Integer] result of LLEN on the queue key, or 0 when the
      #   client returns nil
      def length
        redis.llen(options[:queue_key_name]) || 0
      end
      alias :size :length

      # @return [Boolean] true when #length is zero
      def empty?
        length.zero?
      end

      # @return [Integer] current value of the waiting counter key; a missing
      #   key (nil) counts as 0
      def num_waiting
        redis.get(options[:waiting_key_name]).to_i
      end

      # Appends the encoded object to the tail of the Redis list.
      #
      # @param object the value to enqueue (passed through encode_object)
      # @param non_blocking ignored by this implementation; presumably kept
      #   only for signature parity with other queue classes
      # @return whatever the client's RPUSH call returns
      def push(object, non_blocking = false)
        redis.rpush(options[:queue_key_name], encode_object(object))
      end
      alias :enq :push
      alias :<< :push

      # Removes the entry at the head of the list via BLPOP on the dedicated
      # blocking connection and returns it decoded. The waiting counter is
      # incremented for the duration of the call.
      #
      # @return the result of decode_object on the popped payload
      def pop
        # Increment before entering the guarded region: if this call fails
        # the counter was never raised, so it must not be lowered again.
        redis.incr(options[:waiting_key_name])
        begin
          _key, payload = bredis.blpop(options[:queue_key_name])
          decode_object(payload)
        ensure
          redis.decr(options[:waiting_key_name])
          # NOTE(review): #clear also deletes the waiting counter, so this
          # resets the count even if other callers are still inside #pop --
          # confirm whether that is intended.
          clear if empty?
        end
      end
      alias :deq :pop
      alias :shift :pop

      # Deletes both the queue list key and the waiting counter key inside a
      # single MULTI/EXEC transaction.
      #
      # @return [Array] always an empty array
      def clear
        # Commands must be issued on the yielded handle; calling them on the
        # outer client inside the block is deprecated in recent redis-rb
        # releases and not queued into the transaction in the newest ones.
        redis.multi do |transaction|
          transaction.del(options[:queue_key_name])
          transaction.del(options[:waiting_key_name])
        end
        []
      end

      # Lazily created, memoized connection used for every command except the
      # blocking pop. On first creation the waiting counter is initialised to
      # 0 if the key does not exist yet (SETNX).
      #
      # @return [::Redis]
      def redis
        @redis ||= ::Redis.new(options).tap do |connection|
          connection.setnx(options[:waiting_key_name], 0)
        end
      end

      private

      # Lazily created, memoized second connection used only for the BLPOP
      # call in #pop -- presumably so a blocked pop does not tie up the
      # connection used for the other commands.
      #
      # @return [::Redis]
      def bredis
        @bredis ||= ::Redis.new(options)
      end

      # Default key names. Both embed the absolute value of this object's
      # #hash, so the defaults are derived from the instance itself.
      #
      # @return [Hash{Symbol => String}]
      def default_options
        key_prefix = "persistent-queue-classes:redis:queue:#{hash.abs}"
        {
          queue_key_name: "#{key_prefix}:queue",
          waiting_key_name: "#{key_prefix}:waiting",
        }
      end

    end

  end

end
Something went wrong with that request. Please try again.