Skip to content

Commit

Permalink
Add ActiveSupport::MessagePack
Browse files Browse the repository at this point in the history
`ActiveSupport::MessagePack` is a serializer that integrates with the
`msgpack` gem to serialize a variety of Ruby objects.  `AS::MessagePack`
supports several types beyond the base types that `msgpack` supports,
including `Time` and `Range`, as well as Active Support types such as
`AS::TimeWithZone` and `AS::HashWithIndifferentAccess`.

Compared to `JSON` and `Marshal`, `AS::MessagePack` can provide a
performance improvement and message size reduction.  For example, when
used with `MessageVerifier`:

  ```ruby
  # frozen_string_literal: true

  require "benchmark/ips"
  require "active_support/all"
  require "active_support/message_pack"

  marshal_verifier = ActiveSupport::MessageVerifier.new("secret", serializer: Marshal)
  json_verifier = ActiveSupport::MessageVerifier.new("secret", serializer: JSON)
  asjson_verifier = ActiveSupport::MessageVerifier.new("secret", serializer: ActiveSupport::JSON)
  msgpack_verifier = ActiveSupport::MessageVerifier.new("secret", serializer: ActiveSupport::MessagePack)

  ActiveSupport::Messages::Metadata.use_message_serializer_for_metadata = true
  expiry = 1.year.from_now
  data = { bool: true, num: 123456789, string: "x" * 50 }

  Benchmark.ips do |x|
    x.report("Marshal") do
      marshal_verifier.verify(marshal_verifier.generate(data, expires_at: expiry))
    end

    x.report("JSON") do
      json_verifier.verify(json_verifier.generate(data, expires_at: expiry))
    end

    x.report("AS::JSON") do
      asjson_verifier.verify(asjson_verifier.generate(data, expires_at: expiry))
    end

    x.report("MessagePack") do
      msgpack_verifier.verify(msgpack_verifier.generate(data, expires_at: expiry))
    end

    x.compare!
  end

  puts "Marshal size: #{marshal_verifier.generate(data, expires_at: expiry).bytesize}"
  puts "JSON size: #{json_verifier.generate(data, expires_at: expiry).bytesize}"
  puts "MessagePack size: #{msgpack_verifier.generate(data, expires_at: expiry).bytesize}"
  ```

  ```
  Warming up --------------------------------------
               Marshal     1.206k i/100ms
                  JSON     1.165k i/100ms
              AS::JSON   790.000  i/100ms
           MessagePack     1.798k i/100ms
  Calculating -------------------------------------
               Marshal     11.748k (± 1.3%) i/s -     59.094k in   5.031071s
                  JSON     11.498k (± 1.4%) i/s -     58.250k in   5.066957s
              AS::JSON      7.867k (± 2.4%) i/s -     39.500k in   5.024055s
           MessagePack     17.865k (± 0.8%) i/s -     89.900k in   5.032592s

  Comparison:
           MessagePack:    17864.9 i/s
               Marshal:    11747.8 i/s - 1.52x  (± 0.00) slower
                  JSON:    11498.4 i/s - 1.55x  (± 0.00) slower
              AS::JSON:     7866.9 i/s - 2.27x  (± 0.00) slower

  Marshal size: 254
  JSON size: 234
  MessagePack size: 194
  ```

