Skip to content
Browse files

Added the Redis Promises proxy + specs

  • Loading branch information...
1 parent 4a3a78b commit 9d0d3a0bfaf507e92ca8d8f1fcf8095d6b0c8270 @mathieuravaux committed May 7, 2012
View
1 lib/redis-auto-batches.rb
@@ -1,4 +1,5 @@
require "redis-auto-batches/promise"
+require "redis-auto-batches/redis_promise_proxy"
require "redis-auto-batches/version"
module RedisAutoBatches
View
173 lib/redis-auto-batches/redis_promise_proxy.rb
@@ -0,0 +1,173 @@
+# TODO: think about and manage the use of the multi / exec commands, so that
+# we have a transparent behavior between a multi and an exec command.
+
+module RedisAutoBatches
+ class RedisPromiseProxy
+ class PromiseWithIndex < RedisAutoBatches::Promise
+ attr_accessor :promise_index
+ def initialize(index, &block)
+ promise_index = index
+ super(&block)
+ end
+
+ # def respond_to?(method)
+ # method.equal?(:promise_index) || super(method)
+ # end
+ end
+
+ MAX_BUFFERED_PROMISES = 1_000
+
+ READ_COMMANDS = ::Set.new(%w[
+ ttl sort
+ randomkey keys srandmember type
+ get mget mapped_mget [] []=
+ exists hexists
+ hget hmget hkeys hgetall hvals hlen
+ lindex llen lrange
+ zscore zcard zcount zrange zrank zrangebyscore zrevrange zrevrangebyscore zrevrank
+ smembers sismember sdiff sunion sinter scard
+ dbsize debug
+ ].map(&:to_sym))
+
+
+ WRITE_COMMANDS = ::Set.new(%w[
+ incr decr incrby decrby hincrby zincrby
+ del expireat getset hdel hmset hset hsetnx info lastsave lpop lpush lrem lset ltrim
+ mapped_hmset move mset msetnx rename
+ renamenx rpop rpoplpush rpush sadd sdiffstore select set setnx
+ sinterstore smove spop srem subscribe sunionstore zadd
+ zinterstore zrem zremrangebyrank zremrangebyscore zunionstore
+ ].map(&:to_sym))
+
+ IMMEDIATE_COMMANDS = Set.new(%w[
+ auth bgrewtriteaof bgsave blpop brpop brpoplpush config debug flushall flushdb monitor
+ persist expire setex psubscribe publish punsubscribe quit save shutdown slaveof unwatch watch
+ ].map(&:to_sym))
+
+ attr_accessor :redis
+ attr_accessor :buffered_promises
+ attr_accessor :in_unit_of_work
+
+ ##
+ # Creates a new proxy that will be usable just like a normal
+ # Redis object, but will buffer Redis commands in transactions
+ # to minimize the number of necessary round-trips
+ #
+ # @example
+ # redis = Statistics::RedisPromiseProxy.new(Redis.new)
+ # user_ids = [<user_1>, <user_2>, <user_3>, <user_4>, ]
+ # friend_counts = user_ids.map { |id| redis.scard("users:#{id}:friendships") }
+ # friend_counts.count # => 4
+ # friend_counts[0] # => 174 # the Redis request is made right here
+ #
+ # @param Redis
+ # @see Redis.new
+ def initialize(redis)
+ @redis = redis
+ @queue_mutex = ::Mutex.new
+ @flush_mutex = ::Mutex.new
+ @buffered_promises = []
+ end
+
+
+ ##
+ # if _command_ is a read command, return a promise of the result of calling this command
+ # else, realize the buffered promises and then call the command, directly returning its result
+ # TODO: maybe we should wrap the result of the command in a promise in any case
+ def method_missing(command, *args, &block)
+ promise_redis_result(command, *args, &block)
+ end
+
+ def promise_redis_result(command, *args, &block)
+ @queue_mutex.synchronize do
+ if IMMEDIATE_COMMANDS.include?(command)
+ # puts "Will flush non-read command called on Redis (#{command.inspect})."
+ flush
+ @redis.send(command, *args, &block)
+ else
+ if outside_unit_of_work?
+ # if Rails.env.test?
+ # # unless caller.any? { |line| line.include?('/rspec/core/') && line.include?('run_before_each') }
+ # # puts "\t#{caller.join("\n\t")}"
+ # raise RuntimeError.new("Accessing to redis outside a unit of work")
+ # # end
+ # end
+ # puts "Forcing the creation of a unit of work..."
+ start_unit_of_work
+ end
+
+ #delay with a promise
+ raise NotImplementedError.new("Passing a block when calling a Redis command is not currently supported.") if block_given?
+ index = @buffered_promises.count
+
+ promise = PromiseWithIndex.new(index) do
+ # puts "Will flush since value accessed for promise ##{index} (#{command}, #{args.inspect})"
+ flush
+ promise.force
+ end
+ @buffered_promises << [promise, command, args]
+ flush if flush_needed?
+ promise
+ end
+ end
+ end
+
+ #TODO: add this to monitor_with_new_relic and log the executions times
+ def flush
+ @flush_mutex.synchronize do
+ return if @buffered_promises.empty?
+
+ results = if @buffered_promises.length == 1
+ (prom, command, args) = @buffered_promises.first
+ # puts "command: #{command}(#{args.inspect})"
+ res = @redis.send(command, *args)
+ [res]
+ else
+ @redis.multi
+ @buffered_promises.each do |promise, command, args|
+ @redis.send(command, *args)
+ end
+ @redis.exec
+ end
+
+ results.each.with_index do |result, index|
+ # puts " ==> #{result}, #{index}"
+ @buffered_promises[index][0].fulfill(result)
+ end
+
+ @buffered_promises = []
+ end
+ end
+
+ # Redis.unit_of_work method like mongoid that will flush the previous unit_of_work and ensure the termination of the current unit_of_work
+ def unit_of_work
+ previous_state = @in_unit_of_work
+ begin
+ start_unit_of_work
+ yield if block_given?
+ ensure
+ end_unit_of_work(previous_state)
+ end
+ end
+
+ def start_unit_of_work
+ flush
+ @in_unit_of_work = true
+ end
+
+ def end_unit_of_work(previous_state)
+ flush
+ @in_unit_of_work = previous_state
+ end
+
+ def outside_unit_of_work?
+ ! @in_unit_of_work
+ end
+
+ # we flush every 1000 promises (at least)
+ def flush_needed?
+ @buffered_promises.length >= MAX_BUFFERED_PROMISES
+ end
+
+ end
+end
View
63 spec/acceptance_spec.rb
@@ -0,0 +1,63 @@
+require 'spec_helper'
+
+describe 'Acceptance criteria' do
+ let(:redis) { Redis.connect }
+ subject { RedisAutoBatches::RedisPromiseProxy.new(redis) }
+
+ before do
+ redis.set("key1", 10)
+ redis.set("key2", 20)
+ redis.set("key3", 30)
+ end
+
+ include RedisMonitoring
+
+ context "when used inside a unit of work" do
+ context "when executing several read Redis commands" do
+ it "does only one round-trip to Redis" do
+ subject.unit_of_work do
+ [ subject.get("key1"),
+ subject.get("key2"),
+ subject.get("key3")
+ ]
+ end
+
+ actual_redis_commands.should == [
+ 'multi',
+ 'get key1',
+ 'get key2',
+ 'get key3',
+ 'exec'
+ ]
+ end
+ end
+
+ context "when issuing read commands after write commands" do
+ def issue_operations
+ subject.unit_of_work do
+ [
+ subject.get("key1"),
+ subject.set("key1", "1000"),
+ subject.get("key1")
+ ]
+ end
+ end
+
+ it "still does only one round-trip to redis" do
+ pending "Threading issues"
+ issue_operations
+
+ commands = actual_redis_commands
+ # puts commands.inspect
+ nb_round_trips_to_redis(commands).should == 1
+ end
+
+ it "reads the newly written value, as expected" do
+ values = issue_operations
+ values.should == %w(10 OK 1000)
+ end
+ end
+
+
+ end
+end
View
191 spec/redis_promise_proxy_spec.rb
@@ -0,0 +1,191 @@
+require 'spec_helper'
+
+describe RedisAutoBatches::RedisPromiseProxy do
+ let(:redis) { stub(:redis) }
+ subject { RedisAutoBatches::RedisPromiseProxy.new(redis) }
+
+ # around(:each) do |example|
+ # subject.unit_of_work do
+ # example.run
+ # redis.should_receive(:multi)
+ # end
+ # end
+
+ def quacks_like_a_promise?(object)
+ object.respond_to?(:__force__) &&
+ object.respond_to?(:__chain__) &&
+ object.respond_to?(:__pending__?) &&
+ object.respond_to?(:__fulfilled__?) &&
+ object.respond_to?(:__failed__?)
+ end
+
+ let(:keys) { %w[ key_1 key_2 key_3 key_4 key_5 ] }
+
+ describe "#get" do
+ it "returns a promise" do
+ subject.stub(:outside_unit_of_work?, false)
+ quacks_like_a_promise?(subject.get("key_1")).should be_true
+ end
+
+ context "when the result is used" do
+ it "hits redis" do
+ subject.unit_of_work do
+ redis.should_receive(:get).with("key_1").and_return("fine")
+ result = subject.get("key_1")
+ result.length.should == 4
+ end
+ end
+ end
+
+ context "when the result isn't used" do
+ it "doesn't hit redis" do
+ subject.stub(:outside_unit_of_work?, false)
+ redis.should_not_receive(:get)
+ subject.get("key_1")
+ end
+ end
+
+ end
+
+ describe "succession of 5 gets" do
+ it "returns 5 promises and fulfill it" do
+ subject.unit_of_work do
+ keys.each do |key|
+ quacks_like_a_promise?(subject.get(key)).should be_true
+ end
+ redis.should_receive(:multi).exactly(1).times.and_return(nil)
+ redis.should_receive(:get).exactly(5).times.and_return(nil)
+ redis.should_receive(:exec).and_return(['1', '2', '3', '4', '5'])
+ end
+ end
+
+ context "when the result is not used" do
+ it "doesn't hit redis" do
+ subject.stub(:outside_unit_of_work?, false)
+ redis.should_not_receive(:multi)
+ redis.should_not_receive(:get)
+ redis.should_not_receive(:exec)
+ keys.each { |key| subject.get(key) }
+ end
+ end
+
+ context "when the result of one of the promises is used" do
+ before do
+ redis.should_receive(:multi).exactly(1).times.and_return(nil)
+ redis.should_receive(:get).exactly(5).times.and_return(nil)
+ redis.should_receive(:exec).and_return(['1', '2', '3', '4', '5'])
+ end
+
+ it "hits redis in a transaction" do
+ subject.unit_of_work do
+ results = keys.map { |key| subject.get(key) }
+ results[3] + results[4]
+ end
+ end
+
+ it "fulfills each promise with the respective correct value" do
+ subject.unit_of_work do
+ results = keys.map { |key| subject.get(key) }
+ results.should == ['1', '2', '3', '4', '5']
+ end
+ end
+ end
+
+ end
+
+ # get and set now behaves exactly the same
+ # describe "#set" do
+ # it "hits redis immediately" do
+ # redis.should_not_receive(:multi)
+ # redis.should_not_receive(:exec)
+ # redis.should_receive(:set).with("key_1", "value_1").and_return('1')
+ # subject.set('key_1', 'value_1')
+ # end
+ #
+ # it "flushes pending buffered reads" do
+ # redis.should_receive(:multi).exactly(1).times.and_return(nil)
+ # redis.should_receive(:get).exactly(5).times.and_return(nil)
+ # redis.should_receive(:exec).and_return(['1', '2', '3', '4', '5'])
+ # redis.should_receive(:set).with("key_1", "value_1").and_return('1')
+ # keys.each { |key| subject.get(key) }
+ # subject.set('key_1', 'value_1')
+ # end
+ #
+ # end
+
+ context "repeated usage" do
+ it "reinitializes correctly its data structures" do
+ redis.should_receive(:multi).exactly(2).times.and_return(nil)
+ redis.should_receive(:get).exactly(5).times.and_return(nil)
+ redis.should_receive(:exec).and_return(['1', '2', '3'], ['4', '5'])
+ subject.unit_of_work do
+ one, two, three = subject.get('key_1'), subject.get('key_2'), subject.get('key_3')
+ one.pending?.should be_true
+ two.pending?.should be_true
+ three.pending?.should be_true
+
+ three.to_i.should == 3
+ one.fulfilled?.should be_true
+ two.fulfilled?.should be_true
+ three.fulfilled?.should be_true
+
+ four, five = subject.get('key_4'), subject.get('key_5')
+ four.pending?.should be_true
+ five.pending?.should be_true
+ four.to_i.should == 4
+ five.fulfilled?.should be_true
+ five.to_i.should == 5
+ end
+
+ end
+ end
+
+ context "with chaining of operations on the promise" do
+ it "stays lazy" do
+ subject.stub(:outside_unit_of_work?, false)
+ redis.should_not_receive(:multi)
+ redis.should_not_receive(:get)
+ subject.get('key_1').chain {|v| v.to_i }
+ end
+
+ it "applies the computation in the expected way" do
+ redis.should_receive(:get).and_return('12')
+ subject.unit_of_work do
+ subject.get('key_1').chain {|v| v.to_i }.should == 12
+ end
+
+ end
+
+ end
+
+ describe "#unit_of_work" do
+ it "flush everything on the start of a new unit of work and restore correctly after" do
+ subject.unit_of_work do
+ subject.get('key_1')
+ redis.should_receive(:get).with("key_1").and_return("fine")
+ subject.unit_of_work do
+ subject.get('key_2')
+ redis.should_receive(:get).with("key_2").and_return("fine")
+ end
+ subject.get('key_3')
+ redis.should_receive(:get).with("key_3").and_return("fine")
+ end
+ end
+ it "execute immediately immediate commands (and flush waiting commands before)" do
+ subject.unit_of_work do
+ redis.should_receive(:expire).with("key_1", 14)
+ subject.expire('key_1', 14)
+ end
+ end
+ it "flush after MAX_BUFFERED_PROMISES" do
+ subject.unit_of_work do
+ redis.should_receive(:multi)
+ redis.should_receive(:get).with("key_1").exactly(RedisAutoBatches::RedisPromiseProxy::MAX_BUFFERED_PROMISES).times.and_return(nil)
+ redis.should_receive(:exec).and_return(["fine"])
+ RedisAutoBatches::RedisPromiseProxy::MAX_BUFFERED_PROMISES.times { subject.get('key_1') }
+ subject.get('key_1')
+ redis.should_receive(:get).with("key_1").exactly(1).times.and_return("fine")
+ end
+ end
+ end
+end
View
80 spec/support/redis_monitoring.rb
@@ -0,0 +1,80 @@
+module RedisMonitoring
+
+ def self.included(base)
+ base.class_eval do
+ before do
+ @called_commands = []
+ start_redis_monitoring
+ end
+
+ after do
+ @verif_thread.kill
+ end
+ end
+ end
+
+ def start_redis_monitoring
+ mutex_for_redis_verification_startup = Mutex.new
+
+ @verif_thread = Thread.new do
+ mutex_for_redis_verification_startup.lock
+ Redis.connect.monitor do |command|
+ if command.include? 'monitor'
+ mutex_for_redis_verification_startup.unlock
+ end
+
+ if command.include? 'ping'
+ # puts "Killing verif thread"
+ Thread.current.kill
+ end
+
+ unless /"monitor"|OK|ping$/ =~ command
+ command_without_timestamp = command.gsub(/^\d+\.\d+ /, '').gsub('"', '')
+ @called_commands << command_without_timestamp
+ end
+ end
+ end
+
+ # let the verif thread acquire the mutex and wait on it while the verif starts up
+ @verif_thread.run
+ mutex_for_redis_verification_startup.lock
+ # puts "Rocknroll !"
+ end
+
+
+
+ def nb_round_trips_to_redis(commands)
+ # puts "\n\nCounting nb_round_trips_to_redis"
+ round_trips = 0
+ in_a_transaction = false
+
+ # puts commands.inspect
+ commands.each do |command|
+ # puts command.inspect
+ if command == 'multi'
+ round_trips += 1
+ in_a_transaction = true
+ elsif command == 'exec'
+ in_a_transaction = false
+ elsif !in_a_transaction
+ round_trips += 1
+ end
+ end
+
+ # puts "Counted #{round_trips} round_trips !\n\n\n"
+ round_trips
+ end
+
+
+ def actual_redis_commands
+ # wait for the redis monitor thread to receive feedback on the sent commands
+ # puts "sending a ping ! #{@called_commands.inspect}"
+
+ redis.ping
+
+ @verif_thread.join
+
+ @called_commands
+ end
+
+end

0 comments on commit 9d0d3a0

Please sign in to comment.
Something went wrong with that request. Please try again.