forked from samus/mongodb-csharp
/
MapReduce.cs
253 lines (226 loc) · 8.31 KB
/
MapReduce.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
using System;
using System.Collections.Generic;
using MongoDB.Commands;
using MongoDB.Results;
namespace MongoDB
{
/// <summary>
/// Provides a fluent interface to build and execute Map/Reduce calls.
/// </summary>
/// <remarks>
/// The configuration methods (<see cref = "Map(string)" />, <see cref = "Reduce(string)" />, ...) each
/// return this instance so calls can be chained. Once the command has been executed the
/// configuration is locked and any further configuration call throws
/// <see cref = "InvalidOperationException" />.
/// </remarks>
public class MapReduce : IDisposable
{
    private readonly IMongoDatabase _database;
    private readonly Type _rootType;

    // Cleared by RetrieveData; guards every fluent setter through TryModify.
    private bool _canModify = true;

    // Set on the first Dispose call that gets past the KeepTemp/Out check, so later calls are no-ops.
    private bool _disposing;

    /// <summary>
    /// Initializes a new instance of the <see cref = "MapReduce" /> class.
    /// </summary>
    /// <param name = "database">The database the command is sent to and the results are read from.</param>
    /// <param name = "name">The name handed to the underlying <see cref = "MapReduceCommand" />.</param>
    /// <param name = "rootType">The root type passed along when the command is sent.</param>
    /// <exception cref = "ArgumentNullException">Any of the arguments is <c>null</c>.</exception>
    public MapReduce(IMongoDatabase database, string name, Type rootType)
    {
        if(database == null)
        {
            throw new ArgumentNullException("database");
        }
        if(name == null)
        {
            throw new ArgumentNullException("name");
        }
        if(rootType == null)
        {
            throw new ArgumentNullException("rootType");
        }

        _rootType = rootType;
        _database = database;
        Command = new MapReduceCommand(name);
    }

    /// <summary>
    /// Gets the result of the last execution, or <c>null</c> when the command has not run yet.
    /// </summary>
    /// <value>The result.</value>
    internal MapReduceResult Result { get; private set; }

    /// <summary>
    /// Gets the command that is being built.
    /// </summary>
    /// <value>The command.</value>
    public MapReduceCommand Command { get; private set; }

    /// <summary>
    /// Gets the documents stored in the result collection, executing the command first if needed.
    /// </summary>
    /// <remarks>
    /// This getter is an iterator: nothing (including the execution of the command) happens
    /// until the returned sequence is enumerated.
    /// </remarks>
    /// <value>The documents.</value>
    /// <exception cref = "InvalidOperationException">
    /// No map or reduce function was set, or the execution did not return a successful result.
    /// </exception>
    /// <exception cref = "MongoMapReduceException">The server reported an error for the command.</exception>
    public IEnumerable<Document> Documents
    {
        get
        {
            if(Result == null)
            {
                RetrieveData();
            }

            if(Result == null || Result.Ok == false)
            {
                throw new InvalidOperationException("Documents cannot be iterated when an error was returned from execute.");
            }

            var docs = _database.GetCollection<Document>(Result.CollectionName).FindAll().Documents;

            // Dispose the sequence when it is disposable. "as" (rather than a hard cast) avoids an
            // InvalidCastException for enumerables that are not; using ignores a null resource.
            using(docs as IDisposable)
            {
                foreach(var doc in docs)
                {
                    yield return doc;
                }
            }
        }
    }

    /// <summary>
    /// Drops the result collection unless it was asked to be kept.
    /// </summary>
    /// <remarks>
    /// Nothing is dropped when <c>KeepTemp</c> is set, when an <c>Out</c> name was given, when
    /// the command never produced a successful result, or when this method already ran.
    /// </remarks>
    public void Dispose()
    {
        if(Command.KeepTemp || Command.Out != null || _disposing)
        {
            return;
        }

        _disposing = true;

        if(Result == null || Result.Ok == false)
        {
            return; //Nothing to do.
        }

        //Drop the temporary collection that was created as part of results.
        _database.Metadata.DropCollection(Result.CollectionName);
    }

    /// <summary>
    /// The map function references the variable this to inspect the current object under consideration.
    /// A map function must call emit(key,value) at least once, but may be invoked any number of times,
    /// as may be appropriate.
    /// </summary>
    /// <param name = "function">The javascript source of the map function.</param>
    /// <returns>This instance, to allow chaining.</returns>
    /// <exception cref = "InvalidOperationException">The map/reduce has already executed.</exception>
    public MapReduce Map(string function)
    {
        return Map(new Code(function));
    }

    /// <summary>
    /// The map function references the variable this to inspect the current object under consideration.
    /// A map function must call emit(key,value) at least once, but may be invoked any number of times,
    /// as may be appropriate.
    /// </summary>
    /// <param name = "function">The map function.</param>
    /// <returns>This instance, to allow chaining.</returns>
    /// <exception cref = "InvalidOperationException">The map/reduce has already executed.</exception>
    public MapReduce Map(Code function)
    {
        TryModify();
        Command.Map = function;
        return this;
    }

    /// <summary>
    /// The reduce function receives a key and an array of values. To use, reduce the received values,
    /// and return a result.
    /// </summary>
    /// <remarks>
    /// The MapReduce engine may invoke reduce functions iteratively; thus, these functions
    /// must be idempotent. If you need to perform an operation only once, use a finalize function.
    /// </remarks>
    /// <param name = "function">The javascript source of the reduce function.</param>
    /// <returns>This instance, to allow chaining.</returns>
    /// <exception cref = "InvalidOperationException">The map/reduce has already executed.</exception>
    public MapReduce Reduce(string function)
    {
        return Reduce(new Code(function));
    }

