Skip to content
This repository has been archived by the owner on Jan 22, 2022. It is now read-only.

add support for connection pooling #25

Merged
merged 3 commits into from Feb 18, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion .travis.yml
Expand Up @@ -7,7 +7,8 @@ rvm:
- rbx-19mode
- jruby-19mode
- jruby-head

services:
- redis-server
matrix:
allow_failures:
- rvm: jruby-head
Expand Down
80 changes: 57 additions & 23 deletions lib/active_support/cache/redis_store.rb
Expand Up @@ -4,6 +4,8 @@
module ActiveSupport
module Cache
class RedisStore < Store
attr_reader :data

# Instantiate the store.
#
# Example:
Expand All @@ -24,10 +26,30 @@ class RedisStore < Store
#
# RedisStore.new "localhost:6379/0", "localhost:6380/0"
# # => instantiate a cluster
#
# RedisStore.new "localhost:6379/0", "localhost:6380/0", pool_size: 5, pool_timeout: 10
# # => use a ConnectionPool
#
# RedisStore.new "localhost:6379/0", "localhost:6380/0",
# pool: ::ConnectionPool.new(size: 1, timeout: 1) { ::Redis::Store::Factory.create("localhost:6379/0") })
# # => supply an existing connection pool (e.g. for use with redis-sentinel or redis-failover)
def initialize(*addresses)
options = addresses.extract_options!
@data = ::Redis::Store::Factory.create(addresses)
super(options)
@options = addresses.extract_options!

@data = if @options[:pool]
raise "pool must be an instance of ConnectionPool" unless @options[:pool].is_a?(ConnectionPool)

@options[:pool]
elsif [:pool_size, :pool_timeout].any? { |key| @options.has_key?(key) }
pool_options = {}
pool_options[:size] = options[:pool_size] if options[:pool_size]
pool_options[:timeout] = options[:pool_timeout] if options[:pool_timeout]
@data = ::ConnectionPool.new(pool_options) { ::Redis::Store::Factory.create(addresses) }
else
::Redis::Store::Factory.create(addresses)
end

super(@options)
end

def write(name, value, options = nil)
Expand All @@ -52,7 +74,9 @@ def delete_matched(matcher, options = nil)
instrument(:delete_matched, matcher.inspect) do
matcher = key_matcher(matcher, options)
begin
!(keys = @data.keys(matcher)).empty? && @data.del(*keys)
with do |store|
!(keys = store.keys(matcher)).empty? && store.del(*keys)
end
rescue Errno::ECONNREFUSED, Redis::CannotConnectError
false
end
Expand All @@ -66,7 +90,7 @@ def delete_matched(matcher, options = nil)
# cache.read_multi "rabbit", "white-rabbit"
# cache.read_multi "rabbit", "white-rabbit", :raw => true
def read_multi(*names)
values = @data.mget(*names)
values = with { |c| c.mget(*names) }
values.map! { |v| v.is_a?(ActiveSupport::Cache::Entry) ? v.value : v }

# Remove the options hash before mapping keys to values
Expand All @@ -82,15 +106,17 @@ def fetch_multi(*names)
options = names.extract_options!
fetched = {}

@data.multi do
fetched = names.inject({}) do |memo, (name, _)|
memo[name] = results.fetch(name) do
value = yield name
write(name, value, options)
value
end
with do |c|
c.multi do
fetched = names.inject({}) do |memo, (name, _)|
memo[name] = results.fetch(name) do
value = yield name
write(name, value, options)
value
end

memo
memo
end
end
end

Expand Down Expand Up @@ -120,7 +146,7 @@ def fetch_multi(*names)
# cache.read "rabbit", :raw => true # => "1"
def increment(key, amount = 1)
instrument(:increment, key, :amount => amount) do
@data.incrby key, amount
with{|c| c.incrby key, amount}
end
end

Expand All @@ -147,18 +173,19 @@ def increment(key, amount = 1)
# cache.read "rabbit", :raw => true # => "-1"
def decrement(key, amount = 1)
instrument(:decrement, key, :amount => amount) do
@data.decrby key, amount
with{|c| c.decrby key, amount}

end
end

def expire(key, ttl)
@data.expire key, ttl
with { |c| c.expire key, ttl }
end