Additionally, `ActiveSupport::MessagePack::CacheSerializer` is a
serializer that is suitable for use as an `ActiveSupport::Cache` coder.
`AS::MessagePack::CacheSerializer` can serialize `ActiveRecord::Base`
instances, including loaded associations.  Like `AS::MessagePack`, it
provides a performance improvement and payload size reduction:

  ```ruby
  # frozen_string_literal: true

  require "benchmark/ips"
  require "active_support/message_pack"

  ActiveRecord::Base.establish_connection(adapter: "sqlite3", database: ":memory:")

  ActiveRecord::Schema.define do
    create_table :posts, force: true do |t|
      t.string :body
      t.timestamps
    end

    create_table :comments, force: true do |t|
      t.integer :post_id
      t.string :body
      t.timestamps
    end
  end

  class Post < ActiveRecord::Base
    has_many :comments
  end

  class Comment < ActiveRecord::Base
    belongs_to :post
  end

  post = Post.create!(body: "x" * 100)
  2.times { post.comments.create!(body: "x" * 100) }
  post.comments.load
  cache_entry = ActiveSupport::Cache::Entry.new(post)

  Rails70Coder = ActiveSupport::Cache::Coders::Rails70Coder
  CacheSerializer = ActiveSupport::MessagePack::CacheSerializer

  Benchmark.ips do |x|
    x.report("Rails70Coder") do
      Rails70Coder.load(Rails70Coder.dump(cache_entry))
    end

    x.report("CacheSerializer") do
      CacheSerializer.load(CacheSerializer.dump(cache_entry))
    end

    x.compare!
  end

  puts "Rails70Coder size: #{Rails70Coder.dump(cache_entry).bytesize}"
  puts "CacheSerializer size: #{CacheSerializer.dump(cache_entry).bytesize}"
  ```

  ```
  Warming up --------------------------------------
          Rails70Coder   329.000  i/100ms
       CacheSerializer   492.000  i/100ms
  Calculating -------------------------------------
          Rails70Coder      3.285k (± 1.7%) i/s -     16.450k in   5.008447s
       CacheSerializer      4.895k (± 2.4%) i/s -     24.600k in   5.028803s

  Comparison:
       CacheSerializer:     4894.7 i/s
          Rails70Coder:     3285.4 i/s - 1.49x  slower

  Rails70Coder size: 808
  CacheSerializer size: 593
  ```

Co-authored-by: Jean Boussier <jean.boussier@gmail.com>
  • Loading branch information
jonathanhefner and byroot committed Apr 17, 2023
1 parent bd8aeea commit a2a6331
Show file tree
Hide file tree
Showing 15 changed files with 1,010 additions and 4 deletions.
1 change: 1 addition & 0 deletions Gemfile
Expand Up @@ -68,6 +68,7 @@ gem "listen", "~> 3.3", require: false
gem "libxml-ruby", platforms: :ruby
gem "connection_pool", require: false
gem "rexml", require: false
gem "msgpack", ">= 1.7.0", require: false

# for railties
gem "bootsnap", ">= 1.4.4", require: false
Expand Down
3 changes: 2 additions & 1 deletion Gemfile.lock
Expand Up @@ -320,7 +320,7 @@ GEM
mixlib-shellout (3.2.7)
chef-utils
mono_logger (1.1.1)
msgpack (1.6.0)
msgpack (1.7.0)
multi_json (1.15.0)
multipart-post (2.2.3)
mustermann (3.0.0)
Expand Down Expand Up @@ -580,6 +580,7 @@ DEPENDENCIES
minitest-bisect
minitest-ci
minitest-retry
msgpack (>= 1.7.0)
mysql2 (~> 0.5)
nokogiri (>= 1.8.1, != 1.11.0)
pg (~> 1.3)
Expand Down
124 changes: 124 additions & 0 deletions activerecord/lib/active_record/message_pack.rb
@@ -0,0 +1,124 @@
# frozen_string_literal: true

module ActiveRecord
module MessagePack # :nodoc:
FORMAT_VERSION = 1

class << self
def dump(input)
encoder = Encoder.new
[FORMAT_VERSION, encoder.encode(input), encoder.entries]
end

def load(dumped)
format_version, top_level, entries = dumped
unless format_version == FORMAT_VERSION
raise "Invalid format version: #{format_version.inspect}"
end
Decoder.new(entries).decode(top_level)
end
end

module Extensions
extend self

def install(registry)
registry.register_type 119, ActiveModel::Type::Binary::Data,
packer: :to_s,
unpacker: :new

registry.register_type 120, ActiveRecord::Base,
packer: method(:write_record),
unpacker: method(:read_record),
recursive: true
end

