Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lay groundwork for replaceable cache compressor #48673

160 changes: 13 additions & 147 deletions activesupport/lib/active_support/cache.rb
@@ -1,13 +1,13 @@
# frozen_string_literal: true

require "zlib"
require "active_support/core_ext/array/extract_options"
require "active_support/core_ext/enumerable"
require "active_support/core_ext/module/attribute_accessors"
require "active_support/core_ext/numeric/bytes"
require "active_support/core_ext/object/to_param"
require "active_support/core_ext/object/try"
require "active_support/core_ext/string/inflections"
require_relative "cache/entry"
require_relative "cache/serializer_with_fallback"

module ActiveSupport
Expand Down Expand Up @@ -231,12 +231,14 @@ def retrieve_pool_options(options)
#
# ==== Options
#
# * +:namespace+ - Sets the namespace for the cache. This option is
# especially useful if your application shares a cache with other
# applications.
# * +:coder+ - Replaces the default cache entry serialization mechanism
# with a custom one. The +coder+ must respond to +dump+ and +load+.
# Using a custom coder disables automatic compression.
# [+:namespace+]
# Sets the namespace for the cache. This option is especially useful if
# your application shares a cache with other applications.
#
# [+:coder+]
# Replaces the default serializer for cache entries. +coder+ must
# respond to +dump+ and +load+. Using a custom coder disables automatic
# compression.
#
# Alternatively, you can specify <tt>coder: :message_pack</tt> to use a
# preconfigured coder based on ActiveSupport::MessagePack that supports
Expand All @@ -248,10 +250,11 @@ def retrieve_pool_options(options)
# relevant cache operations, such as #read, #write, and #fetch.
def initialize(options = nil)
@options = options ? normalize_options(options) : {}

@options[:compress] = true unless @options.key?(:compress)
@options[:compress_threshold] = DEFAULT_COMPRESS_LIMIT unless @options.key?(:compress_threshold)
@options[:compress_threshold] ||= DEFAULT_COMPRESS_LIMIT

@coder = @options.delete(:coder) { default_coder } || NullCoder
@coder = @options.delete(:coder) { default_coder } || :passthrough
@coder = Cache::SerializerWithFallback[@coder] if @coder.is_a?(Symbol)
@coder_supports_compression = @coder.respond_to?(:dump_compressed)
end
Expand Down Expand Up @@ -721,7 +724,7 @@ def write_entry(key, entry, **options)
def serialize_entry(entry, **options)
options = merged_options(options)
if @coder_supports_compression && options[:compress]
@coder.dump_compressed(entry, options[:compress_threshold] || DEFAULT_COMPRESS_LIMIT)
@coder.dump_compressed(entry, options[:compress_threshold])
else
@coder.dump(entry)
end
Expand Down Expand Up @@ -983,142 +986,5 @@ def expires_at=(expires_at)
@options[:expires_at] = expires_at
end
end

module NullCoder # :nodoc:
extend self

def dump(entry)
entry
end

def dump_compressed(entry, threshold)
entry.compressed(threshold)
end

def load(payload)
payload
end
end

# This class is used to represent cache entries. Cache entries have a value, an optional
# expiration time, and an optional version. The expiration time is used to support the :race_condition_ttl option
# on the cache. The version is used to support the :version option on the cache for rejecting
# mismatches.
#
# Since cache entries in most instances will be serialized, the internals of this class are highly optimized
# using short instance variable names that are lazily defined.
class Entry # :nodoc:
class << self
def unpack(members)
new(members[0], expires_at: members[1], version: members[2])
end
end

attr_reader :version

# Creates a new cache entry for the specified value. Options supported are
# +:compressed+, +:version+, +:expires_at+ and +:expires_in+.
def initialize(value, compressed: false, version: nil, expires_in: nil, expires_at: nil, **)
@value = value
@version = version
@created_at = 0.0
@expires_in = expires_at&.to_f || expires_in && (expires_in.to_f + Time.now.to_f)
@compressed = true if compressed
end

def value
compressed? ? uncompress(@value) : @value
end

def mismatched?(version)
@version && version && @version != version
end

# Checks if the entry is expired. The +expires_in+ parameter can override
# the value set when the entry was created.
def expired?
@expires_in && @created_at + @expires_in <= Time.now.to_f
end

def expires_at
@expires_in ? @created_at + @expires_in : nil
end

def expires_at=(value)
if value
@expires_in = value.to_f - @created_at
else
@expires_in = nil
end
end

# Returns the size of the cached value. This could be less than
# <tt>value.bytesize</tt> if the data is compressed.
def bytesize
case value
when NilClass
0
when String
@value.bytesize
else
@s ||= Marshal.dump(@value).bytesize
end
end

def compressed? # :nodoc:
defined?(@compressed)
end

def compressed(compress_threshold)
return self if compressed?

case @value
when nil, true, false, Numeric
uncompressed_size = 0
when String
uncompressed_size = @value.bytesize
else
serialized = Marshal.dump(@value)
uncompressed_size = serialized.bytesize
end

if uncompressed_size >= compress_threshold
serialized ||= Marshal.dump(@value)
compressed = Zlib::Deflate.deflate(serialized)

if compressed.bytesize < uncompressed_size
return Entry.new(compressed, compressed: true, expires_at: expires_at, version: version)
end
end
self
end

def local?
false
end

