This repository has been archived by the owner on Jan 29, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 603
/
MongoConfigUtil.java
530 lines (438 loc) · 19.9 KB
/
MongoConfigUtil.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
// MongoConfigUtil.java
/*
* Copyright 2010 10gen Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.mongodb.hadoop.util;
import com.mongodb.*;
import com.mongodb.util.*;
import org.apache.commons.logging.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
/**
* Configuration helper tool for MongoDB related Map/Reduce jobs
*/
/**
 * Configuration helper tool for MongoDB related Map/Reduce jobs.
 *
 * All state lives in the Hadoop {@link Configuration}; this class is a
 * stateless collection of typed getters/setters over well-known keys,
 * plus helpers that open Mongo connections from configured URIs.
 */
public class MongoConfigUtil {
    private static final Log log = LogFactory.getLog( MongoConfigUtil.class );

    // Shared holder so repeated calls with the same URI reuse one Mongo instance.
    private static final Mongo.Holder _mongos = new Mongo.Holder();

    /**
     * The JOB_* values are entirely optional and disregarded unless you use the MongoTool base toolset... If you don't,
     * feel free to ignore these
     */
    public static final String JOB_VERBOSE = "mongo.job.verbose";
    public static final String JOB_BACKGROUND = "mongo.job.background";

    public static final String JOB_MAPPER = "mongo.job.mapper";
    public static final String JOB_COMBINER = "mongo.job.combiner";
    public static final String JOB_PARTITIONER = "mongo.job.partitioner";
    public static final String JOB_REDUCER = "mongo.job.reducer";
    public static final String JOB_SORT_COMPARATOR = "mongo.job.sort_comparator";

    public static final String JOB_MAPPER_OUTPUT_KEY = "mongo.job.mapper.output.key";
    public static final String JOB_MAPPER_OUTPUT_VALUE = "mongo.job.mapper.output.value";

    public static final String JOB_INPUT_FORMAT = "mongo.job.input.format";
    public static final String JOB_OUTPUT_FORMAT = "mongo.job.output.format";

    public static final String JOB_OUTPUT_KEY = "mongo.job.output.key";
    public static final String JOB_OUTPUT_VALUE = "mongo.job.output.value";

    public static final String INPUT_URI = "mongo.input.uri";
    public static final String OUTPUT_URI = "mongo.output.uri";

    /**
     * The MongoDB field to read from for the Mapper Input.
     *
     * This will be fed to your mapper as the "Key" for the input.
     *
     * Defaults to {@code _id}
     */
    public static final String INPUT_KEY = "mongo.input.key";
    public static final String INPUT_NOTIMEOUT = "mongo.input.notimeout";
    public static final String INPUT_QUERY = "mongo.input.query";
    public static final String INPUT_FIELDS = "mongo.input.fields";
    public static final String INPUT_SORT = "mongo.input.sort";
    public static final String INPUT_LIMIT = "mongo.input.limit";
    public static final String INPUT_SKIP = "mongo.input.skip";

    /**
     * When *not* using 'read_from_shards' or 'read_shard_chunks'
     * The number of megabytes per Split to create for the input data.
     *
     * Currently defaults to 8MB, tweak it as necessary for your code.
     *
     * This default will likely change as we research better options.
     */
    public static final String INPUT_SPLIT_SIZE = "mongo.input.split_size";

    public static final int DEFAULT_SPLIT_SIZE = 8; // 8 mb per manual (non-sharding) split

    /**
     * If CREATE_INPUT_SPLITS is true but SPLITS_USE_CHUNKS is false, Mongo-Hadoop will attempt
     * to create custom input splits for you. By default it will split on {@code _id}, which is a
     * reasonable/sane default.
     *
     * If you want to customize that split point for efficiency reasons (such as different distribution)
     * you may set this to any valid field name. The restriction on this key name are the *exact same rules*
     * as when sharding an existing MongoDB Collection. You must have an index on the field, and follow the other
     * rules outlined in the docs.
     *
     * This must be a JSON document, and not just a field name!
     *
     * @link http://www.mongodb.org/display/DOCS/Sharding+Introduction#ShardingIntroduction-ShardKeys
     */
    public static final String INPUT_SPLIT_KEY_PATTERN = "mongo.input.split.split_key_pattern";