def write_record(record, packer)
packer.write(ActiveRecord::MessagePack.dump(record))
end

def read_record(unpacker)
ActiveRecord::MessagePack.load(unpacker.read)
end
end

class Encoder
attr_reader :entries

def initialize
@entries = []
@refs = {}.compare_by_identity
end

def encode(input)
if input.is_a?(Array)
input.map { |record| encode_record(record) }
elsif input
encode_record(input)
end
end

def encode_record(record)
ref = @refs[record]

if !ref
ref = @refs[record] = @entries.size
@entries << build_entry(record)
add_cached_associations(record, @entries.last)
end

ref
end

def build_entry(record)
[
ActiveSupport::MessagePack::Extensions.dump_class(record.class),
record.attributes_for_database,
record.new_record?
]
end

def add_cached_associations(record, entry)
record.class.reflections.each_value do |reflection|
if record.association_cached?(reflection.name)
entry << reflection.name << encode(record.association(reflection.name).target)
end
end
end
end

class Decoder
def initialize(entries)
@records = entries.map { |entry| build_record(entry) }
@records.zip(entries) { |record, entry| resolve_cached_associations(record, entry) }
end

def decode(ref)
if ref.is_a?(Array)
ref.map { |r| @records[r] }
elsif ref
@records[ref]
end
end

def build_record(entry)
class_name, attributes_hash, is_new_record, * = entry
klass = ActiveSupport::MessagePack::Extensions.load_class(class_name)
attributes = klass.attributes_builder.build_from_database(attributes_hash)
klass.allocate.init_with_attributes(attributes, is_new_record)
end

def resolve_cached_associations(record, entry)
i = 3 # entry == [class_name, attributes_hash, is_new_record, *associations]
while i < entry.length
begin
record.association(entry[i]).target = decode(entry[i + 1])
rescue ActiveRecord::AssociationNotFoundError
# The association no longer exists, so just skip it.
end
i += 2
end
end
end
end
end
9 changes: 9 additions & 0 deletions activerecord/lib/active_record/railtie.rb
Expand Up @@ -424,5 +424,14 @@ class Railtie < Rails::Railtie # :nodoc:
end
end
end

initializer "active_record.message_pack" do
ActiveSupport.on_load(:message_pack) do
ActiveSupport.on_load(:active_record) do
require "active_record/message_pack"
ActiveRecord::MessagePack::Extensions.install(ActiveSupport::MessagePack::CacheSerializer)
end
end
end
end
end
89 changes: 89 additions & 0 deletions activerecord/test/cases/message_pack_test.rb
@@ -0,0 +1,89 @@
# frozen_string_literal: true

require "cases/helper"
require "models/author"
require "models/binary"
require "models/comment"
require "models/post"
require "active_support/message_pack"
require "active_record/message_pack"

class ActiveRecordMessagePackTest < ActiveRecord::TestCase
test "enshrines type IDs" do
expected = {
119 => ActiveModel::Type::Binary::Data,
120 => ActiveRecord::Base,
}

factory = ::MessagePack::Factory.new
ActiveRecord::MessagePack::Extensions.install(factory)
actual = factory.registered_types.to_h do |entry|
[entry[:type], entry[:class]]
end

assert_equal expected, actual
end

test "roundtrips record and cached associations" do
post = Post.create!(title: "A Title", body: "A body.")
post.create_author!(name: "An Author")
post.comments.create!(body: "A comment.")
post.comments.create!(body: "Another comment.", author: post.author)
post.comments.load

assert_no_queries do
roundtripped_post = roundtrip(post)

assert_equal post, roundtripped_post
assert_equal post.author, roundtripped_post.author
assert_equal post.comments.to_a, roundtripped_post.comments.to_a
assert_equal post.comments.map(&:author), roundtripped_post.comments.map(&:author)

assert_same roundtripped_post, roundtripped_post.comments[0].post
assert_same roundtripped_post, roundtripped_post.comments[1].post
assert_same roundtripped_post.author, roundtripped_post.comments[1].author
end
end

