Skip to content

Commit

Permalink
Merge pull request #31866 from fatkodima/redis_cache-connection_pool
Browse files Browse the repository at this point in the history
Add support for connection pooling on RedisCacheStore
  • Loading branch information
rafaelfranca committed Feb 14, 2018
1 parent b05e4c4 commit e8df458
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 25 deletions.
2 changes: 1 addition & 1 deletion Gemfile
Expand Up @@ -52,7 +52,7 @@ end
gem "dalli", ">= 2.2.1"
gem "listen", ">= 3.0.5", "< 3.2", require: false
gem "libxml-ruby", platforms: :ruby
gem "connection_pool"
gem "connection_pool", require: false

# for railties app_generator_test
gem "bootsnap", ">= 1.1.0", require: false
Expand Down
17 changes: 17 additions & 0 deletions activesupport/lib/active_support/cache.rb
Expand Up @@ -160,6 +160,23 @@ class Store
attr_reader :silence, :options
alias :silence? :silence

class << self
private
def retrieve_pool_options(options)
{}.tap do |pool_options|
pool_options[:size] = options.delete(:pool_size) if options[:pool_size]
pool_options[:timeout] = options.delete(:pool_timeout) if options[:pool_timeout]
end
end

def ensure_connection_pool_added!
require "connection_pool"
rescue LoadError => e
$stderr.puts "You don't have connection_pool installed in your application. Please add it to your Gemfile and run bundle install"
raise e
end
end

# Creates a new cache. The options will be passed to any write method calls
# except for <tt>:namespace</tt> which can be used to set the global
# namespace for the cache.
Expand Down
13 changes: 2 additions & 11 deletions activesupport/lib/active_support/cache/mem_cache_store.rb
Expand Up @@ -63,21 +63,12 @@ def self.build_mem_cache(*addresses) # :nodoc:
addresses = addresses.flatten
options = addresses.extract_options!
addresses = ["localhost:11211"] if addresses.empty?

pool_options = {}
pool_options[:size] = options[:pool_size] if options[:pool_size]
pool_options[:timeout] = options[:pool_timeout] if options[:pool_timeout]
pool_options = retrieve_pool_options(options)

if pool_options.empty?
Dalli::Client.new(addresses, options)
else
begin
require "connection_pool"
rescue LoadError => e
$stderr.puts "You don't have connection_pool installed in your application. Please add it to your Gemfile and run bundle install"
raise e
end

ensure_connection_pool_added!
ConnectionPool.new(pool_options) { Dalli::Client.new(addresses, options.merge(threadsafe: false)) }
end
end
Expand Down
40 changes: 29 additions & 11 deletions activesupport/lib/active_support/cache/redis_cache_store.rb
Expand Up @@ -21,6 +21,15 @@

module ActiveSupport
module Cache
module ConnectionPoolLike
def with
yield self
end
end

::Redis.include(ConnectionPoolLike)
::Redis::Distributed.include(ConnectionPoolLike)

