Skip to content

Commit

Permalink
Support batching using composite primary keys and multiple column ord…
Browse files Browse the repository at this point in the history
…ering

When find_each/find_in_batches/in_batches are performed on a table with composite primary keys, ascending or descending order can be selected for each key.

    ```ruby
    Person.find_each(order: [:desc, :asc]) do |person|
      person.party_all_night!
    end
    ```
  • Loading branch information
TakuyaKurimoto committed Jun 5, 2023
1 parent 628cdf9 commit 9452b59
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 34 deletions.
12 changes: 12 additions & 0 deletions activerecord/CHANGELOG.md
@@ -1,3 +1,15 @@
* Support multiple column ordering for `find_each`, `find_in_batches` and `in_batches`.

When `find_each`, `find_in_batches`, or `in_batches` is called on a model with a composite primary key, ascending or descending order can be selected independently for each key column.

```ruby
Person.find_each(order: [:desc, :asc]) do |person|
person.party_all_night!
end
```

*Takuya Kurimoto*

* Fix where on association with has_one/has_many polymorphic relations.

Before:
Expand Down
111 changes: 77 additions & 34 deletions activerecord/lib/active_record/relation/batches.rb
Expand Up @@ -6,6 +6,7 @@ module ActiveRecord
# = Active Record \Batches
module Batches
ORDER_IGNORE_MESSAGE = "Scoped order is ignored, it's forced to be batch order."
DEFAULT_ORDER = :asc