test "roundtrips new_record? status" do
post = Post.new(title: "A Title", body: "A body.")
post.create_author!(name: "An Author")

assert_no_queries do
roundtripped_post = roundtrip(post)

assert_equal post.attributes, roundtripped_post.attributes
assert_equal post.new_record?, roundtripped_post.new_record?
assert_equal post.author, roundtripped_post.author
assert_equal post.author.new_record?, roundtripped_post.author.new_record?
end
end

test "roundtrips binary attribute" do
binary = Binary.new(data: Marshal.dump("data"))
assert_equal binary.attributes, roundtrip(binary).attributes
end

test "raises ActiveSupport::MessagePack::MissingClassError if record class no longer exists" do
klass = Class.new(Post)
def klass.name; "SomeLegacyClass"; end
dumped = serializer.dump(klass.new(title: "A Title", body: "A body."))

assert_raises ActiveSupport::MessagePack::MissingClassError do
serializer.load(dumped)
end
end

private
def serializer
@serializer ||= ::MessagePack::Factory.new.tap do |factory|
ActiveRecord::MessagePack::Extensions.install(factory)
ActiveSupport::MessagePack::Extensions.install(factory)
ActiveSupport::MessagePack::Extensions.install_unregistered_type_error(factory)
end
end

def roundtrip(input)
serializer.load(serializer.dump(input))
end
end
50 changes: 50 additions & 0 deletions activesupport/lib/active_support/message_pack.rb
@@ -0,0 +1,50 @@
# frozen_string_literal: true

begin
gem "msgpack", ">= 1.7.0"
require "msgpack"
rescue LoadError => error
warn "ActiveSupport::MessagePack requires the msgpack gem, version 1.7.0 or later. " \
"Please add it to your Gemfile: `gem \"msgpack\", \">= 1.7.0\"`"
raise error
end

require_relative "message_pack/cache_serializer"
require_relative "message_pack/serializer"

module ActiveSupport
module MessagePack
extend Serializer

##
# :singleton-method: dump
# :call-seq: dump(object)
#
# Dumps an object. Raises ActiveSupport::MessagePack::UnserializableObjectError
# if the object type is not supported.
#
#--
# Implemented by Serializer#dump.

##
# :singleton-method: load
# :call-seq: load(dumped)
#
# Loads an object dump created by ::dump.
#
#--
# Implemented by Serializer#load.

##
# :singleton-method: signature?
# :call-seq: signature?(dumped)
#
# Returns true if the given dump begins with an +ActiveSupport::MessagePack+
# signature.
#
#--
# Implemented by Serializer#signature?.

ActiveSupport.run_load_hooks(:message_pack, self)
end
end
44 changes: 44 additions & 0 deletions activesupport/lib/active_support/message_pack/cache_serializer.rb
@@ -0,0 +1,44 @@
# frozen_string_literal: true

require_relative "serializer"

module ActiveSupport
module MessagePack
module CacheSerializer
include Serializer
extend self

ZLIB_HEADER = "\x78"

def dump(entry)
super(entry.pack)
end

def dump_compressed(entry, threshold) # :nodoc:
dumped = dump(entry)
if dumped.bytesize >= threshold
compressed = Zlib::Deflate.deflate(dumped)
compressed.bytesize < dumped.bytesize ? compressed : dumped
else
dumped
end
end

def load(dumped)
dumped = Zlib::Inflate.inflate(dumped) if compressed?(dumped)
ActiveSupport::Cache::Entry.unpack(super)
rescue ActiveSupport::MessagePack::MissingClassError
# Treat missing class as cache miss => return nil
end

private
def compressed?(dumped)
dumped.start_with?(ZLIB_HEADER)
end

def install_unregistered_type_handler
Extensions.install_unregistered_type_fallback(message_pack_factory)
end
end
end
end

0 comments on commit a2a6331

Please sign in to comment.