    /**
     * If {@code true}, the driver will attempt to split the MongoDB Input data (if reading from Mongo) into
     * multiple InputSplits to allow parallelism/concurrency in processing within Hadoop. That is to say,
     * Hadoop will assign one InputSplit per mapper.
     *
     * This is {@code true} by default now, but if {@code false}, only one InputSplit (your whole collection) will be
     * assigned to Hadoop – severely reducing parallel mapping.
     */
    public static final String CREATE_INPUT_SPLITS = "mongo.input.split.create_input_splits";

    /**
     * If {@code true} in a sharded setup splits will be made to connect to individual backend {@code mongod}s. This
     * can be unsafe. If {@code mongos} is moving chunks around you might see duplicate data, or miss some data
     * entirely. Defaults to {@code false}
     */
    public static final String SPLITS_USE_SHARDS = "mongo.input.split.read_from_shards";

    /**
     * If {@code true} have one split = one shard chunk. If {@link #SPLITS_USE_SHARDS} is not true splits will still
     * use chunks, but will connect through {@code mongos} instead of the individual backend {@code mongod}s (the safe
     * thing to do). If {@link #SPLITS_USE_SHARDS} is {@code true} but this is {@code false} one split will be made for
     * each backend shard. THIS IS UNSAFE and may result in data being run multiple times <p> Defaults to {@code true }
     */
    public static final String SPLITS_USE_CHUNKS = "mongo.input.split.read_shard_chunks";

    /**
     * If true then shards are replica sets run queries on slaves. If set this will override any option passed on the
     * URI.
     *
     * Defaults to {@code false}
     */
    public static final String SPLITS_SLAVE_OK = "mongo.input.split.allow_read_from_secondaries";

    public static boolean isJobVerbose( Configuration conf ){
        return conf.getBoolean( JOB_VERBOSE, false );
    }

    public static void setJobVerbose( Configuration conf, boolean val ){
        conf.setBoolean( JOB_VERBOSE, val );
    }

    public static boolean isJobBackground( Configuration conf ){
        return conf.getBoolean( JOB_BACKGROUND, false );
    }

    public static void setJobBackground( Configuration conf, boolean val ){
        conf.setBoolean( JOB_BACKGROUND, val );
    }

    // TODO - In light of key/value specifics should we have a base MongoMapper
    // class?
    public static Class<? extends Mapper> getMapper( Configuration conf ){
        /** TODO - Support multiple inputs via getClasses ? **/
        return conf.getClass( JOB_MAPPER, null, Mapper.class );
    }

    public static void setMapper( Configuration conf, Class<? extends Mapper> val ){
        conf.setClass( JOB_MAPPER, val, Mapper.class );
    }

    public static Class<?> getMapperOutputKey( Configuration conf ){
        return conf.getClass( JOB_MAPPER_OUTPUT_KEY, null );
    }

    public static void setMapperOutputKey( Configuration conf, Class<?> val ){
        conf.setClass( JOB_MAPPER_OUTPUT_KEY, val, Object.class );
    }

    public static Class<?> getMapperOutputValue( Configuration conf ){
        return conf.getClass( JOB_MAPPER_OUTPUT_VALUE, null );
    }

    public static void setMapperOutputValue( Configuration conf, Class<?> val ){
        conf.setClass( JOB_MAPPER_OUTPUT_VALUE, val, Object.class );
    }

    public static Class<? extends Reducer> getCombiner( Configuration conf ){
        return conf.getClass( JOB_COMBINER, null, Reducer.class );
    }

    public static void setCombiner( Configuration conf, Class<? extends Reducer> val ){
        conf.setClass( JOB_COMBINER, val, Reducer.class );
    }

    // TODO - In light of key/value specifics should we have a base MongoReducer
    // class?
    public static Class<? extends Reducer> getReducer( Configuration conf ){
        /** TODO - Support multiple outputs via getClasses ? **/
        return conf.getClass( JOB_REDUCER, null, Reducer.class );
    }

    public static void setReducer( Configuration conf, Class<? extends Reducer> val ){
        conf.setClass( JOB_REDUCER, val, Reducer.class );
    }

