/
batchable.rb
397 lines (373 loc) · 12.6 KB
/
batchable.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
# frozen_string_literal: true
# rubocop:todo all
module Mongoid
module Association
module Embedded
# Contains behavior for executing operations in batch on embedded
# documents.
module Batchable
include Positional
# Persist new embedded documents with one batch push ($push with $each).
# This is the public entry point; the validation, callbacks and the
# database write itself are carried out by #execute_batch_push.
#
# @example Execute the batch push.
#   batchable.batch_insert([ doc_one, doc_two ])
#
# @param [ Array<Document> ] docs The docs to add.
#
# @return [ Array<Hash> ] The inserts.
def batch_insert(docs)
  execute_batch_push(docs)
end
# Remove every given document from the association at once by unsetting
# the whole association path with a single $unset.
#
# @example Clear all docs.
#   batchable.batch_clear(docs)
#
# @param [ Array<Document> ] docs The docs to clear.
#
# @return [ Array ] The empty array.
def batch_clear(docs)
  pre_process_batch_remove(docs, :delete)
  unless docs.empty?
    query = collection.find(selector)
    query.update_one(
      positionally(selector, "$unset" => { path => true }),
      session: _session
    )
    # Flag the cleared documents as new records again. Without this, a
    # document that is set, cleared and then set once more would still
    # count as already persisted and would not be written the second
    # time around.
    docs.each do |cleared|
      cleared.new_record = true
    end
    post_process_batch_remove(docs, :delete)
  end
  _unscoped.clear
end
# Batch remove the provided documents, using $pull for documents that
# have an _id and $pullAll for those that do not. When the base document
# is not persisted no database write is made; when it is persisted and
# no documents are given, the association path is set to an empty array.
#
# @example Batch remove the documents.
#   batchable.batch_remove([ doc_one, doc_two ])
#
# @param [ Array<Document> ] docs The docs to remove.
# @param [ Symbol ] method Delete or destroy.
#
# @return [ Object ] Whatever #reindex returns.
def batch_remove(docs, method = :delete)
  removals = pre_process_batch_remove(docs, method)
  # A nil _id cannot be matched by id with $pull, so those attribute
  # hashes are split off and removed by full value with $pullAll.
  pull_alls, pulls = removals.partition { |attrs| attrs["_id"].nil? }

  unless _base.persisted?
    post_process_batch_remove(docs, method) unless docs.empty?
    return reindex
  end

  if docs.empty?
    collection.find(selector).update_one(
      positionally(selector, "$set" => { path => [] }),
      session: _session
    )
  else
    unless pulls.empty?
      collection.find(selector).update_one(
        positionally(selector, "$pull" => { path => { "_id" => { "$in" => pulls.pluck("_id") } } }),
        session: _session
      )
    end
    unless pull_alls.empty?
      collection.find(selector).update_one(
        positionally(selector, "$pullAll" => { path => pull_alls }),
        session: _session
      )
    end
    post_process_batch_remove(docs, method)
  end
  reindex
end
# Batch replace the association's documents with the provided ones.
#
# With blank input the current documents are removed: while assigning,
# an atomic unset is registered on the base and the removal callbacks
# are run without a write; otherwise #batch_remove is used. With
# non-blank input that differs from the current target, the target is
# emptied and the new documents are written through #execute_batch_set.
# Nothing happens when the input equals the current target.
#
# @example Batch replace the documents.
#   batchable.batch_replace([ doc_one, doc_two ])
#
# @param [ Array<Document> | Array<Hash> ] docs The docs to replace with.
#
# @return [ Object | nil ] The result of the branch taken (the return
#   value of #post_process_batch_remove, #batch_remove or
#   #add_atomic_sets), or nil when nothing had to change.
def batch_replace(docs)
  if docs.blank?
    if _assigning? && !empty?
      _base.delayed_atomic_sets.delete(path)
      clear_atomic_path_cache
      _base.add_atomic_unset(first)
      # Work on a copy: pre-processing deletes from _target as it goes.
      removed = _target.dup
      pre_process_batch_remove(removed, :delete)
      post_process_batch_remove(removed, :delete)
    else
      batch_remove(_target.dup)
    end
  elsif _target != docs
    _base.delayed_atomic_sets.delete(path) unless _assigning?
    replacements = normalize_docs(docs).compact
    _target.clear && _unscoped.clear
    _base.delayed_atomic_unsets.delete(path)
    clear_atomic_path_cache
    add_atomic_sets(execute_batch_set(replacements))
  end
