Skip to content

Commit

Permalink
Merge pull request #48673 from jonathanhefner/cache-anticipate-replac…
Browse files Browse the repository at this point in the history
…eable-compressor

Lay groundwork for replaceable cache compressor
  • Loading branch information
jonathanhefner committed Jul 5, 2023
2 parents 8bf2160 + 04bca8b commit 1d4c288
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 207 deletions.
160 changes: 13 additions & 147 deletions activesupport/lib/active_support/cache.rb
@@ -1,13 +1,13 @@
# frozen_string_literal: true

require "zlib"
require "active_support/core_ext/array/extract_options"
require "active_support/core_ext/enumerable"
require "active_support/core_ext/module/attribute_accessors"
require "active_support/core_ext/numeric/bytes"
require "active_support/core_ext/object/to_param"
require "active_support/core_ext/object/try"
require "active_support/core_ext/string/inflections"
require_relative "cache/entry"
require_relative "cache/serializer_with_fallback"

module ActiveSupport
Expand Down Expand Up @@ -231,12 +231,14 @@ def retrieve_pool_options(options)
#
# ==== Options
#
# * +:namespace+ - Sets the namespace for the cache. This option is
# especially useful if your application shares a cache with other
# applications.
# * +:coder+ - Replaces the default cache entry serialization mechanism
# with a custom one. The +coder+ must respond to +dump+ and +load+.
# Using a custom coder disables automatic compression.
# [+:namespace+]
# Sets the namespace for the cache. This option is especially useful if
# your application shares a cache with other applications.
#
# [+:coder+]
# Replaces the default serializer for cache entries. +coder+ must
# respond to +dump+ and +load+. Using a custom coder disables automatic
# compression.
#
# Alternatively, you can specify <tt>coder: :message_pack</tt> to use a
# preconfigured coder based on ActiveSupport::MessagePack that supports
Expand All @@ -248,10 +250,11 @@ def retrieve_pool_options(options)
# relevant cache operations, such as #read, #write, and #fetch.
def initialize(options = nil)
@options = options ? normalize_options(options) : {}

@options[:compress] = true unless @options.key?(:compress)
@options[:compress_threshold] = DEFAULT_COMPRESS_LIMIT unless @options.key?(:compress_threshold)
@options[:compress_threshold] ||= DEFAULT_COMPRESS_LIMIT

@coder = @options.delete(:coder) { default_coder } || NullCoder
@coder = @options.delete(:coder) { default_coder } || :passthrough
@coder = Cache::SerializerWithFallback[@coder] if @coder.is_a?(Symbol)
@coder_supports_compression = @coder.respond_to?(:dump_compressed)
end
Expand Down Expand Up @@ -721,7 +724,7 @@ def write_entry(key, entry, **options)
def serialize_entry(entry, **options)
options = merged_options(options)
if @coder_supports_compression && options[:compress]
@coder.dump_compressed(entry, options[:compress_threshold] || DEFAULT_COMPRESS_LIMIT)
@coder.dump_compressed(entry, options[:compress_threshold])
else
@coder.dump(entry)
end
Expand Down Expand Up @@ -983,142 +986,5 @@ def expires_at=(expires_at)
@options[:expires_at] = expires_at
end
end

module NullCoder # :nodoc:
extend self

def dump(entry)
entry
end

def dump_compressed(entry, threshold)
entry.compressed(threshold)
end

def load(payload)
payload
end
end

# This class is used to represent cache entries. Cache entries have a value, an optional
# expiration time, and an optional version. The expiration time is used to support the :race_condition_ttl option
# on the cache. The version is used to support the :version option on the cache for rejecting
# mismatches.
#
# Since cache entries in most instances will be serialized, the internals of this class are highly optimized
# using short instance variable names that are lazily defined.
class Entry # :nodoc:
class << self
def unpack(members)
new(members[0], expires_at: members[1], version: members[2])
end
end

attr_reader :version

# Creates a new cache entry for the specified value. Options supported are
# +:compressed+, +:version+, +:expires_at+ and +:expires_in+.
def initialize(value, compressed: false, version: nil, expires_in: nil, expires_at: nil, **)
@value = value
@version = version
@created_at = 0.0
@expires_in = expires_at&.to_f || expires_in && (expires_in.to_f + Time.now.to_f)
@compressed = true if compressed
end

def value
compressed? ? uncompress(@value) : @value
end

def mismatched?(version)
@version && version && @version != version
end

# Checks if the entry is expired. The +expires_in+ parameter can override
# the value set when the entry was created.
def expired?
@expires_in && @created_at + @expires_in <= Time.now.to_f
end

def expires_at
@expires_in ? @created_at + @expires_in : nil
end

def expires_at=(value)
if value
@expires_in = value.to_f - @created_at
else
@expires_in = nil
end
end

# Returns the size of the cached value. This could be less than
# <tt>value.bytesize</tt> if the data is compressed.
def bytesize
case value
when NilClass
0
when String
@value.bytesize
else
@s ||= Marshal.dump(@value).bytesize
end
end

def compressed? # :nodoc:
defined?(@compressed)
end

def compressed(compress_threshold)
return self if compressed?

case @value
when nil, true, false, Numeric
uncompressed_size = 0
when String
uncompressed_size = @value.bytesize
else
serialized = Marshal.dump(@value)
uncompressed_size = serialized.bytesize
end

if uncompressed_size >= compress_threshold
serialized ||= Marshal.dump(@value)
compressed = Zlib::Deflate.deflate(serialized)

if compressed.bytesize < uncompressed_size
return Entry.new(compressed, compressed: true, expires_at: expires_at, version: version)
end
end
self
end