    public static Class<? extends Partitioner> getPartitioner( Configuration conf ){
        return conf.getClass( JOB_PARTITIONER, null, Partitioner.class );
    }

    public static void setPartitioner( Configuration conf, Class<? extends Partitioner> val ){
        conf.setClass( JOB_PARTITIONER, val, Partitioner.class );
    }

    public static Class<? extends RawComparator> getSortComparator( Configuration conf ){
        return conf.getClass( JOB_SORT_COMPARATOR, null, RawComparator.class );
    }

    public static void setSortComparator( Configuration conf, Class<? extends RawComparator> val ){
        conf.setClass( JOB_SORT_COMPARATOR, val, RawComparator.class );
    }

    public static Class<? extends OutputFormat> getOutputFormat( Configuration conf ){
        return conf.getClass( JOB_OUTPUT_FORMAT, null, OutputFormat.class );
    }

    public static void setOutputFormat( Configuration conf, Class<? extends OutputFormat> val ){
        conf.setClass( JOB_OUTPUT_FORMAT, val, OutputFormat.class );
    }

    public static Class<?> getOutputKey( Configuration conf ){
        return conf.getClass( JOB_OUTPUT_KEY, null );
    }

    public static void setOutputKey( Configuration conf, Class<?> val ){
        conf.setClass( JOB_OUTPUT_KEY, val, Object.class );
    }

    public static Class<?> getOutputValue( Configuration conf ){
        return conf.getClass( JOB_OUTPUT_VALUE, null );
    }

    public static void setOutputValue( Configuration conf, Class<?> val ){
        conf.setClass( JOB_OUTPUT_VALUE, val, Object.class );
    }

    public static Class<? extends InputFormat> getInputFormat( Configuration conf ){
        return conf.getClass( JOB_INPUT_FORMAT, null, InputFormat.class );
    }

    public static void setInputFormat( Configuration conf, Class<? extends InputFormat> val ){
        conf.setClass( JOB_INPUT_FORMAT, val, InputFormat.class );
    }

    /**
     * Reads the value stored under {@code key} and parses it as a MongoURI.
     *
     * @return the parsed URI, or {@code null} if the key is unset or blank
     */
    public static MongoURI getMongoURI( Configuration conf, String key ){
        final String raw = conf.get( key );
        if ( raw != null && !raw.trim().isEmpty() )
            return new MongoURI( raw );
        else
            return null;
    }

    public static MongoURI getInputURI( Configuration conf ){
        return getMongoURI( conf, INPUT_URI );
    }

    /**
     * Connects to the collection named by {@code uri}, authenticating first if
     * the URI carries credentials and the DB connection isn't already authenticated.
     *
     * @throws IllegalArgumentException if authentication fails or the connection
     *                                  cannot be established
     */
    public static DBCollection getCollection( MongoURI uri ){
        try {
            Mongo mongo = _mongos.connect( uri );
            DB myDb = mongo.getDB(uri.getDatabase());

            //if there's a username and password
            if(uri.getUsername() != null && uri.getPassword() != null && !myDb.isAuthenticated()) {
                boolean auth = myDb.authenticate(uri.getUsername(), uri.getPassword());
                if(auth) {
                    log.info("Successfully authenticated with collection.");
                }
                else {
                    // Distinguish auth failures from plain connection failures in the message.
                    throw new IllegalArgumentException( "Unable to authenticate with collection." );
                }
            }
            return uri.connectCollection(mongo);
        }
        catch ( final Exception e ) {
            throw new IllegalArgumentException( "Unable to connect to collection." + e.getMessage(), e );
        }
    }

    public static DBCollection getOutputCollection( Configuration conf ){
        final MongoURI _uri = getOutputURI( conf );
        if(_uri == null)
            throw new IllegalArgumentException("Please set mongodb output uri.");
        try{
            return getCollection( _uri );
        }
        catch ( final Exception e ) {
            throw new IllegalArgumentException( "Unable to connect to MongoDB Output Collection.", e );
        }
    }

    public static DBCollection getInputCollection( Configuration conf ){
        final MongoURI _uri = getInputURI( conf );
        // Fail fast with a clear message when the URI was never configured,
        // mirroring getOutputCollection's behavior.
        if(_uri == null)
            throw new IllegalArgumentException("Please set mongodb input uri.");
        try {
            return getCollection( _uri );
        }
        catch ( final Exception e ) {
            throw new IllegalArgumentException(
                    "Unable to connect to MongoDB Input Collection at '" + _uri + "'", e );
        }
    }

