/
insert_all.rb
212 lines (158 loc) · 6.47 KB
/
insert_all.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
# frozen_string_literal: true
require "active_support/core_ext/enumerable"
module ActiveRecord
class InsertAll # :nodoc:
attr_reader :model, :connection, :inserts, :keys
attr_reader :on_duplicate, :returning, :unique_by
# Captures the model, its connection, the rows to insert and the set of
# column keys (taken from the first row, plus any scope attributes), then
# normalizes the :returning, :unique_by and :on_duplicate options.
#
# Raises ArgumentError when +inserts+ is blank; +find_unique_index_for+ and
# +ensure_valid_options_for_connection!+ may raise ArgumentError as well.
def initialize(model, inserts, on_duplicate:, returning: nil, unique_by: nil)
  raise ArgumentError, "Empty list of attributes passed" if inserts.blank?

  @model = model
  @connection = model.connection
  @inserts = inserts
  @keys = inserts.first.keys.map(&:to_s)
  @on_duplicate = on_duplicate
  @returning = returning
  @unique_by = unique_by

  # Attributes coming from the model's scope become part of the key list.
  if model.scope_attributes?
    @scope_attributes = model.scope_attributes
    @keys |= @scope_attributes.keys
  end
  @keys = @keys.to_set

  # Without an explicit :returning, fall back to the primary keys when the
  # connection supports RETURNING, otherwise disable it.
  if @returning.nil?
    @returning = connection.supports_insert_returning? ? primary_keys : false
  end
  @returning = false if @returning == []

  # The index has to be resolved first: updatable_columns excludes its columns.
  @unique_by = find_unique_index_for(unique_by)
  @on_duplicate = :skip if @on_duplicate == :update && updatable_columns.empty?

  ensure_valid_options_for_connection!
end
# Runs the generated SQL through the connection's +exec_insert_all+, labelled
# "<model> [Bulk ]Insert" or "<model> [Bulk ]Upsert".
def execute
  subject = "#{model} "
  scale = inserts.many? ? "Bulk " : ""
  action = on_duplicate == :update ? "Upsert" : "Insert"
  message = +"#{subject}#{scale}#{action}"

  connection.exec_insert_all(to_sql, message)
end
# Keys that may be overwritten on conflict: every inserted key except the
# read-only columns and the columns of the unique index in use.
def updatable_columns
  excluded = readonly_columns + unique_by_columns
  keys - excluded
end
# Primary key column(s) of the model's table, looked up in the connection's
# schema cache and always returned as an array.
def primary_keys
  cached = connection.schema_cache.primary_keys(model.table_name)
  Array(cached)
end
# Whether the configured conflict strategy is :skip.
def skip_duplicates?
  strategy = on_duplicate
  strategy == :skip
end
# Whether the configured conflict strategy is :update (i.e. an upsert).
def update_duplicates?
  strategy = on_duplicate
  strategy == :update
end
# For every row, yields each key (in +keys+ order) with that row's value and
# collects the block results, giving one array per row.
#
# Row keys are stringified and scope attributes, when present, are merged in
# before the row is checked with +verify_attributes+.
def map_key_with_value
  inserts.map do |row|
    normalized = row.stringify_keys
    normalized.merge!(scope_attributes) if scope_attributes

    verify_attributes(normalized)

    keys.map { |column| yield column, normalized[column] }
  end
end
private
attr_reader :scope_attributes
# Resolves the :unique_by option to an index definition.
#
# +unique_by+ may be an index name or a column / list of columns; when nil,
# the model's primary key is used for the lookup. Columns are matched against
# unique indexes regardless of the order they are listed in.
#
# Returns the matching unique index, a synthetic primary-key index
# definition when +unique_by+ explicitly names the primary key, or nil when
# nothing was requested (or the connection has no conflict-target support).
#
# Raises ArgumentError when the connection does not support :unique_by, or
# when no unique index matches.
def find_unique_index_for(unique_by)
  unless connection.supports_insert_conflict_target?
    return if unique_by.nil?

    raise ArgumentError, "#{connection.class} does not support :unique_by"
  end

  name_or_columns = unique_by || model.primary_key
  match = Array(name_or_columns).map(&:to_s)
  sorted_match = match.sort

  # Compare column sets order-insensitively; Array() also covers indexes
  # whose +columns+ is a single string rather than a list.
  index = unique_indexes.find do |candidate|
    match.include?(candidate.name) || Array(candidate.columns).sort == sorted_match
  end
  return index if index

  unless match == primary_keys
    raise ArgumentError, "No unique index found for #{name_or_columns}"
  end

  # The primary key is the implicit default, so only build a definition for
  # it when the caller asked for it explicitly.
  return if unique_by.nil?

  ActiveRecord::ConnectionAdapters::IndexDefinition.new(model.table_name, "#{model.table_name}_primary_key", true, match)
end
# Indexes of the model's table, as reported by the schema cache, that are
# flagged unique.
def unique_indexes
  table_indexes = connection.schema_cache.indexes(model.table_name)
  table_indexes.select { |index| index.unique }