# Redis cache store.
#
# Deployment note: Take care to use a *dedicated Redis cache* rather
Expand Down Expand Up @@ -173,7 +182,16 @@ def initialize(namespace: nil, compress: true, compress_threshold: 1.kilobyte, e
end

def redis
@redis ||= self.class.build_redis(**redis_options)
@redis ||= begin
pool_options = self.class.send(:retrieve_pool_options, redis_options)

if pool_options.any?
self.class.send(:ensure_connection_pool_added!)
::ConnectionPool.new(pool_options) { self.class.build_redis(**redis_options) }
else
self.class.build_redis(**redis_options)
end
end
end

def inspect
Expand Down Expand Up @@ -216,7 +234,7 @@ def delete_matched(matcher, options = nil)
instrument :delete_matched, matcher do
case matcher
when String
redis.eval DELETE_GLOB_LUA, [], [namespace_key(matcher, options)]
redis.with { |c| c.eval DELETE_GLOB_LUA, [], [namespace_key(matcher, options)] }
else
raise ArgumentError, "Only Redis glob strings are supported: #{matcher.inspect}"
end
Expand All @@ -234,7 +252,7 @@ def delete_matched(matcher, options = nil)
def increment(name, amount = 1, options = nil)
instrument :increment, name, amount: amount do
failsafe :increment do
redis.incrby normalize_key(name, options), amount
redis.with { |c| c.incrby normalize_key(name, options), amount }
end
end
end
Expand All @@ -250,7 +268,7 @@ def increment(name, amount = 1, options = nil)
def decrement(name, amount = 1, options = nil)
instrument :decrement, name, amount: amount do
failsafe :decrement do
redis.decrby normalize_key(name, options), amount
redis.with { |c| c.decrby normalize_key(name, options), amount }
end
end
end
Expand All @@ -272,7 +290,7 @@ def clear(options = nil)
if namespace = merged_options(options)[namespace]
delete_matched "*", namespace: namespace
else
redis.flushdb
redis.with { |c| c.flushdb }
end
end
end
Expand Down Expand Up @@ -303,7 +321,7 @@ def set_redis_capabilities
# Read an entry from the cache.
def read_entry(key, options = nil)
failsafe :read_entry do
deserialize_entry redis.get(key)
deserialize_entry redis.with { |c| c.get(key) }
end
end

Expand All @@ -322,7 +340,7 @@ def read_multi_mget(*names)
keys = names.map { |name| normalize_key(name, options) }

values = failsafe(:read_multi_mget, returning: {}) do
redis.mget(*keys)
redis.with { |c| c.mget(*keys) }
end

names.zip(values).each_with_object({}) do |(name, value), results|
Expand Down Expand Up @@ -354,17 +372,17 @@ def write_entry(key, entry, unless_exist: false, raw: false, expires_in: nil, ra
modifiers[:nx] = unless_exist
modifiers[:px] = (1000 * expires_in.to_f).ceil if expires_in

redis.set key, serialized_entry, modifiers
redis.with { |c| c.set key, serialized_entry, modifiers }
else
redis.set key, serialized_entry
redis.with { |c| c.set key, serialized_entry }
end
end
end

# Delete an entry from the cache.
def delete_entry(key, options)
failsafe :delete_entry, returning: false do
redis.del key
redis.with { |c| c.del key }
end
end

Expand All @@ -373,7 +391,7 @@ def write_multi_entries(entries, expires_in: nil, **options)
if entries.any?
if mset_capable? && expires_in.nil?
failsafe :write_multi_entries do
redis.mapped_mset(serialize_entries(entries, raw: options[:raw]))
redis.with { |c| c.mapped_mset(serialize_entries(entries, raw: options[:raw])) }
end
else
super
Expand Down
Expand Up @@ -6,7 +6,7 @@ def test_connection_pool

emulating_latency do
begin
cache = ActiveSupport::Cache.lookup_store(store, pool_size: 2, pool_timeout: 1)
cache = ActiveSupport::Cache.lookup_store(store, { pool_size: 2, pool_timeout: 1 }.merge(store_options))
cache.clear

threads = []
Expand All @@ -33,7 +33,7 @@ def test_connection_pool
def test_no_connection_pool
emulating_latency do
begin
cache = ActiveSupport::Cache.lookup_store(store)
cache = ActiveSupport::Cache.lookup_store(store, store_options)
cache.clear

threads = []
Expand All @@ -54,4 +54,7 @@ def test_no_connection_pool
end
end
end

private
def store_options; {}; end
end
45 changes: 45 additions & 0 deletions activesupport/test/cache/stores/redis_cache_store_test.rb
Expand Up @@ -5,6 +5,24 @@
require "active_support/cache/redis_cache_store"
require_relative "../behaviors"

driver_name = %w[ ruby hiredis ].include?(ENV["REDIS_DRIVER"]) ? ENV["REDIS_DRIVER"] : "hiredis"
driver = Object.const_get("Redis::Connection::#{driver_name.camelize}")

Redis::Connection.drivers.clear
Redis::Connection.drivers.append(driver)

# Emulates a latency on Redis's back-end for the key latency to facilitate
# connection pool testing.
class SlowRedis < Redis
def get(key, options = {})
if key =~ /latency/
sleep 3
else
super
end
end
end

module ActiveSupport::Cache::RedisCacheStoreTests
DRIVER = %w[ ruby hiredis ].include?(ENV["REDIS_DRIVER"]) ? ENV["REDIS_DRIVER"] : "hiredis"

Expand Down Expand Up @@ -119,6 +137,33 @@ def test_fetch_multi_uses_redis_mget
end
end

class ConnectionPoolBehaviourTest < StoreTest
include ConnectionPoolBehavior

private

def store
:redis_cache_store
end

def emulating_latency
old_redis = Object.send(:remove_const, :Redis)
Object.const_set(:Redis, SlowRedis)

yield
ensure
Object.send(:remove_const, :Redis)
Object.const_set(:Redis, old_redis)
end
end

class RedisDistributedConnectionPoolBehaviourTest < ConnectionPoolBehaviourTest
private
def store_options
{ url: %w[ redis://localhost:6379/0 redis://localhost:6379/0 ] }
end
end

# Separate test class so we can omit the namespace which causes expected,
# appropriate complaints about incompatible string encodings.
class KeyEncodingSafetyTest < StoreTest
Expand Down

0 comments on commit e8df458

Please sign in to comment.