    public static void setMongoURI( Configuration conf, String key, MongoURI value ){
        conf.set( key, value.toString() ); // todo - verify you can toString a
                                           // URI object
    }

    /**
     * Validates {@code value} as a MongoURI before storing it under {@code key}.
     *
     * @throws IllegalArgumentException if the string is not a valid Mongo URI
     */
    public static void setMongoURIString( Configuration conf, String key, String value ){

        try {
            final MongoURI uri = new MongoURI( value );
            setMongoURI( conf, key, uri );
        }
        catch ( final Exception e ) {
            // Report the key too: this path is shared by input AND output URI setters.
            throw new IllegalArgumentException( "Invalid Mongo URI '" + value + "' for '" + key + "'", e );
        }
    }

    public static void setInputURI( Configuration conf, String uri ){
        setMongoURIString( conf, INPUT_URI, uri );
    }

    public static void setInputURI( Configuration conf, MongoURI uri ){
        setMongoURI( conf, INPUT_URI, uri );
    }

    public static MongoURI getOutputURI( Configuration conf ){
        return getMongoURI( conf, OUTPUT_URI );
    }

    public static void setOutputURI( Configuration conf, String uri ){
        setMongoURIString( conf, OUTPUT_URI, uri );
    }

    public static void setOutputURI( Configuration conf, MongoURI uri ){
        setMongoURI( conf, OUTPUT_URI, uri );
    }

    /**
     * Set JSON but first validate it's parseable into a DBObject
     */
    public static void setJSON( Configuration conf, String key, String value ){
        try {
            final Object dbObj = JSON.parse( value );
            setDBObject( conf, key, (DBObject) dbObj );
        }
        catch ( final Exception e ) {
            log.error( "Cannot parse JSON...", e );
            throw new IllegalArgumentException( "Provided JSON String is not representable/parseable as a DBObject.",
                                                e );
        }
    }

    /**
     * Reads the JSON stored under {@code key} back as a DBObject.
     *
     * @return the parsed object, or an empty {@link BasicDBObject} if unset
     * @throws IllegalArgumentException if the stored string cannot be parsed
     */
    public static DBObject getDBObject( Configuration conf, String key ){
        try {
            final String json = conf.get( key );
            final DBObject obj = (DBObject) JSON.parse( json );
            if ( obj == null )
                return new BasicDBObject();
            else
                return obj;
        }
        catch ( final Exception e ) {
            throw new IllegalArgumentException( "Provided JSON String is not representable/parseable as a DBObject.",
                                                e );
        }
    }

    public static void setDBObject( Configuration conf, String key, DBObject value ){
        conf.set( key, JSON.serialize( value ) );
    }

    public static void setQuery( Configuration conf, String query ){
        setJSON( conf, INPUT_QUERY, query );
    }

    public static void setQuery( Configuration conf, DBObject query ){
        setDBObject( conf, INPUT_QUERY, query );
    }

    /**
     * Returns the configured query as a DBObject... If you want a string call toString() on the returned object. or use
     * JSON.serialize()
     */
    public static DBObject getQuery( Configuration conf ){
        return getDBObject( conf, INPUT_QUERY );
    }

    public static void setFields( Configuration conf, String fields ){
        setJSON( conf, INPUT_FIELDS, fields );
    }

    public static void setFields( Configuration conf, DBObject fields ){
        setDBObject( conf, INPUT_FIELDS, fields );
    }

    /**
     * Returns the configured fields as a DBObject... If you want a string call toString() on the returned object. or
     * use JSON.serialize()
     */
    public static DBObject getFields( Configuration conf ){
        return getDBObject( conf, INPUT_FIELDS );
    }

    public static void setSort( Configuration conf, String sort ){
        setJSON( conf, INPUT_SORT, sort );
    }

    public static void setSort( Configuration conf, DBObject sort ){
        setDBObject( conf, INPUT_SORT, sort );
    }

