-
Notifications
You must be signed in to change notification settings - Fork 1.4k
/
source.rb
164 lines (145 loc) · 5.88 KB
/
source.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# frozen_string_literal: true
module GraphQL
  class Dataloader
    # Base class for batch-loading data sources used by {Dataloader}.
    #
    # Subclasses implement {#fetch} to load a batch of keys at once; the
    # dataloader coordinates Fibers so that many `.load(key)` calls are
    # gathered into few `fetch(keys)` calls, with results cached per-key.
    class Source
      # Called by {Dataloader} to prepare the {Source}'s internal state
      # @api private
      def setup(dataloader)
        # These keys have been requested but haven't been fetched yet
        @pending_keys = []
        # These keys have been passed to `fetch` but haven't been finished yet
        @fetching_keys = []
        # { key => result } — a cached result may also be a StandardError,
        # which is re-raised by {#result_for} when the key is read.
        @results = {}
        @dataloader = dataloader
      end

      attr_reader :dataloader

      # @return [Dataloader::Request] a pending request for a value from `key`. Call `.load` on that object to wait for the result.
      def request(key)
        if !@results.key?(key)
          @pending_keys << key
        end
        Dataloader::Request.new(self, key)
      end

      # @return [Dataloader::RequestAll] a pending request for the values of `keys`. Call `.load` on that object to wait for the results.
      def request_all(keys)
        # Only enqueue keys that aren't already cached.
        pending_keys = keys.reject { |k| @results.key?(k) }
        @pending_keys.concat(pending_keys)
        Dataloader::RequestAll.new(self, keys)
      end

      # @param key [Object] A loading key which will be passed to {#fetch} if it isn't already in the internal cache.
      # @return [Object] The result from {#fetch} for `key`. If `key` hasn't been loaded yet, the Fiber will yield until it's loaded.
      def load(key)
        if !@results.key?(key)
          @pending_keys << key
          sync
        end
        result_for(key)
      end

      # @param keys [Array<Object>] Loading keys which will be passed to `#fetch` (or read from the internal cache).
      # @return [Array<Object>] The results from {#fetch} for `keys`. If `keys` haven't been loaded yet, the Fiber will yield until they're loaded.
      def load_all(keys)
        # Single pass over `keys`: anything uncached is enqueued and fetched.
        pending_keys = keys.reject { |k| @results.key?(k) }
        unless pending_keys.empty?
          @pending_keys.concat(pending_keys)
          sync
        end
        keys.map { |k| result_for(k) }
      end

      # Subclasses must implement this method to return a value for each of `keys`
      # @param keys [Array<Object>] keys passed to {#load}, {#load_all}, {#request}, or {#request_all}
      # @return [Array<Object>] A loaded value for each of `keys`. The array must match one-for-one to the list of `keys`.
      def fetch(keys)
        # somehow retrieve these from the backend
        raise "Implement `#{self.class}#fetch(#{keys.inspect})` to return a record for each of the keys"
      end

      # Wait for a batch, if there's anything to batch.
      # Then run the batch and update the cache.
      # @return [void]
      # @raise [RuntimeError] if the pending keys still aren't loaded after many
      #   yields (which indicates a circular dependency between sources).
      def sync
        pending_keys = @pending_keys.dup
        @dataloader.yield
        iterations = 0
        while pending_keys.any? { |k| !@results.key?(k) }
          iterations += 1
          if iterations > 1000
            raise "#{self.class}#sync tried 1000 times to load pending keys (#{pending_keys}), but they still weren't loaded. There is likely a circular dependency."
          end
          @dataloader.yield
        end
        nil
      end

      # @return [Boolean] True if this source has any pending requests for data.
      def pending?
        !@pending_keys.empty?
      end

      # Add these key-value pairs to this source's cache
      # (future loads will use these merged values).
      # @param results [Hash<Object => Object>] key-value pairs to cache in this source
      # @return [void]
      def merge(results)
        @results.merge!(results)
        nil
      end

      # Called by {GraphQL::Dataloader} to resolve any pending requests to this source.
      # @api private
      # @return [void]
      def run_pending_keys
        # Don't re-fetch keys that another Fiber is already fetching.
        if !@fetching_keys.empty?
          @pending_keys -= @fetching_keys
        end
        return if @pending_keys.empty?
        fetch_keys = @pending_keys.uniq
        @fetching_keys.concat(fetch_keys)
        @pending_keys = []
        results = fetch(fetch_keys)
        fetch_keys.each_with_index do |key, idx|
          @results[key] = results[idx]
        end
        nil
      rescue StandardError => error
        # Cache the error for each key; {#result_for} re-raises it on read.
        fetch_keys.each { |key| @results[key] = error }
      ensure
        if fetch_keys
          @fetching_keys -= fetch_keys
        end
      end

      # These arguments are given to `dataloader.with(source_class, ...)`. The object
      # returned from this method is used to de-duplicate batch loads under the hood
      # by using it as a Hash key.
      #
      # By default, the arguments are all put in an Array. To customize how this source's
      # batches are merged, override this method to return something else.
      #
      # For example, if you pass `ActiveRecord::Relation`s to `.with(...)`, you could override
      # this method to call `.to_sql` on them, thus merging `.load(...)` calls when they apply
      # to equivalent relations.
      #
      # @param batch_args [Array<Object>]
      # @param batch_kwargs [Hash]
      # @return [Object]
      def self.batch_key_for(*batch_args, **batch_kwargs)
        [*batch_args, **batch_kwargs]
      end

      attr_reader :pending_keys

      private

      # Reads and returns the result for the key from the internal cache, or raises an error if the result was an error
      # @param key [Object] key passed to {#load} or {#load_all}
      # @return [Object] The result from {#fetch} for `key`.
      # @raise [GraphQL::InvariantError] if the key was never loaded (a Dataloader bug)
      # @raise [StandardError] if {#fetch} raised while loading this key
      # @api private
      def result_for(key)
        if !@results.key?(key)
          raise GraphQL::InvariantError, <<-ERR
Fetching result for a key on #{self.class} that hasn't been loaded yet (#{key.inspect}, loaded: #{@results.keys})

This key should have been loaded already. This is a bug in GraphQL::Dataloader, please report it on GitHub: https://github.com/rmosolgo/graphql-ruby/issues/new.
ERR
        end
        result = @results[key]
        # Errors captured by run_pending_keys are cached and re-raised here.
        raise result if result.is_a?(StandardError)
        result
      end
    end
  end
end