def local?
false
end

# Duplicates the value in a class. This is used by cache implementations that don't natively
# serialize entries to protect against accidental cache modifications.
def dup_value!
if @value && !compressed? && !(@value.is_a?(Numeric) || @value == true || @value == false)
if @value.is_a?(String)
@value = @value.dup
else
@value = Marshal.load(Marshal.dump(@value))
end
end
end

def pack
members = [value, expires_at, version]
members.pop while !members.empty? && members.last.nil?
members
end

private
def uncompress(value)
Marshal.load(Zlib::Inflate.inflate(value))
end
end
end
end
128 changes: 128 additions & 0 deletions activesupport/lib/active_support/cache/entry.rb
@@ -0,0 +1,128 @@
# frozen_string_literal: true

require "zlib"

module ActiveSupport
module Cache
# This class is used to represent cache entries. Cache entries have a value, an optional
# expiration time, and an optional version. The expiration time is used to support the :race_condition_ttl option
# on the cache. The version is used to support the :version option on the cache for rejecting
# mismatches.
#
# Since cache entries in most instances will be serialized, the internals of this class are highly optimized
# using short instance variable names that are lazily defined.
class Entry # :nodoc:
class << self
def unpack(members)
new(members[0], expires_at: members[1], version: members[2])
end
end

attr_reader :version

# Creates a new cache entry for the specified value. Options supported are
# +:compressed+, +:version+, +:expires_at+ and +:expires_in+.
def initialize(value, compressed: false, version: nil, expires_in: nil, expires_at: nil, **)
@value = value
@version = version
@created_at = 0.0
@expires_in = expires_at&.to_f || expires_in && (expires_in.to_f + Time.now.to_f)
@compressed = true if compressed
end

def value
compressed? ? uncompress(@value) : @value
end

def mismatched?(version)
@version && version && @version != version
end

# Checks if the entry is expired. The +expires_in+ parameter can override
# the value set when the entry was created.
def expired?
@expires_in && @created_at + @expires_in <= Time.now.to_f
end

def expires_at
@expires_in ? @created_at + @expires_in : nil
end

def expires_at=(value)
if value
@expires_in = value.to_f - @created_at
else
@expires_in = nil
end
end

# Returns the size of the cached value. This could be less than
# <tt>value.bytesize</tt> if the data is compressed.
def bytesize
case value
when NilClass
0
when String
@value.bytesize
else
@s ||= Marshal.dump(@value).bytesize
end
end

def compressed? # :nodoc:
defined?(@compressed)
end

def compressed(compress_threshold)
return self if compressed?

case @value
when nil, true, false, Numeric
uncompressed_size = 0
when String
uncompressed_size = @value.bytesize
else
serialized = Marshal.dump(@value)
uncompressed_size = serialized.bytesize
end

if uncompressed_size >= compress_threshold
serialized ||= Marshal.dump(@value)
compressed = Zlib::Deflate.deflate(serialized)

if compressed.bytesize < uncompressed_size
return Entry.new(compressed, compressed: true, expires_at: expires_at, version: version)
end
end
self
end

def local?
false
end

# Duplicates the value in a class. This is used by cache implementations that don't natively
# serialize entries to protect against accidental cache modifications.
def dup_value!
if @value && !compressed? && !(@value.is_a?(Numeric) || @value == true || @value == false)
if @value.is_a?(String)
@value = @value.dup
else
@value = Marshal.load(Marshal.dump(@value))
end
end
end

def pack
members = [value, expires_at, version]
members.pop while !members.empty? && members.last.nil?
members
end

private
def uncompress(value)
Marshal.load(Zlib::Inflate.inflate(value))
end
end
end
end
13 changes: 8 additions & 5 deletions activesupport/lib/active_support/cache/redis_cache_store.rb
Expand Up @@ -10,6 +10,7 @@
end

require "connection_pool"
require "active_support/core_ext/hash/slice"
require "active_support/core_ext/numeric/time"
require "active_support/digest"

Expand Down Expand Up @@ -141,7 +142,12 @@ def build_redis_client(**redis_options)
# cache.fetch('bar', skip_nil: true) { nil }
# cache.exist?('foo') # => true
# cache.exist?('bar') # => false
def initialize(namespace: nil, compress: true, compress_threshold: 1.kilobyte, coder: default_coder, expires_in: nil, race_condition_ttl: nil, error_handler: DEFAULT_ERROR_HANDLER, skip_nil: false, **redis_options)
def initialize(error_handler: DEFAULT_ERROR_HANDLER, **redis_options)
base_options = redis_options.extract!(
:coder, :compress, :compress_threshold,
:namespace, :expires_in, :race_condition_ttl, :skip_nil,
)

if pool_options = self.class.send(:retrieve_pool_options, redis_options)
@redis = ::ConnectionPool.new(pool_options) { self.class.build_redis(**redis_options) }
else
Expand All @@ -152,10 +158,7 @@ def initialize(namespace: nil, compress: true, compress_threshold: 1.kilobyte, c
@error_handler = error_handler
@supports_pipelining = true

super namespace: namespace,
compress: compress, compress_threshold: compress_threshold,
expires_in: expires_in, race_condition_ttl: race_condition_ttl,
coder: coder, skip_nil: skip_nil
super(base_options)
end

def inspect
Expand Down
@@ -1,5 +1,6 @@
# frozen_string_literal: true

require "zlib"
require "active_support/core_ext/kernel/reporting"

module ActiveSupport
Expand Down

0 comments on commit 1d4c288

Please sign in to comment.