end
private
# Register the atomic sets on the base document as delayed sets for this
# association's path. This only happens while assigning; otherwise the
# method does nothing.
#
# @api private
#
# @example Add the atomic sets.
#   batchable.add_atomic_sets([{ field: value }])
#
# @param [ Array<Hash> ] sets The atomic sets.
#
# @return [ Array<Hash> | nil ] The sets when assigning, otherwise nil.
def add_atomic_sets(sets)
  return unless _assigning?

  # Empty any sets already queued for this path and for the base's
  # children before the new sets take their place.
  _base.delayed_atomic_sets[path].try(:clear)
  _base.collect_children.each { |child| child.delayed_atomic_sets.clear }
  _base.delayed_atomic_sets[path] = sets
end
# Persist the provided documents by writing the whole association path
# with a single $set. The documents are always pre-processed; the write
# and the post-processing only happen when #insertable? allows it.
#
# @api private
#
# @example Perform a batch $set.
#   batchable.execute_batch_set(docs)
#
# @param [ Array<Document> ] docs The docs to persist.
#
# @return [ Array<Hash> ] The inserts.
def execute_batch_set(docs)
  # Start optimistic; pre-processing flips the flag on an invalid doc.
  self.inserts_valid = true
  inserts = pre_process_batch_insert(docs)
  return inserts unless insertable?

  collection.find(selector).update_one(
    positionally(selector, '$set' => { path => inserts }),
    session: _session
  )
  post_process_batch_insert(docs)
  inserts
end
# Persist the provided documents by appending them to the association
# path with a single $push / $each. The documents are always
# pre-processed; the write and the post-processing only happen when
# #insertable? allows it.
#
# Nil entries in the input are dropped before anything else happens.
# Otherwise they would end up as nil elements in the $each payload and
# would later be handed to #post_process_batch_insert, which calls
# methods on every element it receives.
#
# @api private
#
# @example Perform a batch push.
#   batchable.execute_batch_push(docs)
#
# @param [ Array<Document> ] docs The docs to persist.
#
# @return [ Array<Hash> ] The inserts.
def execute_batch_push(docs)
  # Mirrors #batch_replace, which compacts before #execute_batch_set.
  docs = docs.compact
  # Start optimistic; pre-processing flips the flag on an invalid doc.
  self.inserts_valid = true
  pushes = pre_process_batch_insert(docs)
  if insertable?
    collection.find(selector).update_one(
      positionally(selector, '$push' => { path => { '$each' => pushes } }),
      session: _session
    )
    post_process_batch_insert(docs)
  end
  pushes
end
# Whether a batch insert may be written right now. That requires the
# association to be persistable, no assignment to be in progress, and
# the inserts-valid flag to be set.
#
# @api private
#
# @example Can inserts be performed?
#   batchable.insertable?
#
# @return [ true | false ] If inserts can be performed.
def insertable?
  persistable? &&
    !_assigning? &&
    inserts_valid
end
# Read the flag recording whether the documents of the current batch
# passed validation. It is nil until the flag has been assigned.
#
# @api private
#
# @example Are the inserts currently valid.
#   batchable.inserts_valid
#
# @return [ true | false ] If inserts are currently valid.
def inserts_valid
  @inserts_valid
end
# Store the flag recording whether the documents of the current batch
# passed validation.
#
# @api private
#
# @example Set the flag.
#   batchable.inserts_valid = true
#
# @param [ true | false ] value The flag.
#
# @return [ true | false ] The flag.
def inserts_valid=(value)
  @inserts_valid = value
end
# Normalize the documents, in case some or all of them were provided as
# hashes. Each Hash element is built into an instance of the
# association's class through Factory.build, with the association and
# the base document merged in as :_association and :_parent. Every other
# element (a document, nil) is passed through unchanged.
#
# The decision is made per element rather than from the first element
# only, so arrays such as [ nil, { ... } ] or [ doc, { ... } ] are
# normalized too, and a nil that follows a hash no longer reaches
# Hash#merge!.
#
# @api private
#
# @example Normalize the docs.
#   batchable.normalize_docs(docs)
#
# @param [ Array<Document> | Array<Hash> ] docs The docs to normalize.
#
# @return [ Array<Document> ] The docs. The very same array is returned
#   when it contains no hashes.
def normalize_docs(docs)
  return docs unless docs.any? { |doc| doc.is_a?(::Hash) }

  docs.map do |doc|
    next doc unless doc.is_a?(::Hash)

    attributes = { _association: _association, _parent: _base }
    attributes.merge!(doc)
    Factory.build(klass, attributes)
  end