# Clear all the data from the store.
def clear
instrument(:clear, nil, nil) do
@data.flushdb
with(&:flushdb)
end
end

Expand All @@ -170,24 +197,31 @@ def exist?(name, options = nil)
end

def stats
@data.info
with(&:info)
end

def with(&block)
if @data.is_a?(::Redis::Store) || @data.is_a?(::Redis::DistributedStore)
block.call(@data)
else
@data.with(&block)
end
end

# Force client reconnection, useful Unicorn deployed apps.
def reconnect
@data.reconnect
@data.reconnect if @data.respond_to?(:reconnect)
end

protected
def write_entry(key, entry, options)
method = options && options[:unless_exist] ? :setnx : :set
@data.send method, key, entry, options
with { |client| client.send method, key, entry, options }
rescue Errno::ECONNREFUSED, Redis::CannotConnectError
false
end

def read_entry(key, options)
entry = @data.get key, options
entry = with { |c| c.get key, options }
if entry
entry.is_a?(ActiveSupport::Cache::Entry) ? entry : ActiveSupport::Cache::Entry.new(entry)
end
Expand All @@ -201,7 +235,7 @@ def read_entry(key, options)
# It's really needed and use
#
def delete_entry(key, options)
@data.del key
entry = with { |c| c.del key }
rescue Errno::ECONNREFUSED, Redis::CannotConnectError
false
end
Expand Down
1 change: 1 addition & 0 deletions redis-activesupport.gemspec
Expand Up @@ -26,6 +26,7 @@ Gem::Specification.new do |s|
s.add_development_dependency 'bundler', '~> 1.3'
s.add_development_dependency 'mocha', '~> 0.14.0'
s.add_development_dependency 'minitest', '~> 4.2'
s.add_development_dependency 'connection_pool', '~> 1.2.0'
s.add_development_dependency 'redis-store-testing'
end

22 changes: 20 additions & 2 deletions test/active_support/cache/redis_store_test.rb
@@ -1,10 +1,20 @@
require 'test_helper'
require 'ostruct'
require 'connection_pool'

describe ActiveSupport::Cache::RedisStore do
def setup
@store = ActiveSupport::Cache::RedisStore.new
@dstore = ActiveSupport::Cache::RedisStore.new "redis://127.0.0.1:6380/1", "redis://127.0.0.1:6381/1"
@dstore = ActiveSupport::Cache::RedisStore.new "redis://127.0.0.1:6379/5", "redis://127.0.0.1:6379/6"
@pool_store = ActiveSupport::Cache::RedisStore.new("redis://127.0.0.1:6379/2", pool_size: 5, pool_timeout: 10)
@external_pool_store = ActiveSupport::Cache::RedisStore.new(pool: ::ConnectionPool.new(size: 1, timeout: 1) { ::Redis::Store::Factory.create("redis://127.0.0.1:6379/3") })

@pool_store.data.class.must_equal ::ConnectionPool
@pool_store.data.instance_variable_get(:@size).must_equal 5
@external_pool_store.data.class.must_equal ::ConnectionPool
@external_pool_store.data.instance_variable_get(:@size).must_equal 1


@rabbit = OpenStruct.new :name => "bunny"
@white_rabbit = OpenStruct.new :color => "white"

Expand All @@ -15,6 +25,12 @@ def setup
end
end

it "raises an error if :pool isn't a pool" do
assert_raises(RuntimeError, 'pool must be an instance of ConnectionPool') do
ActiveSupport::Cache::RedisStore.new(pool: 'poolio')
end
end

it "namespaces all operations" do
address = "redis://127.0.0.1:6380/1/cache-namespace"
store = ActiveSupport::Cache::RedisStore.new(address)
Expand Down Expand Up @@ -193,7 +209,7 @@ def setup
it "clears the store" do
with_store_management do |store|
store.clear
store.instance_variable_get(:@data).keys("*").flatten.must_be_empty
store.with { |client| client.keys("*") }.flatten.must_be_empty
end
end

Expand Down Expand Up @@ -352,6 +368,8 @@ def instantiate_store(*addresses)
def with_store_management
yield @store
yield @dstore
yield @pool_store
yield @external_pool_store
end

def with_notifications
Expand Down