/
map_reduce.rb
236 lines (214 loc) · 6.51 KB
/
map_reduce.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# frozen_string_literal: true
# rubocop:todo all
module Mongoid
module Contextual
# Represents a mapReduce database command instruction.
class MapReduce
extend Forwardable
include Enumerable
include Command
def_delegators :results, :[]
def_delegators :entries, :==, :empty?
# Fetch the "counts" section of the map/reduce reply.
#
# The reply is obtained through +results+, which is an alias of +raw+, so
# every call sends the map/reduce command to the database again.
#
# @example Get the counts.
#   map_reduce.counts
#
# @return [ Hash ] The counts.
def counts
  reply = results
  reply["counts"]
end
# Walks the documents produced by the map/reduce, without the extra
# bookkeeping information the database sends back alongside them.
#
# An output location must have been configured first; otherwise
# +validate_out!+ raises.
#
# @example Iterate over the results.
#   map_reduce.each do |doc|
#     p doc
#   end
#
# @yield [ doc ] Each document of the map/reduce view, in turn.
#
# @return [ Enumerator ] An enumerator over the documents when no block
#   is given; with a block, whatever the underlying view's #each returns.
def each
  validate_out!
  return @map_reduce.to_enum unless block_given?

  @map_reduce.each { |doc| yield doc }
end
# Get the number of documents emitted by the map/reduce.
#
# Returns nil instead of raising NoMethodError when the reply carries no
# "counts" section.
# NOTE(review): MongoDB 4.4+ is documented as dropping "counts" from the
# mapReduce output - confirm against the supported server versions.
#
# @example Get the emitted document count.
#   map_reduce.emitted
#
# @return [ Integer, nil ] The number of emitted documents, or nil when
#   the reply has no counts.
def emitted
  tallies = counts
  tallies["emit"] if tallies
end
# Attach a finalize js function to the map/reduce.
#
# The underlying view is replaced by the one returned from its own
# #finalize; this object is returned so calls can be chained.
#
# @example Provide a finalize function.
#   map_reduce.finalize(func)
#
# @param [ String ] function The finalize function.
#
# @return [ MapReduce ] The map reduce.
def finalize(function)
  tap { @map_reduce = @map_reduce.finalize(function) }
end
# Initialize the new map/reduce directive.
#
# Stores the collection and criteria, then builds the map/reduce view from
# the criteria's view.
#
# @example Initialize the new map/reduce.
#   MapReduce.new(collection, criteria, map, reduce)
#
# @param [ Object ] collection The collection; it is only stored on the
#   instance here (presumably a Mongo::Collection - confirm with callers).
# @param [ Criteria ] criteria The Mongoid criteria.
# @param [ String ] map The map js function.
# @param [ String ] reduce The reduce js function.
def initialize(collection, criteria, map, reduce)
  @collection, @criteria = collection, criteria
  @map_reduce = criteria.view.map_reduce(map, reduce)
end
# Get the number of documents that were input into the map/reduce.
#
# Returns nil instead of raising NoMethodError when the reply carries no
# "counts" section.
# NOTE(review): MongoDB 4.4+ is documented as dropping "counts" from the
# mapReduce output - confirm against the supported server versions.
#
# @example Get the count of input documents.
#   map_reduce.input
#
# @return [ Integer, nil ] The number of input documents, or nil when the
#   reply has no counts.
def input
  tallies = counts
  tallies["input"] if tallies
end
# Switch the map/reduce to jsMode.
#
# The underlying view is replaced by the one returned from its own
# #js_mode(true); this object is returned so calls can be chained.
#
# @example Set the map/reduce to jsMode.
#   map_reduce.js_mode
#
# @return [ MapReduce ] The map/reduce.
def js_mode
  tap { @map_reduce = @map_reduce.js_mode(true) }
end
# Specifies where the map/reduce output is to be stored.
# Please see MongoDB documentation for supported map reduce options.
#
# The given hash is copied before use, so the caller's hash is never
# modified. Symbol values in the copy are converted to strings; all other
# values are passed through unchanged.
#
# @example Return results inline instead of storing them.
#   map_reduce.out(inline: 1)
#
# @example Store output in a collection, replacing existing documents.
#   map_reduce.out(replace: "collection_name")
#
# @example Store output in a collection, merging existing documents.
#   map_reduce.out(merge: "collection_name")
#
# @example Store output in a collection, reducing existing documents.
#   map_reduce.out(reduce: "collection_name")
#
# @param [ Hash ] location The place to store the results.
#
# @return [ MapReduce ] The map/reduce object.
def out(location)
  target = location.dup.transform_values! do |setting|
    if setting.is_a?(::Symbol)
      setting.to_s
    else
      setting
    end
  end
  @map_reduce = @map_reduce.out(target)
  self
