
adds support for limits in batch processing

fxn committed Jul 13, 2016
1 parent 3b55ac2 commit 451437c6f57e66cc7586ec966e530493927098c7
@@ -1,3 +1,18 @@
* The flag `error_on_ignored_order_or_limit` has been deprecated in favor of
the current `error_on_ignored_order`.

*Xavier Noria*

* Batch processing methods support `limit`:

Post.limit(10_000).find_each do |post|
# ...
end

It also works in `find_in_batches` and `in_batches`.

*Xavier Noria*

* Using `group` with an attribute that has a custom type will properly cast
the hash keys after calling a calculation method like `count`. Fixes #25595.

@@ -72,11 +72,20 @@ def self.configurations

##
# :singleton-method:
# Specifies if an error should be raised on query limit or order being
# Specifies if an error should be raised if the query has an order being
# ignored when doing batch queries. Useful in applications where the
# limit or scope being ignored is error-worthy, rather than a warning.
# scope being ignored is error-worthy, rather than a warning.
mattr_accessor :error_on_ignored_order, instance_writer: false
self.error_on_ignored_order = false

mattr_accessor :error_on_ignored_order_or_limit, instance_writer: false
self.error_on_ignored_order_or_limit = false
def self.error_on_ignored_order_or_limit=(value)
ActiveSupport::Deprecation.warn(<<-MSG.squish)
The flag error_on_ignored_order_or_limit is deprecated. Limits are
now supported. Please use error_on_ignored_order= instead.
MSG
self.error_on_ignored_order = value
end

##
# :singleton-method:
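For context on the deprecation shim above, a sketch of the intended migration path, assuming the flag is set from an application initializer (the file path is hypothetical):

    # config/initializers/active_record.rb (hypothetical path)

    # Writing the old flag still works, but emits a deprecation warning
    # ("The flag error_on_ignored_order_or_limit is deprecated...") and
    # forwards the value to the new flag.
    ActiveRecord::Base.error_on_ignored_order_or_limit = true

    # Equivalent, non-deprecated configuration: raise ArgumentError instead of
    # logging a warning when a scoped order is ignored by the batch methods.
    ActiveRecord::Base.error_on_ignored_order = true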
@@ -1,8 +1,8 @@
require "active_record/relation/batches/batch_enumerator"
require 'active_record/relation/batches/batch_enumerator'

module ActiveRecord
module Batches
ORDER_OR_LIMIT_IGNORED_MESSAGE = "Scoped order and limit are ignored, it's forced to be batch order and batch size."
ORDER_IGNORE_MESSAGE = "Scoped order is ignored, it's forced to be batch order."

# Looping through a collection of records from the database
# (using the Scoping::Named::ClassMethods.all method, for example)
@@ -34,15 +34,19 @@ module Batches
# * <tt>:start</tt> - Specifies the primary key value to start from, inclusive of the value.
# * <tt>:finish</tt> - Specifies the primary key value to end at, inclusive of the value.
# * <tt>:error_on_ignore</tt> - Overrides the application config to specify if an error should be raised when
# the order and limit have to be ignored due to batching.
# the order has to be ignored due to batching.
#
# This is especially useful if you want multiple workers dealing with
# the same processing queue. You can make worker 1 handle all the records
# between id 0 and 10,000 and worker 2 handle from 10,000 and beyond
# (by setting the +:start+ and +:finish+ option on each worker).
# Limits are honored, and if present there is no requirement for the batch
# size, it can be less than, equal, or greater than the limit.
#
# # Let's process for a batch of 2000 records, skipping the first 2000 rows
# Person.find_each(start: 2000, batch_size: 2000) do |person|
# The options +start+ and +finish+ are especially useful if you want
# multiple workers dealing with the same processing queue. You can make
# worker 1 handle all the records between id 1 and 9999 and worker 2
# handle from 10000 and beyond by setting the +:start+ and +:finish+
# option on each worker.
#
# # Let's process from record 10_000 on.
# Person.find_each(start: 10_000) do |person|
# person.party_all_night!
# end
#
@@ -51,8 +55,8 @@ module Batches
# work. This also means that this method only works when the primary key is
# orderable (e.g. an integer or string).
#
# NOTE: You can't set the limit either, that's used to control
# the batch sizes.
# NOTE: By its nature, batch processing is subject to race conditions if
# other processes are modifying the database.
def find_each(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil)
if block_given?
find_in_batches(start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore) do |records|
@@ -89,15 +93,19 @@ def find_each(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil)
# * <tt>:start</tt> - Specifies the primary key value to start from, inclusive of the value.
# * <tt>:finish</tt> - Specifies the primary key value to end at, inclusive of the value.
# * <tt>:error_on_ignore</tt> - Overrides the application config to specify if an error should be raised when
# the order and limit have to be ignored due to batching.
# the order has to be ignored due to batching.
#
# Limits are honored, and if present there is no requirement for the batch
# size, it can be less than, equal, or greater than the limit.
#
# This is especially useful if you want multiple workers dealing with
# the same processing queue. You can make worker 1 handle all the records
# between id 0 and 10,000 and worker 2 handle from 10,000 and beyond
# (by setting the +:start+ and +:finish+ option on each worker).
# The options +start+ and +finish+ are especially useful if you want
# multiple workers dealing with the same processing queue. You can make
# worker 1 handle all the records between id 1 and 9999 and worker 2
# handle from 10000 and beyond by setting the +:start+ and +:finish+
# option on each worker.
#
# # Let's process the next 2000 records
# Person.find_in_batches(start: 2000, batch_size: 2000) do |group|
# # Let's process from record 10_000 on.
# Person.find_in_batches(start: 10_000) do |group|
# group.each { |person| person.party_all_night! }
# end
#
@@ -106,8 +114,8 @@ def find_each(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil)
# work. This also means that this method only works when the primary key is
# orderable (e.g. an integer or string).
#
# NOTE: You can't set the limit either, that's used to control
# the batch sizes.
# NOTE: By its nature, batch processing is subject to race conditions if
# other processes are modifying the database.
def find_in_batches(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil)
relation = self
unless block_given?
@@ -149,17 +157,19 @@ def find_in_batches(start: nil, finish: nil, batch_size: 1000, error_on_ignore:
# * <tt>:start</tt> - Specifies the primary key value to start from, inclusive of the value.
# * <tt>:finish</tt> - Specifies the primary key value to end at, inclusive of the value.
# * <tt>:error_on_ignore</tt> - Overrides the application config to specify if an error should be raised when
# the order and limit have to be ignored due to batching.
# the order has to be ignored due to batching.
#
# Limits are honored, and if present there is no requirement for the batch
# size, it can be less than, equal, or greater than the limit.
#
# This is especially useful if you want to work with the
# ActiveRecord::Relation object instead of the array of records, or if
# you want multiple workers dealing with the same processing queue. You can
# make worker 1 handle all the records between id 0 and 10,000 and worker 2
# handle from 10,000 and beyond (by setting the +:start+ and +:finish+
# option on each worker).
# The options +start+ and +finish+ are especially useful if you want
# multiple workers dealing with the same processing queue. You can make
# worker 1 handle all the records between id 1 and 9999 and worker 2
# handle from 10000 and beyond by setting the +:start+ and +:finish+
# option on each worker.
#
# # Let's process the next 2000 records
# Person.in_batches(of: 2000, start: 2000).update_all(awesome: true)
# # Let's process from record 10_000 on.
# Person.in_batches(start: 10_000).update_all(awesome: true)
#
# An example of calling where query method on the relation:
#
@@ -179,31 +189,38 @@ def find_in_batches(start: nil, finish: nil, batch_size: 1000, error_on_ignore:
# consistent. Therefore the primary key must be orderable, e.g an integer
# or a string.
#
# NOTE: You can't set the limit either, that's used to control the batch
# sizes.
# NOTE: By its nature, batch processing is subject to race conditions if
# other processes are modifying the database.
def in_batches(of: 1000, start: nil, finish: nil, load: false, error_on_ignore: nil)
relation = self
unless block_given?
return BatchEnumerator.new(of: of, start: start, finish: finish, relation: self)
end

if arel.orders.present? || arel.taken.present?
act_on_order_or_limit_ignored(error_on_ignore)
if arel.orders.present?
act_on_ignored_order(error_on_ignore)
end

batch_limit = of
if limit_value
remaining = limit_value
batch_limit = remaining if remaining < batch_limit
relation = relation.limit(nil) # new relation without the limit
end

relation = relation.reorder(batch_order).limit(of)
relation = relation.reorder(batch_order).limit(batch_limit)
relation = apply_limits(relation, start, finish)
batch_relation = relation

loop do
if load
records = batch_relation.records
ids = records.map(&:id)
yielded_relation = self.where(primary_key => ids)
yielded_relation = where(primary_key => ids)
yielded_relation.load_records(records)
else
ids = batch_relation.pluck(primary_key)
yielded_relation = self.where(primary_key => ids)
yielded_relation = where(primary_key => ids)
end

break if ids.empty?
@@ -213,7 +230,20 @@ def in_batches(of: 1000, start: nil, finish: nil, load: false, error_on_ignore:

yield yielded_relation

break if ids.length < of
break if ids.length < batch_limit

if limit_value
remaining -= ids.length

if remaining == 0
# Saves a useless iteration when the limit is a multiple of the
# batch size.
break
elsif remaining < batch_limit
relation = relation.limit(remaining)
end
end

batch_relation = relation.where(arel_attribute(primary_key).gt(primary_key_offset))
end
end
@@ -230,13 +260,13 @@ def batch_order
"#{quoted_table_name}.#{quoted_primary_key} ASC"
end

def act_on_order_or_limit_ignored(error_on_ignore)
raise_error = (error_on_ignore.nil? ? self.klass.error_on_ignored_order_or_limit : error_on_ignore)
def act_on_ignored_order(error_on_ignore)
raise_error = (error_on_ignore.nil? ? self.klass.error_on_ignored_order : error_on_ignore)

if raise_error
raise ArgumentError.new(ORDER_OR_LIMIT_IGNORED_MESSAGE)
raise ArgumentError.new(ORDER_IGNORE_MESSAGE)
elsif logger
logger.warn(ORDER_OR_LIMIT_IGNORED_MESSAGE)
logger.warn(ORDER_IGNORE_MESSAGE)
end
end
end
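To make the new `batch_limit` / `remaining` bookkeeping in `in_batches` concrete, a rough walkthrough for a limit that is not a multiple of the batch size, assuming a hypothetical `Post` table with at least 2_500 rows:

    # limit 2_500 with batches of 1_000: batch_limit starts at 1_000 and
    # `remaining` starts at 2_500.
    Post.limit(2_500).in_batches(of: 1_000) do |relation|
      relation.count
    end
    # Iteration 1: 1_000 ids plucked and yielded, remaining drops to 1_500.
    # Iteration 2: 1_000 ids plucked and yielded, remaining drops to 500; since
    #              that is below batch_limit, the relation is re-limited to 500.
    # Iteration 3: 500 ids plucked and yielded, then the loop breaks because
    #              ids.length (500) < batch_limit (1_000).
    # In total exactly 2_500 records are processed, honoring the scoped limit.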

4 comments on commit 451437c

@kaspth (Member) replied Jul 13, 2016

Nice ❤️

@TOAST3R replied Jul 14, 2016

Yes really nice 😄

@pcreux (Contributor) replied Jul 20, 2016

Thank you @fxn ! This one bit me more than once! :)

@subodhkhanduri-oyo replied May 31, 2017

Thank you! 👍 😃
