return the inserted ids for postgres bulk insert #178

Merged
merged 13 commits into from Apr 21, 2015
19 changes: 19 additions & 0 deletions README.markdown
@@ -2,6 +2,25 @@

activerecord-import is a library for bulk inserting data using ActiveRecord.

One of its major features is following activerecord associations and generating the minimal
number of SQL insert statements required, avoiding the N+1 insert problem. An example probably
explains it best. Say you had a schema like this:

Publishers have Books
Books have Reviews

and you wanted to bulk insert 100 new publishers, each with 10K books, and 3 reviews per book. This library will follow the associations
down and generate only 3 SQL insert statements: one for the publishers, one for the books, and one for the reviews.

In contrast, the standard ActiveRecord save would generate
100 insert statements for the publishers, then it would visit each publisher and save all the books:
100 * 10,000 = 1,000,000 SQL insert statements
and then the reviews:
100 * 10,000 * 3 = 3,000,000 SQL insert statements.

That would be about 4M SQL insert statements vs 3, which results in vastly improved performance. In our case, it converted
an 18 hour batch process to <2 hrs.
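
The arithmetic above can be checked with a few lines of Ruby. This is only a sketch of the counting argument, not library code; `insert_statement_counts` and its keyword arguments are hypothetical names introduced for illustration.

```ruby
# Hypothetical helper: counts INSERT statements for the README's example.
# It compares one INSERT per record (plain save) with one INSERT per table
# (bulk import that follows the associations).
def insert_statement_counts(publishers:, books_per_publisher:, reviews_per_book:)
  books   = publishers * books_per_publisher
  reviews = books * reviews_per_book

  {
    one_insert_per_record: publishers + books + reviews,
    one_insert_per_table:  3 # publishers, books, reviews
  }
end

counts = insert_statement_counts(
  publishers: 100,
  books_per_publisher: 10_000,
  reviews_per_book: 3
)

puts counts[:one_insert_per_record] # 4000100
puts counts[:one_insert_per_table]  # 3
```

The exact total is 4,000,100 statements, which the README rounds to "about 4M".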

Owner

Thanks for updating this. I've created a local copy of the branch and have formatted and massaged this a little bit, but it's mostly intact.

Contributor

hey thanks - I noticed you have stuff over in the wiki section, but it is important that folks realize why you bothered to write this in the first place.

This is important stuff. I think it should really make its way into the rails core ....

### Rails 4.0

Use activerecord-import 0.4.0 or higher.
@@ -4,4 +4,3 @@
class ActiveRecord::ConnectionAdapters::PostgreSQLAdapter
include ActiveRecord::Import::PostgreSQLAdapter
end

2 changes: 1 addition & 1 deletion lib/activerecord-import/adapters/abstract_adapter.rb
@@ -16,7 +16,7 @@ def insert_many( sql, values, *args ) # :nodoc:
sql2insert = base_sql + values.join( ',' ) + post_sql
insert( sql2insert, *args )

-number_of_inserts
+[number_of_inserts,[]]
Owner

I'm thinking of making this return an OpenStruct or some basic object in case we decide to ever include more information. That will allow us to do that without breaking the interface again.

I obviously didn't think of that happening originally. :)

Owner

Actually, ignore the above comment here. I'm more interested in the public interface that users consume, less so about internal library interfaces. Although I wouldn't be opposed if they all allowed for easier extension or addition.

Owner

Another benefit is that of discovery:

With module:

> ActiveRecord::Base.connection.method(:post_sql_statements)
=> #<Method: ActiveRecord::ConnectionAdapters::PostgreSQLAdapter(ActiveRecord::Import::PostgreSQLAdapter)#post_sql_statements>

With alias:

> ActiveRecord::Base.connection.method(:post_sql_statements)
=> #<Method: ActiveRecord::ConnectionAdapters::PostgreSQLAdapter#post_sql_statements>

end

def pre_sql_statements(options)
4 changes: 2 additions & 2 deletions lib/activerecord-import/adapters/mysql_adapter.rb
@@ -5,7 +5,7 @@ module ActiveRecord::Import::MysqlAdapter
NO_MAX_PACKET = 0
QUERY_OVERHEAD = 8 #This was shown to be true for MySQL, but it's not clear where the overhead is from.

-# +sql+ can be a single string or an array. If it is an array all
+# +sql+ can be a single string or an array. If it is an array all
# elements that are in position >= 1 will be appended to the final SQL.
def insert_many( sql, values, *args ) # :nodoc:
# the number of inserts default
@@ -46,7 +46,7 @@ def insert_many( sql, values, *args ) # :nodoc:
end
end

-number_of_inserts
+[number_of_inserts,[]]
end

# Returns the maximum number of bytes that the server will allow
27 changes: 27 additions & 0 deletions lib/activerecord-import/adapters/postgresql_adapter.rb
@@ -1,7 +1,34 @@
module ActiveRecord::Import::PostgreSQLAdapter
include ActiveRecord::Import::ImportSupport

def insert_many( sql, values, *args ) # :nodoc:
number_of_inserts = 1

base_sql,post_sql = if sql.is_a?( String )
[ sql, '' ]
elsif sql.is_a?( Array )
[ sql.shift, sql.join( ' ' ) ]
end

sql2insert = base_sql + values.join( ',' ) + post_sql
ids = select_values( sql2insert, *args )

[number_of_inserts,ids]
end

def next_value_for_sequence(sequence_name)
%{nextval('#{sequence_name}')}
end

def post_sql_statements( table_name, options ) # :nodoc:
unless options[:primary_key].blank?
super(table_name, options) << (" RETURNING #{options[:primary_key]}")
else
super(table_name, options)
end
end

def support_setting_primary_key_of_imported_objects?
true
end
end
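
As a rough illustration of the string assembly in `insert_many` above, the sketch below builds the statement without touching a database. `build_bulk_insert` is a hypothetical name, the table and column are made up, and unlike the adapter code it does not mutate the `sql` array (the original calls `sql.shift`).

```ruby
# Hypothetical, database-free sketch of how the bulk INSERT string is built.
# sql    - a String, or an Array whose first element is the INSERT prefix and
#          whose remaining elements are appended after the VALUES list
# values - an Array of already-quoted "(...)" value groups
def build_bulk_insert(sql, values)
  base_sql, post_sql =
    if sql.is_a?(String)
      [sql, '']
    else
      [sql.first, sql.drop(1).join(' ')]
    end

  base_sql + values.join(',') + post_sql
end

statement = build_bulk_insert(
  ['INSERT INTO "books" ("title") VALUES ', ' RETURNING id'],
  ["('Book 1')", "('Book 2')"]
)

puts statement
# INSERT INTO "books" ("title") VALUES ('Book 1'),('Book 2') RETURNING id
```

Because the whole batch is one statement with a `RETURNING` clause, the adapter can read the generated keys back with `select_values` instead of issuing a separate query per row.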
2 changes: 1 addition & 1 deletion lib/activerecord-import/adapters/sqlite3_adapter.rb
@@ -34,7 +34,7 @@ def insert_many(sql, values, *args) # :nodoc:
insert( sql2insert, *args )
end

-number_of_inserts
+[number_of_inserts,[]]
end

def next_value_for_sequence(sequence_name)
115 changes: 95 additions & 20 deletions lib/activerecord-import/import.rb
@@ -3,7 +3,7 @@
module ActiveRecord::Import::ConnectionAdapters ; end

module ActiveRecord::Import #:nodoc:
-class Result < Struct.new(:failed_instances, :num_inserts)
+class Result < Struct.new(:failed_instances, :num_inserts, :ids)
end

module ImportSupport #:nodoc:
@@ -68,7 +68,7 @@ def import(*args, &block)

# supports empty array
elsif args.last.is_a?( Array ) and args.last.empty?
-return ActiveRecord::Import::Result.new([], 0) if args.last.empty?
+return ActiveRecord::Import::Result.new([], 0, []) if args.last.empty?

# supports 2-element array and array
elsif args.size == 2 and args.first.is_a?( Array ) and args.last.is_a?( Array )
@@ -109,18 +109,21 @@ class << self
# Returns true if the current database connection adapter
# supports import functionality, otherwise returns false.
def supports_import?(*args)
-connection.supports_import?(*args)
-rescue NoMethodError
-false
+connection.respond_to?(:supports_import?) && connection.supports_import?(*args)
end

# Returns true if the current database connection adapter
# supports on duplicate key update functionality, otherwise
# returns false.
def supports_on_duplicate_key_update?
-connection.supports_on_duplicate_key_update?
-rescue NoMethodError
-false
+connection.respond_to?(:supports_on_duplicate_key_update?) && connection.supports_on_duplicate_key_update?
end

# Returns true if the current database connection adapter
# supports setting the primary key of bulk imported models, otherwise
# returns false.
def support_setting_primary_key_of_imported_objects?
connection.respond_to?(:support_setting_primary_key_of_imported_objects?) && connection.support_setting_primary_key_of_imported_objects?
end
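
One plausible motivation for replacing `rescue NoMethodError` with a `respond_to?` guard (the PR does not state it) is that the rescue also swallows a `NoMethodError` raised inside the adapter method itself. The sketch below shows the difference; `PlainAdapter`, `BuggyAdapter` and both helper methods are hypothetical stand-ins, not part of the library.

```ruby
# Hypothetical adapter that does not implement supports_import? at all.
class PlainAdapter; end

# Hypothetical adapter whose implementation contains a bug:
# calling upcase on nil raises NoMethodError from inside the method.
class BuggyAdapter
  def supports_import?
    nil.upcase
  end
end

# Old style: any NoMethodError, wherever it comes from, becomes false.
def supports_import_with_rescue?(connection)
  connection.supports_import?
rescue NoMethodError
  false
end

# New style: only a missing method becomes false; real bugs still raise.
def supports_import_with_guard?(connection)
  connection.respond_to?(:supports_import?) && connection.supports_import?
end

puts supports_import_with_rescue?(PlainAdapter.new) # false
puts supports_import_with_guard?(PlainAdapter.new)  # false
puts supports_import_with_rescue?(BuggyAdapter.new) # false (the bug is hidden)

begin
  supports_import_with_guard?(BuggyAdapter.new)
rescue NoMethodError => e
  puts "guard surfaced the bug: #{e.class}"
end
```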

# Imports a collection of values to the database.
@@ -172,6 +175,9 @@ def supports_on_duplicate_key_update?
# existing model instances in memory with updates from the import.
# * +timestamps+ - true|false, tells import to not add timestamps \
# (if false) even if record timestamps is disabled in ActiveRecord::Base
# * +recursive+ - true|false, tells import to import all autosave associations
# if the adapter supports setting the primary keys of the newly imported
# objects.
#
# == Examples
# class BlogPost < ActiveRecord::Base ; end
@@ -230,11 +236,24 @@ def supports_on_duplicate_key_update?
# This returns an object which responds to +failed_instances+ and +num_inserts+.
# * failed_instances - an array of objects that fail validation and were not committed to the database. An empty array if no validation is performed.
# * num_inserts - the number of insert statements it took to import the data
-def import( *args )
-options = { :validate=>true, :timestamps=>true }
# * ids - the primary keys of the imported records, if the adapter supports it, otherwise an empty array.
def import(*args)
if args.first.is_a?( Array ) and args.first.first.is_a? ActiveRecord::Base
options = {}
options.merge!( args.pop ) if args.last.is_a?(Hash)

models = args.first
import_helper(models, options)
else
import_helper(*args)
end
end

def import_helper( *args )
options = { :validate=>true, :timestamps=>true, :primary_key=>primary_key }
options.merge!( args.pop ) if args.last.is_a? Hash

-is_validating = options.delete( :validate )
+is_validating = options[:validate]
is_validating = true unless options[:validate_with_context].nil?

# assume array of model objects
@@ -257,7 +276,7 @@ def import( *args )
end
# supports empty array
elsif args.last.is_a?( Array ) and args.last.empty?
-return ActiveRecord::Import::Result.new([], 0) if args.last.empty?
+return ActiveRecord::Import::Result.new([], 0, []) if args.last.empty?
# supports 2-element array and array
elsif args.size == 2 and args.first.is_a?( Array ) and args.last.is_a?( Array )
column_names, array_of_attributes = args
@@ -284,16 +303,26 @@ def import( *args )
return_obj = if is_validating
import_with_validations( column_names, array_of_attributes, options )
else
-num_inserts = import_without_validations_or_callbacks( column_names, array_of_attributes, options )
-ActiveRecord::Import::Result.new([], num_inserts)
+(num_inserts, ids) = import_without_validations_or_callbacks( column_names, array_of_attributes, options )
+ActiveRecord::Import::Result.new([], num_inserts, ids)
end

if options[:synchronize]
sync_keys = options[:synchronize_keys] || [self.primary_key]
synchronize( options[:synchronize], sync_keys)
end

return_obj.num_inserts = 0 if return_obj.num_inserts.nil?

# if we have ids, then set the id on the models and mark the models as clean.
if support_setting_primary_key_of_imported_objects?
set_ids_and_mark_clean(models, return_obj)

# if there are auto-save associations on the models we imported that are new, import them as well
if options[:recursive]
import_associations(models, options)
end
end

return_obj
end

@@ -327,12 +356,12 @@ def import_with_validations( column_names, array_of_attributes, options={} )
end
array_of_attributes.compact!

-num_inserts = if array_of_attributes.empty? || options[:all_or_none] && failed_instances.any?
-0
+(num_inserts, ids) = if array_of_attributes.empty? || options[:all_or_none] && failed_instances.any?
+[0,[]]
else
import_without_validations_or_callbacks( column_names, array_of_attributes, options )
end
-ActiveRecord::Import::Result.new(failed_instances, num_inserts)
+ActiveRecord::Import::Result.new(failed_instances, num_inserts, ids)
end
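
The change in return shape (a `[count, ids]` pair that is unpacked into a three-field result) can be sketched in isolation. `SketchResult` and `fake_insert` are hypothetical names; the real class is `ActiveRecord::Import::Result`, and the ids here are invented rather than returned by a database.

```ruby
# Hypothetical stand-in for ActiveRecord::Import::Result after this change.
SketchResult = Struct.new(:failed_instances, :num_inserts, :ids)

# Hypothetical stand-in for import_without_validations_or_callbacks:
# returns the number of INSERT statements and the ids of the new rows.
def fake_insert(rows)
  [1, (1..rows.size).to_a]
end

num_inserts, ids = fake_insert(%w[a b c])
result = SketchResult.new([], num_inserts, ids)

puts result.num_inserts # 1
p result.ids            # [1, 2, 3]

# A Struct tolerates missing trailing arguments, so a two-argument
# construction leaves ids as nil rather than raising.
p SketchResult.new([], 0).ids # nil
```

Passing an explicit `[]` for `ids`, as the diff does for the empty-array case, keeps callers from having to handle `nil`.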

# Imports the passed in +column_names+ and +array_of_attributes+
@@ -364,6 +393,7 @@ def import_without_validations_or_callbacks( column_names, array_of_attributes,
columns_sql = "(#{column_names.map{|name| connection.quote_column_name(name) }.join(',')})"
insert_sql = "INSERT #{options[:ignore] ? 'IGNORE ':''}INTO #{quoted_table_name} #{columns_sql} VALUES "
values_sql = values_sql_for_columns_and_attributes(columns, array_of_attributes)
ids = []
if not supports_import?
number_inserted = 0
values_sql.each do |values|
@@ -375,15 +405,60 @@ def import_without_validations_or_callbacks( column_names, array_of_attributes,
post_sql_statements = connection.post_sql_statements( quoted_table_name, options )

# perform the inserts
-number_inserted = connection.insert_many( [ insert_sql, post_sql_statements ].flatten,
+(number_inserted,ids) = connection.insert_many( [ insert_sql, post_sql_statements ].flatten,
values_sql,
"#{self.class.name} Create Many Without Validations Or Callbacks" )
end
-number_inserted
+[number_inserted, ids]
end

private

def set_ids_and_mark_clean(models, import_result)
unless models.nil?
import_result.ids.each_with_index do |id, index|
models[index].id = id.to_i
models[index].instance_variable_get(:@changed_attributes).clear # mark the model as saved
end
end
end
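
The positional matching in `set_ids_and_mark_clean` can be sketched without ActiveRecord. `FakeModel` and `assign_ids` are hypothetical names, and the sketch leaves out the dirty-tracking reset. Like the method above, it assumes the ids come back in the same order in which the rows were supplied.

```ruby
# Hypothetical stand-in for an ActiveRecord model that has not been saved yet.
FakeModel = Struct.new(:title, :id)

# Assigns the returned ids to the models by position. The database driver
# typically hands the ids back as strings, hence the to_i.
def assign_ids(models, ids)
  ids.each_with_index do |id, index|
    models[index].id = id.to_i
  end
  models
end

models = [FakeModel.new('Book 1'), FakeModel.new('Book 2')]
assign_ids(models, %w[41 42])

p models.map(&:id) # [41, 42]
```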

def import_associations(models, options)
# now, for all the dirty associations, collect them into a new set of models, then recurse.
# notes:
# does not handle associations that reference themselves
# assumes that the only associations to be saved are marked with :autosave
# should probably take a hash to associations to follow.
associated_objects_by_class={}
models.each {|model| find_associated_objects_for_import(associated_objects_by_class, model) }

associated_objects_by_class.each_pair do |class_name, associations|
associations.each_pair do |association_name, associated_records|
associated_records.first.class.import(associated_records, options) unless associated_records.empty?
end
end
end

# We are eventually going to call Class.import <objects> so we build up a hash
# of class => objects to import.
def find_associated_objects_for_import(associated_objects_by_class, model)
associated_objects_by_class[model.class.name]||={}

model.class.reflect_on_all_autosave_associations.each do |association_reflection|
associated_objects_by_class[model.class.name][association_reflection.name]||=[]

association = model.association(association_reflection.name)
association.loaded!

changed_objects = association.select {|a| a.new_record? || a.changed?}
changed_objects.each do |child|
child.send("#{association_reflection.foreign_key}=", model.id)
end
associated_objects_by_class[model.class.name][association_reflection.name].concat changed_objects
end
associated_objects_by_class
end
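
A much simplified sketch of the idea behind `find_associated_objects_for_import`: once the parents have ids, copy each parent's id into its children's foreign key and pool all children into one list, so they can go into a single bulk insert. `FakeBook`, `FakeChapter` and `collect_children` are hypothetical names. The sketch ignores the grouping by class and association name and the `new_record? || changed?` filter that the real method applies.

```ruby
# Hypothetical stand-ins for a parent model and its has_many children.
FakeBook    = Struct.new(:id, :chapters)
FakeChapter = Struct.new(:title, :book_id)

# Sets the foreign key on every child and returns all children as one flat
# list, ready to be handed to a single bulk import call.
def collect_children(parents)
  parents.flat_map do |parent|
    # each returns the chapters array itself, which flat_map then flattens
    parent.chapters.each { |chapter| chapter.book_id = parent.id }
  end
end

books = [
  FakeBook.new(1, [FakeChapter.new('Chapter 1'), FakeChapter.new('Chapter 2')]),
  FakeBook.new(2, [FakeChapter.new('Chapter 3')])
]

chapters = collect_children(books)
p chapters.map(&:book_id) # [1, 1, 2]
puts chapters.size        # 3
```

This is why the recursive import depends on the adapter returning primary keys: the children's foreign keys cannot be filled in until the parents' ids are known.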

# Returns the SQL VALUES for an INSERT statement given the passed in +columns+
# and +array_of_attributes+.
def values_sql_for_columns_and_attributes(columns, array_of_attributes) # :nodoc:
4 changes: 3 additions & 1 deletion test/models/book.rb
@@ -1,3 +1,5 @@
class Book < ActiveRecord::Base
-belongs_to :topic
+belongs_to :topic, :inverse_of=>:books
has_many :chapters, :autosave => true, :inverse_of => :book
has_many :end_notes, :autosave => true, :inverse_of => :book
end
4 changes: 4 additions & 0 deletions test/models/chapter.rb
@@ -0,0 +1,4 @@
class Chapter < ActiveRecord::Base
belongs_to :book, :inverse_of=>:chapters
validates :title, :presence => true
end
4 changes: 4 additions & 0 deletions test/models/end_note.rb
@@ -0,0 +1,4 @@
class EndNote < ActiveRecord::Base
belongs_to :book, :inverse_of=>:end_notes
validates :note, :presence => true
end
2 changes: 1 addition & 1 deletion test/models/topic.rb
@@ -2,7 +2,7 @@ class Topic < ActiveRecord::Base
validates_presence_of :author_name
validates :title, numericality: { only_integer: true }, on: :context_test

-has_many :books
+has_many :books, :autosave=>true, :inverse_of=>:topic
belongs_to :parent, :class_name => "Topic"

composed_of :description, :mapping => [ %w(title title), %w(author_name author_name)], :allow_nil => true, :class_name => "TopicDescription"
2 changes: 1 addition & 1 deletion test/postgresql/import_test.rb
@@ -1,4 +1,4 @@
require File.expand_path(File.dirname(__FILE__) + '/../test_helper')
require File.expand_path(File.dirname(__FILE__) + '/../support/postgresql/import_examples')

-should_support_postgresql_import_functionality
+should_support_postgresql_import_functionality
15 changes: 15 additions & 0 deletions test/schema/generic_schema.rb
@@ -66,6 +66,21 @@
t.column :for_sale, :boolean, :default => true
end

create_table :chapters, :force => true do |t|
t.column :title, :string
t.column :book_id, :integer, :null => false
t.column :created_at, :datetime
t.column :updated_at, :datetime
end

create_table :end_notes, :force => true do |t|
t.column :note, :string
t.column :book_id, :integer, :null => false
t.column :created_at, :datetime
t.column :updated_at, :datetime
end


create_table :languages, :force=>true do |t|
t.column :name, :string
t.column :developer_id, :integer
20 changes: 20 additions & 0 deletions test/support/factories.rb
@@ -1,4 +1,8 @@
FactoryGirl.define do
sequence(:book_title) {|n| "Book #{n}"}
sequence(:chapter_title) {|n| "Chapter #{n}"}
sequence(:end_note) {|n| "Endnote #{n}"}

factory :group do
sequence(:order) { |n| "Order #{n}" }
end
@@ -16,4 +20,20 @@
factory :widget do
sequence(:w_id){ |n| n}
end

factory :topic_with_book, :parent=>:topic do |m|
after(:build) do |topic|
2.times do
book = topic.books.build(:title=>FactoryGirl.generate(:book_title), :author_name=>'Stephen King')
3.times do
book.chapters.build(:title => FactoryGirl.generate(:chapter_title))
end

4.times do
book.end_notes.build(:note => FactoryGirl.generate(:end_note))
end
end
end
end

end