end
# Get the atomic path of the association, computing and caching it on
# first use. With no documents in the unscoped list the path is derived
# from the base document and the association; otherwise the first
# unscoped document supplies it.
#
# @api private
#
# @example Get the atomic path.
#   batchable.path
#
# @return [ String ] The atomic path.
def path
  return @path if @path

  @path =
    if _unscoped.empty?
      Mongoid::Atomic::Paths::Embedded::Many.position_without_document(_base, _association)
    else
      _unscoped.first.atomic_path
    end
end
# Drop the cached path of this association and the cached atomic paths
# of the base document. A path computed before the association was set
# on the document may be wrong, and left cached it would make later path
# and atomic_paths lookups wrong as well.
#
# @api private
#
# @return [ nil ] Always nil.
def clear_atomic_path_cache
  self.path = nil
  # The base exposes no writer here, so its cache ivar is reset directly.
  _base.instance_variable_set("@atomic_paths", nil)
end
# Overwrite the cached atomic path. Assigning nil discards the cached
# value so that #path computes it again on its next call.
#
# @api private
#
# @example Set the atomic path.
#   batchable.path = "addresses"
#
# @param [ String ] value The path.
#
# @return [ String ] The path.
def path=(value)
  @path = value
end
# Get the selector used for atomic operations on the collection. It is
# taken from the base document's atomic selector and cached afterwards.
#
# @api private
#
# @example Get the selector.
#   batchable.selector
#
# @return [ Hash ] The atomic selector.
def selector
  return @selector if @selector

  @selector = _base.atomic_selector
end
# Prepare the provided documents for a batch insert. Each document is
# appended to the association; when the association is persistable and
# no assignment is in progress, the document is also validated for
# :create, and either its before save/create callbacks are run or the
# batch is marked invalid.
#
# @api private
#
# @example Pre process the documents.
#   batchable.pre_process_batch_insert(docs)
#
# @param [ Array<Document> ] docs The documents.
#
# @return [ Array<Hash> ] The documents as an array of hashes, in input
#   order. A nil entry in the input yields a nil entry in the result.
def pre_process_batch_insert(docs)
  docs.map do |document|
    next unless document

    append(document)
    if persistable? && !_assigning?
      # Take the path from the document if none is known yet.
      self.path = document.atomic_path unless path
      if document.valid?(:create)
        document.run_before_callbacks(:save, :create)
      else
        self.inserts_valid = false
      end
    end
    document.send(:as_attributes)
  end
end
# Prepare the provided documents for a batch removal. For each document
# the before_remove callback is executed, the document is deleted from
# the target and unscoped lists and unbound, and the after_remove
# callback is executed. Unless an assignment is in progress, destroy
# dependencies are applied as well, and the before destroy callbacks are
# run when the method is :destroy.
#
# @api private
#
# @example Pre process the documents.
#   batchable.pre_process_batch_remove(docs, :delete)
#
# @param [ Array<Document> ] docs The documents.
# @param [ Symbol ] method Delete or destroy.
#
# @return [ Array<Hash> ] The documents as hashes.
def pre_process_batch_remove(docs, method)
  docs.map do |document|
    # Take the path from the document if none is known yet.
    self.path = document.atomic_path unless path
    execute_callback(:before_remove, document)
    unless _assigning?
      document.apply_destroy_dependencies!
      document.run_before_callbacks(:destroy) if method == :destroy
    end
    _target.delete_one(document)
    _unscoped.delete_one(document)
    unbind_one(document)
    execute_callback(:after_remove, document)
    document.send(:as_attributes)
  end
end
# Finish a batch insert once the write has been made: every document is
# flagged as no longer new, its after create/save callbacks are run and
# its post-persist hook is invoked.
#
# @api private
#
# @example Post process the documents.
#   batchable.post_process_batch_insert(docs)
#
# @param [ Array<Documents> ] docs The inserted docs.
#
# @return [ Enumerable ] The document enum.
def post_process_batch_insert(docs)
  docs.each do |inserted|
    inserted.new_record = false
    inserted.run_after_callbacks(:create, :save)
    inserted.post_persist
  end
end
# Finish a batch removal: every document has its after destroy callbacks
# run when the method is :destroy, is then frozen, and is finally
# flagged as destroyed.
#
# @api private
#
# @example Post process the documents.
#   batchable.post_process_batch_remove(docs, :delete)
#
# @param [ Array<Document> ] docs The documents.
# @param [ Symbol ] method Delete or destroy.
#
# @return [ Array<Document> ] The documents.
def post_process_batch_remove(docs, method)
  destroying = method == :destroy
  docs.each do |removed|
    removed.run_after_callbacks(:destroy) if destroying
    removed.freeze
    removed.destroyed = true
  end
end
end
end
end
end