# Duplicates the value in a class. This is used by cache implementations that don't natively
# serialize entries to protect against accidental cache modifications.
def dup_value!
if @value && !compressed? && !(@value.is_a?(Numeric) || @value == true || @value == false)
if @value.is_a?(String)
@value = @value.dup
else
@value = Marshal.load(Marshal.dump(@value))
end
end
end

def pack
members = [value, expires_at, version]
members.pop while !members.empty? && members.last.nil?
members
end

private
def uncompress(value)
Marshal.load(Zlib::Inflate.inflate(value))
end
end
end
end
128 changes: 128 additions & 0 deletions activesupport/lib/active_support/cache/entry.rb
@@ -0,0 +1,128 @@
# frozen_string_literal: true

require "zlib"

module ActiveSupport
module Cache
# This class is used to represent cache entries. Cache entries have a value, an optional
# expiration time, and an optional version. The expiration time is used to support the :race_condition_ttl option
# on the cache. The version is used to support the :version option on the cache for rejecting
# mismatches.
#
# Since cache entries in most instances will be serialized, the internals of this class are highly optimized
# using short instance variable names that are lazily defined.
class Entry # :nodoc:
class << self
def unpack(members)
new(members[0], expires_at: members[1], version: members[2])
end
end

attr_reader :version

# Creates a new cache entry for the specified value. Options supported are
# +:compressed+, +:version+, +:expires_at+ and +:expires_in+.
def initialize(value, compressed: false, version: nil, expires_in: nil, expires_at: nil, **)
@value = value
@version = version
@created_at = 0.0
@expires_in = expires_at&.to_f || expires_in && (expires_in.to_f + Time.now.to_f)
@compressed = true if compressed
end

def value
compressed? ? uncompress(@value) : @value
end

def mismatched?(version)
@version && version && @version != version
end

# Checks if the entry is expired. The +expires_in+ parameter can override
# the value set when the entry was created.
def expired?
@expires_in && @created_at + @expires_in <= Time.now.to_f
end

def expires_at
@expires_in ? @created_at + @expires_in : nil
end

def expires_at=(value)
if value
@expires_in = value.to_f - @created_at
else
@expires_in = nil
end
end

# Returns the size of the cached value. This could be less than
# <tt>value.bytesize</tt> if the data is compressed.
def bytesize
case value
when NilClass
0
when String
@value.bytesize
else
@s ||= Marshal.dump(@value).bytesize
end
end

def compressed? # :nodoc:
defined?(@compressed)
end

def compressed(compress_threshold)
return self if compressed?

case @value
when nil, true, false, Numeric
uncompressed_size = 0
when String
uncompressed_size = @value.bytesize
else
serialized = Marshal.dump(@value)
uncompressed_size = serialized.bytesize
end

if uncompressed_size >= compress_threshold
serialized ||= Marshal.dump(@value)
compressed = Zlib::Deflate.deflate(serialized)

if compressed.bytesize < uncompressed_size
return Entry.new(compressed, compressed: true, expires_at: expires_at, version: version)
end
end
self
end

def local?
false
end

# Duplicates the value in a class. This is used by cache implementations that don't natively
# serialize entries to protect against accidental cache modifications.
def dup_value!
if @value && !compressed? && !(@value.is_a?(Numeric) || @value == true || @value == false)
if @value.is_a?(String)
@value = @value.dup
else
@value = Marshal.load(Marshal.dump(@value))
end
end
end

def pack
members = [value, expires_at, version]
members.pop while !members.empty? && members.last.nil?
members
end

private
def uncompress(value)
Marshal.load(Zlib::Inflate.inflate(value))
end
end
end
end
13 changes: 8 additions & 5 deletions activesupport/lib/active_support/cache/redis_cache_store.rb
Expand Up @@ -10,6 +10,7 @@
end

require "connection_pool"
require "active_support/core_ext/hash/slice"
require "active_support/core_ext/numeric/time"
require "active_support/digest"

Expand Down Expand Up @@ -141,7 +142,12 @@ def build_redis_client(**redis_options)
# cache.fetch('bar', skip_nil: true) { nil }
# cache.exist?('foo') # => true
# cache.exist?('bar') # => false
def initialize(namespace: nil, compress: true, compress_threshold: 1.kilobyte, coder: default_coder, expires_in: nil, race_condition_ttl: nil, error_handler: DEFAULT_ERROR_HANDLER, skip_nil: false, **redis_options)
def initialize(error_handler: DEFAULT_ERROR_HANDLER, **redis_options)
base_options = redis_options.extract!(
:coder, :compress, :compress_threshold,
:namespace, :expires_in, :race_condition_ttl, :skip_nil,
)

if pool_options = self.class.send(:retrieve_pool_options, redis_options)
@redis = ::ConnectionPool.new(pool_options) { self.class.build_redis(**redis_options) }
else
Expand All @@ -152,10 +158,7 @@ def initialize(namespace: nil, compress: true, compress_threshold: 1.kilobyte, c
@error_handler = error_handler
@supports_pipelining = true

super namespace: namespace,
compress: compress, compress_threshold: compress_threshold,
expires_in: expires_in, race_condition_ttl: race_condition_ttl,
coder: coder, skip_nil: skip_nil
super(base_options)
end

def inspect
Expand Down
@@ -1,5 +1,6 @@
# frozen_string_literal: true

require "zlib"
require "active_support/core_ext/kernel/reporting"

module ActiveSupport
Expand Down