# Looping through a collection of records from the database
# (using the Scoping::Named::ClassMethods.all method, for example)
Expand Down Expand Up @@ -38,7 +39,16 @@ module Batches
# * <tt>:finish</tt> - Specifies the primary key value to end at, inclusive of the value.
# * <tt>:error_on_ignore</tt> - Overrides the application config to specify if an error should be raised when
# an order is present in the relation.
# * <tt>:order</tt> - Specifies the primary key order (can be +:asc+ or +:desc+). Defaults to +:asc+.
# * <tt>:order</tt> - Specifies the primary key order (can be +:asc+, +:desc+, or an array of
# +:asc+/+:desc+ values, one per primary key column). Defaults to +:asc+.
#
# class Order < ActiveRecord::Base
# self.primary_key = [:id_1, :id_2]
# end
#
# Order.find_each(order: [:asc, :desc])
#
# In the above code, +id_1+ is sorted in ascending order and +id_2+ in descending order.
#
# Limits are honored, and if present there is no requirement for the batch
# size: it can be less than, equal to, or greater than the limit.
Expand Down Expand Up @@ -66,15 +76,15 @@ module Batches
#
# NOTE: By its nature, batch processing is subject to race conditions if
# other processes are modifying the database.
def find_each(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil, order: :asc, &block)
def find_each(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil, order: DEFAULT_ORDER, &block)
if block_given?
find_in_batches(start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore, order: order) do |records|
records.each(&block)
end
else
enum_for(:find_each, start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore, order: order) do
relation = self
apply_limits(relation, start, finish, order).size
apply_limits(relation, start, finish, build_batch_orders(order)).size
end
end
end
Expand Down Expand Up @@ -103,7 +113,16 @@ def find_each(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil, o
# * <tt>:finish</tt> - Specifies the primary key value to end at, inclusive of the value.
# * <tt>:error_on_ignore</tt> - Overrides the application config to specify if an error should be raised when
# an order is present in the relation.
# * <tt>:order</tt> - Specifies the primary key order (can be +:asc+ or +:desc+). Defaults to +:asc+.
# * <tt>:order</tt> - Specifies the primary key order (can be +:asc+, +:desc+, or an array of
# +:asc+/+:desc+ values, one per primary key column). Defaults to +:asc+.
#
# class Order < ActiveRecord::Base
# self.primary_key = [:id_1, :id_2]
# end
#
# Order.find_in_batches(order: [:asc, :desc])
#
# In the above code, +id_1+ is sorted in ascending order and +id_2+ in descending order.
#
# Limits are honored, and if present there is no requirement for the batch
# size: it can be less than, equal to, or greater than the limit.
Expand All @@ -126,11 +145,11 @@ def find_each(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil, o
#
# NOTE: By its nature, batch processing is subject to race conditions if
# other processes are modifying the database.
def find_in_batches(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil, order: :asc)
def find_in_batches(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil, order: DEFAULT_ORDER)
relation = self
unless block_given?
return to_enum(:find_in_batches, start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore, order: order) do
total = apply_limits(relation, start, finish, order).size
total = apply_limits(relation, start, finish, build_batch_orders(order)).size
(total - 1).div(batch_size) + 1
end
end
Expand Down Expand Up @@ -168,7 +187,17 @@ def find_in_batches(start: nil, finish: nil, batch_size: 1000, error_on_ignore:
# * <tt>:finish</tt> - Specifies the primary key value to end at, inclusive of the value.
# * <tt>:error_on_ignore</tt> - Overrides the application config to specify if an error should be raised when
# an order is present in the relation.
# * <tt>:order</tt> - Specifies the primary key order (can be +:asc+ or +:desc+). Defaults to +:asc+.
# * <tt>:order</tt> - Specifies the primary key order (can be +:asc+, +:desc+, or an array of
# +:asc+/+:desc+ values, one per primary key column). Defaults to +:asc+.
#
# class Order < ActiveRecord::Base
# self.primary_key = [:id_1, :id_2]
# end
#
# Order.in_batches(order: [:asc, :desc])
#
# In the above code, +id_1+ is sorted in ascending order and +id_2+ in descending order.
#
# * <tt>:use_ranges</tt> - Specifies whether to use range iteration (id >= x AND id <= y).
# It can make iterating over the whole or almost whole tables several times faster.
# Only whole table iterations use this style of iteration by default. You can disable this behavior by passing +false+.
Expand Down Expand Up @@ -207,11 +236,11 @@ def find_in_batches(start: nil, finish: nil, batch_size: 1000, error_on_ignore:
#
# NOTE: By its nature, batch processing is subject to race conditions if
# other processes are modifying the database.
def in_batches(of: 1000, start: nil, finish: nil, load: false, error_on_ignore: nil, order: :asc, use_ranges: nil)
def in_batches(of: 1000, start: nil, finish: nil, load: false, error_on_ignore: nil, order: DEFAULT_ORDER, use_ranges: nil)
relation = self

unless [:asc, :desc].include?(order)
raise ArgumentError, ":order must be :asc or :desc, got #{order.inspect}"
unless Array(order).all? { |ord| [:asc, :desc].include?(ord) }
raise ArgumentError, ":order must be :asc or :desc or an array consisting of :asc or :desc, got #{order.inspect}"
end

unless block_given?
Expand All @@ -228,8 +257,9 @@ def in_batches(of: 1000, start: nil, finish: nil, load: false, error_on_ignore:
batch_limit = remaining if remaining < batch_limit
end

relation = relation.reorder(*batch_order(order)).limit(batch_limit)
relation = apply_limits(relation, start, finish, order)
batch_orders = build_batch_orders(order)
relation = relation.reorder(batch_orders.to_h).limit(batch_limit)
relation = apply_limits(relation, start, finish, batch_orders)
relation.skip_query_cache! # Retaining the results in the query cache would undermine the point of batching
batch_relation = relation
empty_scope = to_sql == klass.unscoped.all.to_sql
Expand All @@ -244,7 +274,7 @@ def in_batches(of: 1000, start: nil, finish: nil, load: false, error_on_ignore:
ids = batch_relation.ids
finish = ids.last
if finish
yielded_relation = apply_finish_limit(batch_relation, finish, order)
yielded_relation = apply_finish_limit(batch_relation, finish, batch_orders)
yielded_relation = yielded_relation.except(:limit, :order)
yielded_relation.skip_query_cache!(false)
end
Expand Down Expand Up @@ -274,46 +304,55 @@ def in_batches(of: 1000, start: nil, finish: nil, load: false, error_on_ignore:
end
end

batch_relation = batch_condition(
relation, primary_key, primary_key_offset, order == :desc ? :lt : :gt
)
batch_orders_copy = batch_orders.dup
_last_column, last_order = batch_orders_copy.pop
operators = batch_orders_copy.map do |_column, order|
order == :desc ? :lteq : :gteq
end
operators << (last_order == :desc ? :lt : :gt)

batch_relation = batch_condition(relation, primary_key, primary_key_offset, operators)
end
end

private
# Narrows +relation+ by the optional +start+ and +finish+ bounds and returns the result.
# Each bound is applied only when it is truthy.
#
# NOTE(review): this span is a diff hunk rendered without +/- markers, so two versions of
# the signature and body appear back to back: one passing a single +order+, the other
# passing +batch_orders+ (pairs of column and direction).
def apply_limits(relation, start, finish, order)
relation = apply_start_limit(relation, start, order) if start
relation = apply_finish_limit(relation, finish, order) if finish
def apply_limits(relation, start, finish, batch_orders)
relation = apply_start_limit(relation, start, batch_orders) if start
relation = apply_finish_limit(relation, finish, batch_orders) if finish
relation
end

# Adds the +start+ bound to +relation+ via batch_condition on the primary key.
# The bound is inclusive: a :desc direction maps to :lteq, anything else to :gteq.
#
# NOTE(review): this span is a diff hunk rendered without +/- markers. The variant taking
# a single +order+ passes one operator; the variant taking +batch_orders+ builds one
# operator per [column, direction] pair.
def apply_start_limit(relation, start, order)
batch_condition(relation, primary_key, start, order == :desc ? :lteq : :gteq)
def apply_start_limit(relation, start, batch_orders)
operators = batch_orders.map do |_column, order|
order == :desc ? :lteq : :gteq
end
batch_condition(relation, primary_key, start, operators)
end

# Adds the +finish+ bound to +relation+ via batch_condition on the primary key.
# The bound is inclusive and mirrors the start bound: a :desc direction maps to :gteq,
# anything else to :lteq.
#
# NOTE(review): this span is a diff hunk rendered without +/- markers. The variant taking
# a single +order+ passes one operator; the variant taking +batch_orders+ builds one
# operator per [column, direction] pair.
def apply_finish_limit(relation, finish, order)
batch_condition(relation, primary_key, finish, order == :desc ? :gteq : :lteq)
def apply_finish_limit(relation, finish, batch_orders)
operators = batch_orders.map do |_column, order|
order == :desc ? :gteq : :lteq
end
batch_condition(relation, primary_key, finish, operators)
end

# Builds a WHERE clause comparing the given key +columns+ against +values+ and returns
# +relation+ restricted by it. The clause is seeded from the last column and then
# extended with the remaining columns in reverse order.
#
# NOTE(review): this span is a diff hunk rendered without +/- markers, so two versions of
# the method are interleaved:
# * the single +operator+ variant nests the clauses as
#   "col <strict op> value OR (col = value AND <rest>)";
# * the +operators+ variant zips one operator per column and joins every predicate
#   with AND.
#
# NOTE(review): the AND-only form (e.g. "a >= x AND b < y") is not equivalent to the
# OR/eq nesting. A row whose leading column is strictly past the cursor but whose
# trailing column falls outside its own bound would be filtered out. Confirm this is
# intended for keys where the leading column actually varies.
def batch_condition(relation, columns, values, operator)
columns = Array(columns)
values = Array(values)
cursor_positions = columns.zip(values)
def batch_condition(relation, columns, values, operators)
cursor_positions = Array(columns).zip(Array(values), operators)

first_clause_column, first_clause_value = cursor_positions.pop
first_clause_column, first_clause_value, operator = cursor_positions.pop
where_clause = predicate_builder[first_clause_column, first_clause_value, operator]

# Single-operator variant: strict comparison on this column, or equality plus the
# clause built so far.
cursor_positions.reverse_each do |column_name, value|
where_clause = predicate_builder[column_name, value, operator == :lteq ? :lt : :gt].or(
predicate_builder[column_name, value, :eq].and(where_clause)
)
# Per-column operators variant: each column's predicate is AND-ed onto the clause.
cursor_positions.reverse_each do |column_name, value, operator|
where_clause = predicate_builder[column_name, value, operator].and(where_clause)
end

relation.where(where_clause)
end

def batch_order(order)
Array(primary_key).map { |column| table[column].public_send(order) }
# Returns an array of [column, direction] pairs, one per primary key column.
# Columns for which no direction was supplied fall back to DEFAULT_ORDER.
def build_batch_orders(order)
  column_directions = get_the_order_of_primary_key(order)
  column_directions.map do |(column, direction)|
    direction ||= DEFAULT_ORDER
    [column, direction]
  end
end

def act_on_ignored_order(error_on_ignore)
Expand All @@ -325,5 +364,9 @@ def act_on_ignored_order(error_on_ignore)
logger.warn(ORDER_IGNORE_MESSAGE)
end
end

# Pairs each primary key column with the direction at the same position in +order+.
# Both +primary_key+ and +order+ may be a single value or an array. A column with no
# matching direction is paired with nil, and surplus directions are dropped.
def get_the_order_of_primary_key(order)
  directions = Array(order)
  Array(primary_key).each_with_index.map { |column, index| [column, directions[index]] }
end
end
end
38 changes: 38 additions & 0 deletions activerecord/test/cases/batches_test.rb
Expand Up @@ -796,4 +796,42 @@ def test_find_in_batches_should_return_a_sized_enumerator
relation = Cpk::Order.where("shop_id > ? OR shop_id = ? AND id > ?", shop_id, shop_id, id).in_batches(of: 1).first
assert_equal order2, relation.first
end

test ".find_each with multiple column ordering and using composite primary key" do
  # Three books under the same author, so only +number+ differs between the rows created here.
  (1..3).each { |number| Cpk::Book.create(author_id: 1, number: number) }
  expected = Cpk::Book.order(author_id: :asc, number: :desc).to_a

  # With batch_size: 1 every record is yielded from its own batch.
  Cpk::Book.find_each(batch_size: 1, order: [:asc, :desc]).with_index do |book, position|
    assert_equal expected[position], book
  end
end

test ".in_batches should start from the start option when using composite primary key with multiple column ordering" do
  (1..3).each { |number| Cpk::Book.create(author_id: 1, number: number) }

  # The second record in [author_id asc, number desc] order is used as the start bound.
  start_book = Cpk::Book.order(author_id: :asc, number: :desc).second
  first_batch = Cpk::Book.in_batches(of: 1, start: start_book.id, order: [:asc, :desc]).first

  assert_equal start_book, first_batch.first
end

test ".in_batches should end at the finish option when using composite primary key with multiple column ordering" do
  (1..3).each { |number| Cpk::Book.create(author_id: 1, number: number) }

  # The second record in [author_id asc, number desc] order is used as the finish bound.
  finish_book = Cpk::Book.order(author_id: :asc, number: :desc).second
  batches = Cpk::Book.in_batches(of: 1, finish: finish_book.id, order: [:asc, :desc]).to_a
  last_batch = batches.last

  assert_equal finish_book, last_batch.first
end

test ".in_batches with scope and multiple column ordering and using composite primary key" do
  (1..3).each { |number| Cpk::Book.create(author_id: 1, number: number) }

  first_two = Cpk::Book.order(author_id: :asc, number: :desc).first(2)
  book1, book2 = first_two
  author_id, number = book1.id

  # NOTE(review): the rows created here all share author_id = 1, so this AND-only scope
  # coincides with a true "after book1" cursor. It would not if author_id varied; confirm
  # whether a fixture with differing author_id values should be added.
  scoped = Cpk::Book.where("author_id >= ? AND number < ?", author_id, number)
  relation = scoped.in_batches(of: 1, order: [:asc, :desc]).first

  assert_equal book2, relation.first
end
end

0 comments on commit 9452b59

Please sign in to comment.