    /**
     * Returns the configured sort as a DBObject... If you want a string call toString() on the returned object. or use
     * JSON.serialize()
     */
    public static DBObject getSort( Configuration conf ){
        return getDBObject( conf, INPUT_SORT );
    }

    public static int getLimit( Configuration conf ){
        return conf.getInt( INPUT_LIMIT, 0 );
    }

    public static void setLimit( Configuration conf, int limit ){
        conf.setInt( INPUT_LIMIT, limit );
    }

    public static int getSkip( Configuration conf ){
        return conf.getInt( INPUT_SKIP, 0 );
    }

    public static void setSkip( Configuration conf, int skip ){
        conf.setInt( INPUT_SKIP, skip );
    }

    public static int getSplitSize( Configuration conf ){
        return conf.getInt( INPUT_SPLIT_SIZE, DEFAULT_SPLIT_SIZE );
    }

    public static void setSplitSize( Configuration conf, int value ){
        conf.setInt( INPUT_SPLIT_SIZE, value );
    }

    /**
     * if TRUE,
     * Splits will be read by connecting to the individual shard servers,
     * however this really isn't safe unless you know what you're doing.
     * ( issue has to do with chunks moving / relocating during balancing phases)
     * @return
     */
    public static boolean canReadSplitsFromShards( Configuration conf ){
        return conf.getBoolean( SPLITS_USE_SHARDS, false );
    }

    public static void setReadSplitsFromShards( Configuration conf, boolean value ){
        conf.setBoolean( SPLITS_USE_SHARDS, value );
    }

    /**
     * If sharding is enabled,
     * Use the sharding configured chunks to split up data.
     */
    public static boolean isShardChunkedSplittingEnabled( Configuration conf ) {
        return conf.getBoolean( SPLITS_USE_CHUNKS, true );
    }

    public static void setShardChunkSplittingEnabled( Configuration conf, boolean value) {
        conf.setBoolean( SPLITS_USE_CHUNKS, value );
    }

    public static boolean canReadSplitsFromSecondary( Configuration conf ) {
        return conf.getBoolean( SPLITS_SLAVE_OK, false );
    }

    public static void setReadSplitsFromSecondary( Configuration conf, boolean value ) {
        conf.setBoolean( SPLITS_SLAVE_OK, value );
    }

    public static boolean createInputSplits( Configuration conf ) {
        return conf.getBoolean( CREATE_INPUT_SPLITS, true );
    }

    public static void setCreateInputSplits( Configuration conf, boolean value ) {
        conf.setBoolean( CREATE_INPUT_SPLITS, value );
    }

    public static void setInputSplitKeyPattern( Configuration conf, String pattern ) {
        setJSON( conf, INPUT_SPLIT_KEY_PATTERN, pattern );
    }

    public static void setInputSplitKey( Configuration conf, DBObject key ) {
        setDBObject( conf, INPUT_SPLIT_KEY_PATTERN, key );
    }

    public static String getInputSplitKeyPattern( Configuration conf ) {
        return conf.get( INPUT_SPLIT_KEY_PATTERN, "{ \"_id\": 1 }" );
    }

    /**
     * Returns the configured split key pattern as a DBObject.
     * Defaults to {@code { "_id": 1 }} when unset or when the stored JSON parses to null.
     */
    public static DBObject getInputSplitKey( Configuration conf ) {
        try {
            final String json = getInputSplitKeyPattern( conf );
            final DBObject obj = (DBObject) JSON.parse( json );
            if ( obj == null )
                return new BasicDBObject("_id", 1);
            else
                return obj;
        }
        catch ( final Exception e ) {
            throw new IllegalArgumentException( "Provided JSON String is not representable/parseable as a DBObject.", e );
        }
    }

    public static void setInputKey( Configuration conf, String fieldName ) {
        // TODO (bwm) - validate key rules?
        conf.set( INPUT_KEY, fieldName );
    }

    public static String getInputKey( Configuration conf ) {
        return conf.get( INPUT_KEY, "_id" );
    }

    public static void setNoTimeout( Configuration conf, boolean value ) {
        conf.setBoolean( INPUT_NOTIMEOUT, value );
    }

    public static boolean isNoTimeout( Configuration conf ) {
        return conf.getBoolean( INPUT_NOTIMEOUT, false );
    }
}