@@ -64,6 +64,7 @@ def initialize(nonblocking: self.class.default_nonblocking, fiber_limit: self.cl
         @nonblocking = nonblocking
       end
       @fiber_limit = fiber_limit
+      @lazies_at_depth = Hash.new { |h, k| h[k] = [] }
     end
 
     # @return [Integer, nil]
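Note: the added `@lazies_at_depth` initializer uses Ruby's default-proc idiom, which auto-creates an empty Array the first time a depth key is touched, so later code can append without nil checks. A minimal standalone illustration (not part of the diff):

    lazies_at_depth = Hash.new { |h, k| h[k] = [] }
    lazies_at_depth[2] << :lazy_a   # no KeyError; hash is now {2 => [:lazy_a]}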
@@ -140,10 +141,10 @@ def yield(source = Fiber[:__graphql_current_dataloader_source])
     end
 
     # @api private Nothing to see here
-    def append_job(&job)
+    def append_job(callable = nil, &job)
       # Given a block, queue it up to be worked through when `#run` is called.
-      # (If the dataloader is already running, than a Fiber will pick this up later.)
-      @pending_jobs.push(job)
+      # (If the dataloader is already running, then a Fiber will pick this up later.)
+      @pending_jobs.push(callable || job)
       nil
     end
 
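With the new `callable` parameter, `append_job` accepts any object that responds to `#call` in place of a block. A usage sketch (`fetch_user` is a placeholder, not part of the diff):

    dataloader.append_job { fetch_user(1) }   # block form, unchanged
    step = -> { fetch_user(2) }
    dataloader.append_job(step)               # new: queue a callable directly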
@@ -160,6 +161,10 @@ def clear_cache
     def run_isolated
       prev_queue = @pending_jobs
       prev_pending_keys = {}
+      prev_lazies_at_depth = @lazies_at_depth
+      @lazies_at_depth = @lazies_at_depth.dup.clear
+      # Clear pending loads but keep already-cached records
+      # in case they are useful to the given block.
       @source_cache.each do |source_class, batched_sources|
         batched_sources.each do |batch_args, batched_source_instance|
           if batched_source_instance.pending?
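Here `@lazies_at_depth.dup.clear` is a compact way to get a fresh, empty hash that keeps the original's default proc, so the isolated run can still append by depth. A standalone sketch of the idiom:

    h = Hash.new { |hash, key| hash[key] = [] }
    h[1] << :a
    h2 = h.dup.clear   # => {} — empty, but the default proc is preserved
    h2[5] << :b        # => {5 => [:b]}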
@@ -179,6 +184,7 @@ def run_isolated
       res
     ensure
       @pending_jobs = prev_queue
+      @lazies_at_depth = prev_lazies_at_depth
       prev_pending_keys.each do |source_instance, pending|
         pending.each do |key, value|
           if !source_instance.results.key?(key)
@@ -188,7 +194,8 @@ def run_isolated
       end
     end
 
-    def run
+    # @param trace_query_lazy [nil, Execution::Multiplex]
+    def run(trace_query_lazy: nil)
       trace = Fiber[:__graphql_current_multiplex]&.current_trace
       jobs_fiber_limit, total_fiber_limit = calculate_fiber_limit
       job_fibers = []
@@ -201,26 +208,13 @@ def run
       while first_pass || !job_fibers.empty?
         first_pass = false
 
-        while (f = (job_fibers.shift || (((next_job_fibers.size + job_fibers.size) < jobs_fiber_limit) && spawn_job_fiber(trace))))
-          if f.alive?
-            finished = run_fiber(f)
-            if !finished
-              next_job_fibers << f
-            end
-          end
-        end
-        join_queues(job_fibers, next_job_fibers)
-
-        while (!source_fibers.empty? || @source_cache.each_value.any? { |group_sources| group_sources.each_value.any?(&:pending?) })
-          while (f = source_fibers.shift || (((job_fibers.size + source_fibers.size + next_source_fibers.size + next_job_fibers.size) < total_fiber_limit) && spawn_source_fiber(trace)))
-            if f.alive?
-              finished = run_fiber(f)
-              if !finished
-                next_source_fibers << f
-              end
-            end
+        run_pending_steps(trace, job_fibers, next_job_fibers, jobs_fiber_limit, source_fibers, next_source_fibers, total_fiber_limit)
+
+        if !@lazies_at_depth.empty?
+          with_trace_query_lazy(trace_query_lazy) do
+            run_next_pending_lazies(job_fibers, trace)
+            run_pending_steps(trace, job_fibers, next_job_fibers, jobs_fiber_limit, source_fibers, next_source_fibers, total_fiber_limit)
           end
-          join_queues(source_fibers, next_source_fibers)
         end
       end
 
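The restructured loop alternates between draining fibers and resolving queued lazies, shallowest depth first, until no work remains. A simplified model of the control flow (stand-in names; see `run_pending_steps` and `run_next_pending_lazies` below for the real implementations):

    loop do
      drain_jobs_and_sources                        # ~ run_pending_steps
      break if lazies_at_depth.empty?
      depth = lazies_at_depth.keys.min              # shallowest first
      lazies_at_depth.delete(depth).each(&:value)   # forcing a lazy may enqueue more work
    end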
@@ -248,6 +242,11 @@ def run_fiber(f)
       f.resume
     end
 
+    # @api private
+    def lazy_at_depth(depth, lazy)
+      @lazies_at_depth[depth] << lazy
+    end
+
     def spawn_fiber
       fiber_vars = get_fiber_variables
       Fiber.new(blocking: !@nonblocking) {
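`lazy_at_depth` is the entry point the executor can use to defer a lazy value until `#run` reaches its depth. A hypothetical registration (the block's contents are placeholders, and the call site is assumed, not shown in this diff):

    lazy = GraphQL::Execution::Lazy.new { expensive_result }
    dataloader.lazy_at_depth(3, lazy)   # resolved before any lazies queued at depth 4+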
@@ -275,6 +274,59 @@ def merge_records(records, index_by: :id)
 
     private
 
+    def run_next_pending_lazies(job_fibers, trace)
+      smallest_depth = nil
+      @lazies_at_depth.each_key do |depth_key|
+        smallest_depth ||= depth_key
+        if depth_key < smallest_depth
+          smallest_depth = depth_key
+        end
+      end
+
+      if smallest_depth
+        lazies = @lazies_at_depth.delete(smallest_depth)
+        if !lazies.empty?
+          lazies.each_with_index do |l, idx|
+            append_job { l.value }
+          end
+          job_fibers.unshift(spawn_job_fiber(trace))
+        end
+      end
+    end
+
+    def run_pending_steps(trace, job_fibers, next_job_fibers, jobs_fiber_limit, source_fibers, next_source_fibers, total_fiber_limit)
+      while (f = (job_fibers.shift || (((next_job_fibers.size + job_fibers.size) < jobs_fiber_limit) && spawn_job_fiber(trace))))
+        if f.alive?
+          finished = run_fiber(f)
+          if !finished
+            next_job_fibers << f
+          end
+        end
+      end
+      join_queues(job_fibers, next_job_fibers)
+
+      while (!source_fibers.empty? || @source_cache.each_value.any? { |group_sources| group_sources.each_value.any?(&:pending?) })
+        while (f = source_fibers.shift || (((job_fibers.size + source_fibers.size + next_source_fibers.size + next_job_fibers.size) < total_fiber_limit) && spawn_source_fiber(trace)))
+          if f.alive?
+            finished = run_fiber(f)
+            if !finished
+              next_source_fibers << f
+            end
+          end
+        end
+        join_queues(source_fibers, next_source_fibers)
+      end
+    end
+
+    def with_trace_query_lazy(multiplex_or_nil, &block)
+      if (multiplex = multiplex_or_nil)
+        query = multiplex.queries.length == 1 ? multiplex.queries[0] : nil
+        multiplex.current_trace.execute_query_lazy(query: query, multiplex: multiplex, &block)
+      else
+        yield
+      end
+    end
+
     def calculate_fiber_limit
       total_fiber_limit = @fiber_limit || Float::INFINITY
       if total_fiber_limit < 4
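In `run_next_pending_lazies`, the hand-rolled minimum scan is equivalent to the one-liner below, but avoids allocating an intermediate array of keys on every pass:

    smallest_depth = @lazies_at_depth.keys.min

`with_trace_query_lazy` wraps that lazy-resolution phase in the trace's `execute_query_lazy` hook, passing a `query:` only when the multiplex holds exactly one query.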