end
# Get the number of documents output by the map/reduce.
#
# Returns nil instead of raising NoMethodError when the reply carries no
# "counts" section.
# NOTE(review): MongoDB 4.4+ is documented as dropping "counts" from the
# mapReduce output - confirm against the supported server versions.
#
# @example Get the output document count.
#   map_reduce.output
#
# @return [ Integer, nil ] The number of output documents, or nil when the
#   reply has no counts.
def output
  tallies = counts
  tallies["output"] if tallies
end
# Run the map/reduce command and return the first document of the reply.
#
# An output location must have been configured first; otherwise
# +validate_out!+ raises. The command is sent with the criteria's session,
# plus the criteria's :read option when one is set. Nothing is cached:
# each call sends the command again.
#
# @example Get the raw output.
#   map_reduce.raw
#
# @return [ Hash ] The raw output.
def raw
  validate_out!
  spec = command
  # Only forward a read preference when the criteria actually carries one.
  options = criteria.options[:read] ? { read: criteria.options.fetch(:read) } : {}
  replies = @map_reduce.database.command(spec, options.merge(session: _session))
  replies.first
end
alias results raw

# Execute the map/reduce, returning the raw output.
# Useful when you don't care about map/reduce's output.
#
# @example Run the map reduce
#   map_reduce.execute
#
# @return [ Hash ] The raw output
alias execute raw
# Get the number of documents reduced by the map/reduce.
#
# Returns nil instead of raising NoMethodError when the reply carries no
# "counts" section.
# NOTE(review): MongoDB 4.4+ is documented as dropping "counts" from the
# mapReduce output - confirm against the supported server versions.
#
# @example Get the reduced document count.
#   map_reduce.reduced
#
# @return [ Integer, nil ] The number of reduced documents, or nil when
#   the reply has no counts.
def reduced
  tallies = counts
  tallies["reduce"] if tallies
end
# Adds a javascript object to the global scope of the map/reduce.
#
# The underlying view is replaced by the one returned from its own
# #scope; this object is returned so calls can be chained.
#
# @example Add an object to the global scope.
#   map_reduce.scope(name: value)
#
# @param [ Hash ] object A hash of key/values for the global scope.
#
# @return [ MapReduce ] The map/reduce object.
def scope(object)
  scoped_view = @map_reduce.scope(object)
  @map_reduce = scoped_view
  self
end
# Get the execution time of the map/reduce, as reported under the
# "timeMillis" key of the reply.
#
# The reply is obtained through +results+, which is an alias of +raw+, so
# every call sends the map/reduce command to the database again.
#
# @example Get the execution time.
#   map_reduce.time
#
# @return [ Float ] The time in milliseconds.
def time
  reply = results
  reply["timeMillis"]
end
# Build a human-readable, multi-line description of the map/reduce.
#
# The text lists the criteria's selector and class, followed by the map,
# reduce, finalize and out entries read from +command+. The selector and
# the out entry are rendered with #inspect; the other values are
# interpolated as-is. The string ends with a newline after the closing ">".
#
# The layout below is a literal string: its line breaks and spacing are
# part of the returned value, so do not re-indent it.
#
# @example Inspect the map_reduce.
#   map_reduce.inspect
#
# @return [ String ] The inspection string.
def inspect
%Q{#<Mongoid::Contextual::MapReduce
selector: #{criteria.selector.inspect}
class: #{criteria.klass}
map: #{command[:map]}
reduce: #{command[:reduce]}
finalize: #{command[:finalize]}
out: #{command[:out].inspect}>
}
end
# Returns the selector of the command spec.
#
# The spec comes from the view's +map_reduce_spec+ method, which is
# invoked via #send.
#
# @return [ Hash ] The selector.
def command
  spec = @map_reduce.send(:map_reduce_spec)
  spec[:selector]
end
private
# Ensure an output location has been configured on the map/reduce view.
#
# @raise [ Errors::NoMapReduceOutput ] If the view reports no out option.
#
# @return [ nil ] When an output location is present.
def validate_out!
  return if @map_reduce.out

  raise Errors::NoMapReduceOutput.new({})
end
# Look up the session on the criteria.
#
# The criteria's +_session+ is invoked via #send.
#
# @return [ Object ] Whatever the criteria's +_session+ returns.
def _session
  owner = criteria
  owner.send(:_session)
end
end
end
end