end
# Raises ArgumentError for the first requested option the connection does
# not support, checked in this order: :returning, skipping duplicates,
# upsert, :unique_by. Returns nil when everything is supported.
def ensure_valid_options_for_connection!
  returning_unsupported = returning && !connection.supports_insert_returning?
  raise ArgumentError, "#{connection.class} does not support :returning" if returning_unsupported

  skip_unsupported = skip_duplicates? && !connection.supports_insert_on_duplicate_skip?
  raise ArgumentError, "#{connection.class} does not support skipping duplicates" if skip_unsupported

  upsert_unsupported = update_duplicates? && !connection.supports_insert_on_duplicate_update?
  raise ArgumentError, "#{connection.class} does not support upsert" if upsert_unsupported

  target_unsupported = unique_by && !connection.supports_insert_conflict_target?
  raise ArgumentError, "#{connection.class} does not support :unique_by" if target_unsupported
end
# Wraps this object in a Builder and lets the connection turn it into the
# INSERT statement.
def to_sql
  adapter = connection
  builder = ActiveRecord::InsertAll::Builder.new(self)
  adapter.build_insert_sql(builder)
end
# Columns never updated on conflict: the primary keys followed by the
# model's read-only attributes.
def readonly_columns
  pk_columns = primary_keys
  declared = model.readonly_attributes.to_a
  pk_columns + declared
end
# Columns of the resolved unique index as an array; empty when no index is
# in use.
def unique_by_columns
  return [] if unique_by.nil?

  Array(unique_by.columns)
end
# Ensures a row has exactly the expected set of keys.
#
# Raises ArgumentError when the row's keys differ from +keys+.
def verify_attributes(attributes)
  return if keys == attributes.keys.to_set

  raise ArgumentError, "All objects being inserted must have the same keys"
end
class Builder # :nodoc:
attr_reader :model
delegate :skip_duplicates?, :update_duplicates?, :keys, to: :insert_all
# Keeps the InsertAll instance together with its model and connection.
def initialize(insert_all)
  @insert_all = insert_all
  @model = insert_all.model
  @connection = insert_all.connection
end
# The "INTO <table> (<columns>)" fragment of the statement.
def into
  table = model.quoted_table_name
  columns = columns_list
  "INTO #{table} (#{columns})"
end
# Serializes every value with the attribute type of its column, passes it
# through the connection's +with_yaml_fallback+, and compiles the resulting
# rows as an Arel values list.
def values_list
  types = extract_types_from_columns_on(model.table_name, keys: keys)

  rows = insert_all.map_key_with_value do |column, raw_value|
    serialized = types[column].serialize(raw_value)
    connection.with_yaml_fallback(serialized)
  end

  node = Arel::Nodes::ValuesList.new(rows)
  connection.visitor.compile(node)
end
# Formatted RETURNING column list, or nil when nothing is to be returned.
def returning
  requested = insert_all.returning
  requested ? format_columns(requested) : nil
end
# Conflict target for the statement: the unique index's columns (plus its
# WHERE predicate when it has one), otherwise the primary keys when
# updating duplicates, otherwise nil.
def conflict_target
  index = insert_all.unique_by

  if index
    target = +"(#{format_columns(index.columns)})"
    predicate = index.where
    target << " WHERE #{predicate}" if predicate
    target
  elsif update_duplicates?
    "(#{format_columns(insert_all.primary_keys)})"
  end
end
# Quoted names of the columns that may be updated on conflict.
def updatable_columns
  raw_names = insert_all.updatable_columns
  quote_columns(raw_names)
end
# Builds one "<column>=(CASE WHEN ... END)," assignment per update-timestamp
# column accepted by +touch_timestamp_attribute?+, concatenated into a
# single string (empty when no column qualifies).
#
# The block receives each quoted updatable column and returns a condition;
# the conditions are joined with AND. While they all hold the timestamp
# keeps its current value, otherwise it becomes CURRENT_TIMESTAMP.
def touch_model_timestamps_unless(&block)
  timestamp_columns = model.send(:timestamp_attributes_for_update_in_model)
  touched = timestamp_columns.select { |column_name| touch_timestamp_attribute?(column_name) }

  assignments = touched.map do |column_name|
    "#{column_name}=(CASE WHEN (#{updatable_columns.map(&block).join(" AND ")}) THEN #{model.quoted_table_name}.#{column_name} ELSE CURRENT_TIMESTAMP END),"
  end

  assignments.join
end
private
attr_reader :connection, :insert_all
# A timestamp column is touched only when duplicates are being updated and
# the column is not already among the updatable columns.
def touch_timestamp_attribute?(column_name)
  return false unless update_duplicates?

  !insert_all.updatable_columns.include?(column_name)
end
# Formatted list of every column being inserted.
def columns_list
  inserted_keys = insert_all.keys
  format_columns(inserted_keys)
end
# Maps each key to the model's attribute type for it.
#
# Raises UnknownAttributeError for the first key that is not a column of
# +table_name+ according to the schema cache.
def extract_types_from_columns_on(table_name, keys:)
  known_columns = connection.schema_cache.columns_hash(table_name)

  unknown_column = (keys - known_columns.keys).first
  if unknown_column
    raise UnknownAttributeError.new(model.new, unknown_column)
  end

  keys.index_with { |column| model.type_for_attribute(column) }
end
# Quotes and comma-joins a collection of column names; anything that does
# not respond to +map+ (e.g. a plain string) is returned unchanged.
def format_columns(columns)
  return columns unless columns.respond_to?(:map)

  quote_columns(columns).join(",")
end
# Quotes each column name with the connection's +quote_column_name+.
def quote_columns(columns)
  columns.map { |column| connection.quote_column_name(column) }
end
end
end
end