    /// <summary>
    /// The reduce function receives a key and an array of values. To use, reduce the received values,
    /// and return a result.
    /// </summary>
    /// <remarks>
    /// The MapReduce engine may invoke reduce functions iteratively; thus, these functions
    /// must be idempotent. If you need to perform an operation only once, use a finalize function.
    /// </remarks>
    /// <param name = "function">The reduce function.</param>
    /// <returns>This instance, to allow chaining.</returns>
    /// <exception cref = "InvalidOperationException">The map/reduce has already executed.</exception>
    public MapReduce Reduce(Code function)
    {
        TryModify();
        Command.Reduce = function;
        return this;
    }

    /// <summary>
    /// Query filter object
    /// </summary>
    /// <param name = "query">The filter document.</param>
    /// <returns>This instance, to allow chaining.</returns>
    /// <exception cref = "InvalidOperationException">The map/reduce has already executed.</exception>
    public MapReduce Query(Document query)
    {
        TryModify();
        Command.Query = query;
        return this;
    }

    /// <summary>
    /// Sort the query. Useful for optimization
    /// </summary>
    /// <param name = "sort">The sort document.</param>
    /// <returns>This instance, to allow chaining.</returns>
    /// <exception cref = "InvalidOperationException">The map/reduce has already executed.</exception>
    public MapReduce Sort(Document sort)
    {
        TryModify();
        Command.Sort = sort;
        return this;
    }

    /// <summary>
    /// Number of objects to return from collection
    /// </summary>
    /// <param name = "limit">The limit.</param>
    /// <returns>This instance, to allow chaining.</returns>
    /// <exception cref = "InvalidOperationException">The map/reduce has already executed.</exception>
    public MapReduce Limit(long limit)
    {
        TryModify();
        Command.Limit = limit;
        return this;
    }

    /// <summary>
    /// Name of the final collection the results should be stored in.
    /// </summary>
    /// <remarks>
    /// A temporary collection is still used and then renamed to the target name atomically.
    /// </remarks>
    /// <param name = "name">The name of the target collection.</param>
    /// <returns>This instance, to allow chaining.</returns>
    /// <exception cref = "InvalidOperationException">The map/reduce has already executed.</exception>
    public MapReduce Out(String name)
    {
        TryModify();
        Command.Out = name;
        return this;
    }

    /// <summary>
    /// When true the generated collection is not treated as temporary. Specifying out automatically makes
    /// the collection permanent
    /// </summary>
    /// <param name = "keep">Whether the generated collection should be kept.</param>
    /// <returns>This instance, to allow chaining.</returns>
    /// <exception cref = "InvalidOperationException">The map/reduce has already executed.</exception>
    public MapReduce KeepTemp(bool keep)
    {
        TryModify();
        Command.KeepTemp = keep;
        return this;
    }

    /// <summary>
    /// Provides statistics on job execution time.
    /// </summary>
    /// <param name = "val">Whether verbose output is requested.</param>
    /// <returns>This instance, to allow chaining.</returns>
    /// <exception cref = "InvalidOperationException">The map/reduce has already executed.</exception>
    public MapReduce Verbose(bool val)
    {
        TryModify();
        Command.Verbose = val;
        return this;
    }

    /// <summary>
    /// Function to apply to all the results when finished.
    /// </summary>
    /// <param name = "function">The finalize function.</param>
    /// <returns>This instance, to allow chaining.</returns>
    /// <exception cref = "InvalidOperationException">The map/reduce has already executed.</exception>
    public MapReduce Finalize(Code function)
    {
        TryModify();
        Command.Finalize = function;
        return this;
    }

    /// <summary>
    /// Document where fields go into javascript global scope
    /// </summary>
    /// <param name = "scope">The scope document.</param>
    /// <returns>This instance, to allow chaining.</returns>
    /// <exception cref = "InvalidOperationException">The map/reduce has already executed.</exception>
    public MapReduce Scope(Document scope)
    {
        TryModify();
        Command.Scope = scope;
        return this;
    }

    /// <summary>
    /// Sends the command to the database and stores the outcome in <see cref = "Result" />.
    /// </summary>
    /// <remarks>
    /// The configuration is locked before the command is sent, so it stays locked even when
    /// the execution fails.
    /// </remarks>
    /// <exception cref = "InvalidOperationException">The map or the reduce function is missing.</exception>
    /// <exception cref = "MongoMapReduceException">
    /// The command failed; <see cref = "Result" /> then holds the error that was returned.
    /// </exception>
    internal void RetrieveData()
    {
        if(Command.Command.Contains("map") == false || Command.Command.Contains("reduce") == false)
        {
            throw new InvalidOperationException("Cannot execute without a map and reduce function");
        }

        _canModify = false;

        try
        {
            Result = new MapReduceResult(_database.SendCommand(_rootType, Command.Command));
        }
        catch(MongoCommandException exception)
        {
            // Keep the error document around before surfacing the failure to the caller.
            Result = new MapReduceResult(exception.Error);
            throw new MongoMapReduceException(exception);
        }
    }

    /// <summary>
    /// Ensures the command may still be changed.
    /// </summary>
    /// <exception cref = "InvalidOperationException">The map/reduce has already executed.</exception>
    private void TryModify()
    {
        if(!_canModify)
        {
            throw new InvalidOperationException("Cannot modify a map/reduce that has already executed");
        }
    }
}
}