#MongoDB Disco Adapter
Note: for a more regularly maintained fork, please see https://github.com/mongodb/mongo-disco
The MongoDB Disco Adapter is a plugin that connects MongoDB to the Disco MapReduce framework, letting users use MongoDB as a data input source, an output destination, or both.
##Prerequisites
Each machine in a Disco cluster needs the following:

- Python
- PyMongo
- Disco

For instructions on setting up a Disco cluster, please refer to the [installation guide](http://discoproject.org/doc/disco/start/install.html) on the Disco project website.
##Installation
- Check out the latest source code from GitHub:

      $ git clone https://github.com/10genNYUITP/MongoDisco MongoDisco

- Go to the MongoDisco folder and run setup.py to install the MongoDisco package:

      $ python setup.py install

  Note: running the script may require administrator privileges.

It’s done! Start hacking!
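
To confirm that the prerequisites and the adapter can be found on a machine, a quick check along these lines may help. This is a minimal sketch, assuming Python 3.4 or later; `check_modules` is a hypothetical helper, not part of MongoDisco. `disco` and `mongodisco` are the import names used in the example in this README, and `pymongo` is PyMongo's import name.

```python
from importlib.util import find_spec


def check_modules(names=("pymongo", "disco", "mongodisco")):
    """Return {module_name: bool} telling whether each top-level module is found."""
    return {name: find_spec(name) is not None for name in names}


if __name__ == "__main__":
    for name, found in check_modules().items():
        print("%-12s %s" % (name, "ok" if found else "MISSING"))
```

Running this on every node of the cluster is a cheap way to catch a machine where one of the packages was not installed.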
##Example
Word counting is a classic MapReduce example, and it takes only a few lines with the MongoDB Disco Adapter.
Step 1. Specify the configuration for the job.
For example, MongoDB URIs tell the job where the input data is stored and where the output should be written.

    config = {
        "input_uri": "mongodb://localhost/test.in",
        "output_uri": "mongodb://localhost/test.out",
        "create_input_splits": True,
        "split_key": {"_id": 1},
        "split_size": 1,  # MB
    }

The appendix lists every configuration option.
Here, we assume that the input data is in database "test", collection "in", and that we want to split it on the "_id" field with a split_size of 1 megabyte. The results are written to collection "out".
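
To make the URI convention concrete, the sketch below shows how a URI of the form used in this README breaks down into host, database, and collection. It is an illustration only: `split_namespace` is a hypothetical helper written with the standard library, not the adapter's own parsing code.

```python
from urllib.parse import urlparse


def split_namespace(uri):
    """Split a 'mongodb://host/db.collection' URI into (host, db, collection)."""
    parsed = urlparse(uri)
    if parsed.scheme != "mongodb":
        raise ValueError("not a mongodb URI: %r" % uri)
    # The path looks like '/test.in'; the database name ends at the first dot,
    # and everything after it is the collection name.
    database, _, collection = parsed.path.lstrip("/").partition(".")
    return parsed.netloc, database, collection


print(split_namespace("mongodb://localhost/test.in"))   # ('localhost', 'test', 'in')
print(split_namespace("mongodb://localhost/test.out"))  # ('localhost', 'test', 'out')
```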
Step 2. Write the map function
We want to read the value of the "word" field and count it, so the map function looks like this:

    def map(doc, params):
        # Emit (word, 1); fall back to "NoWord" when the field is missing
        yield doc.get('word', "NoWord"), 1

Note: doc is an ordinary document returned by a MongoDB query, so you can perform any operation on it that MongoDB documents support.
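
Because the map function is a plain generator, it can be tried on ordinary dictionaries before a job is submitted. The sketch below assumes documents with a "word" field, as in this example; the sample documents are made up for illustration and no MongoDB or Disco installation is involved.

```python
def map(doc, params):
    # Emit (word, 1); documents without a "word" field count as "NoWord".
    yield doc.get("word", "NoWord"), 1


# Made-up sample documents standing in for the "in" collection.
sample_docs = [
    {"_id": 1, "word": "disco"},
    {"_id": 2, "word": "mongo"},
    {"_id": 3},  # no "word" field
]

pairs = [pair for doc in sample_docs for pair in map(doc, None)]
print(pairs)  # [('disco', 1), ('mongo', 1), ('NoWord', 1)]
```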
Step 3. Write the reduce function
The map phase already produces key-value pairs, so the reduce function only needs to sum the counts for each word.

    def reduce(iter, params):
        from disco.util import kvgroup
        # Sort so that equal words are adjacent, then group their counts
        for word, counts in kvgroup(sorted(iter)):
            yield word, sum(counts)

The first parameter, iter, is an iterator over keys and values produced by the map function. We use disco.util.kvgroup() to simply pull out each word along with its counts, and sum them together.
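
The grouping step can be reproduced without Disco to see why the input is sorted first. The sketch below uses `itertools.groupby` as a stand-in for `kvgroup`; `group_pairs` is a hypothetical helper, not Disco's implementation, and the sample pairs are made up.

```python
from itertools import groupby
from operator import itemgetter


def group_pairs(pairs):
    """Stand-in for kvgroup: yield (key, values) for each run of equal keys."""
    for key, run in groupby(pairs, key=itemgetter(0)):
        yield key, (value for _, value in run)


def reduce(iter, params):
    # Sorting puts equal words next to each other so each word forms one run.
    for word, counts in group_pairs(sorted(iter)):
        yield word, sum(counts)


pairs = [("mongo", 1), ("disco", 1), ("mongo", 1)]
print(list(reduce(pairs, None)))
# [('disco', 1), ('mongo', 2)]

# Without sorting, "mongo" appears as two separate runs:
print([(word, sum(counts)) for word, counts in group_pairs(pairs)])
# [('mongo', 1), ('disco', 1), ('mongo', 1)]
```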
Step 4. Create a DiscoJob instance and run it

    from mongodisco.job import DiscoJob

    DiscoJob(config=config, map=map, reduce=reduce).run()

Now run the script from a terminal like any other Python program, then check the result in MongoDB.
##Appendix
Configuration for DiscoJob

| Name | Default Value | Note |
| --- | --- | --- |
| input_uri | mongodb://localhost/test.in | MongoDB URI for input data |
| output_uri | mongodb://localhost/test.out | MongoDB URI for output results |
| print_to_stdout | False | if True, print results to stdout |
| job_wait | True | if False, the code won't wait for the job to end |
| create_input_splits | True | if True, the input data is split |
| split_size | 8 | size of one split, in MB |
| split_key | {"_id": 1} | field used for splitting |
| use_shards | False | if True, connect directly to shards to retrieve data |
| use_chunks | True | if True, use the chunks created by MongoDB directly as splits |
| input_key | None | Unknown |
| slave_ok | False | same as slave_okay |
| query | {} | same as the spec parameter of the find method |
| fields | None | same as the fields parameter of the find method |
| sort | None | same as the sort parameter of the find method |
| limit | 0 | same as the limit parameter of the find method |
| skip | 0 | same as the skip parameter of the find method |
| job_output_key | "_id" | field name for the output key |
| job_output_value | "value" | field name for the output value |
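
As a rough illustration of how the defaults and user-supplied options fit together, the sketch below transcribes the table into a dictionary and overlays a few overrides. How DiscoJob merges options internally is not shown in this README, so `effective_config` is a hypothetical helper and the override values are made up.

```python
# Defaults transcribed from the table above.
DEFAULTS = {
    "input_uri": "mongodb://localhost/test.in",
    "output_uri": "mongodb://localhost/test.out",
    "print_to_stdout": False,
    "job_wait": True,
    "create_input_splits": True,
    "split_size": 8,
    "split_key": {"_id": 1},
    "use_shards": False,
    "use_chunks": True,
    "input_key": None,
    "slave_ok": False,
    "query": {},
    "fields": None,
    "sort": None,
    "limit": 0,
    "skip": 0,
    "job_output_key": "_id",
    "job_output_value": "value",
}


def effective_config(overrides):
    """Hypothetical helper: fill in table defaults for any option not given."""
    unknown = set(overrides) - set(DEFAULTS)
    if unknown:
        raise KeyError("unknown option(s): %s" % ", ".join(sorted(unknown)))
    merged = dict(DEFAULTS)
    merged.update(overrides)
    return merged


config = effective_config({
    "query": {"word": {"$exists": True}},  # only documents that have a "word" field
    "limit": 1000,
    "print_to_stdout": True,
})
print(config["split_size"], config["limit"])  # 8 1000
```

Rejecting unknown keys is a design choice of this sketch; it catches misspelled option names early.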