
# MongoDB Disco Adapter

The MongoDB Disco Adapter is a plugin that connects MongoDB to the Disco MapReduce framework, enabling users to use MongoDB as a data input and/or output source.

## Prerequisites

Each machine in a Disco cluster needs the following:

- Python
- PyMongo
- Disco

For instructions on setting up a Disco cluster, please refer to the guide (http://discoproject.org/doc/disco/start/install.html) on the Disco project website.
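
If PyMongo is not already available on a machine, one common way to install it (adjust for your own environment and Python version) is with pip:

    $ pip install pymongo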

## Installation

1. Check out the latest source code from GitHub:

        $ git clone https://github.com/mongo/mongo-disco.git mongo-disco

2. Go to the mongo-disco folder and run setup.py to install the MongoDisco package:

        $ python setup.py install

   Note: the script may require administrator privileges to run.

It’s done! Start hacking!
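
As a quick sanity check, you can confirm the package is importable (this uses the mongodisco.job module that the example below imports):

    $ python -c "from mongodisco.job import MongoJob"

If the command prints nothing, the adapter is installed.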

## Example

Word counting is the classic example for a MapReduce framework, and it can be done very easily with the MongoDB Disco Adapter.

Step 1. Specify the configuration for the job.

For example, you can specify where the input data is stored and where the output should be written by providing MongoDB URIs.

    config = {
        "input_uri": "mongodb://localhost/test.in",
        "output_uri": "mongodb://localhost/test.out",
        "create_input_splits": True,
        "split_key": {'_id': 1},
        "split_size": 1,  # MB
    }

You can find the full list of configuration options in the appendix.

Here we assume that the input data is in the database "test", collection "in", and that we want to split the data on the "_id" field, with each split being 1 megabyte (split_size). The results are written back to the collection "out".
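
If you want to try the example end to end, here is a minimal sketch that seeds the test.in collection with a few sample documents. It assumes a local mongod and a reasonably recent PyMongo, and the sample values are purely illustrative:

    # seed_input.py -- illustrative only: insert a few sample documents into test.in
    from pymongo import MongoClient    # MongoClient requires PyMongo >= 2.4

    client = MongoClient("mongodb://localhost")
    coll = client.test["in"]           # database "test", collection "in"
    coll.insert_many([                 # insert_many requires PyMongo >= 3.0
        {"word": "hello"},
        {"word": "world"},
        {"word": "hello"},
    ])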

Step 2. Write the map function

Here we want to read the value of the "word" field and count it, so the map function looks like the following:

    def map(doc, params):
        yield doc.get('word', "NoWord"), 1

Note: doc is an ordinary document returned by a MongoDB query. You can perform any operation on it that MongoDB allows.
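
Because doc behaves like an ordinary dictionary, the map function can do arbitrary work on it. For instance, a map that counts every word in a hypothetical "text" field, rather than a single "word" field, could look like this:

    def map(doc, params):
        # "text" is an assumed field name; split it and emit one count per word
        for word in doc.get('text', '').split():
            yield word.lower(), 1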

Step 3. Write the reduce function

Since the map step already produces key-value pairs, we only need to sum the counts for each word.

    def reduce(iter, params):
        from disco.util import kvgroup
        for word, counts in kvgroup(sorted(iter)):
            yield word, sum(counts)

The first parameter, iter, is an iterator over the keys and values produced by the map function. We use disco.util.kvgroup() to pull out each word along with its counts and sum them together.
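
To see what kvgroup() does on its own, here is a small sketch (assuming Disco is installed locally, since it provides disco.util) using the same sorted key-value shape the reducer receives:

    from disco.util import kvgroup

    pairs = sorted([("world", 1), ("hello", 1), ("hello", 1)])
    counts = dict((word, sum(values)) for word, values in kvgroup(pairs))
    # counts == {'hello': 2, 'world': 1}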

Step 4. Create a MongoJob instance and run it

    from mongodisco.job import MongoJob

    MongoJob().run(map=map, reduce=reduce, **config)

Now run it in a terminal like any other Python script and check the results in MongoDB.
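
Putting the pieces together, the whole example can live in one script; the file name wordcount.py is only a suggestion:

    # wordcount.py -- word count with the MongoDB Disco Adapter
    from mongodisco.job import MongoJob


    def map(doc, params):
        yield doc.get('word', "NoWord"), 1


    def reduce(iter, params):
        from disco.util import kvgroup
        for word, counts in kvgroup(sorted(iter)):
            yield word, sum(counts)


    if __name__ == '__main__':
        config = {
            "input_uri": "mongodb://localhost/test.in",
            "output_uri": "mongodb://localhost/test.out",
            "create_input_splits": True,
            "split_key": {'_id': 1},
            "split_size": 1,  # MB
        }
        MongoJob().run(map=map, reduce=reduce, **config)

Run it with `$ python wordcount.py` and inspect the "out" collection in the "test" database when the job finishes.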

## Appendix

Configuration for MongoJob

| Name | Default Value | Note |
| --- | --- | --- |
| input_uri | mongodb://localhost/test.in | MongoDB URI for the input data |
| output_uri | mongodb://localhost/test.out | MongoDB URI for the output results |
| print_to_stdout | False | if True, print results to stdout |
| job_wait | True | if False, the code won't wait for the job to finish |
| create_input_splits | True | if True, the input data will be split |
| split_size | 8 | size of one split, in MB |
| split_key | {"_id": 1} | field used for splitting |
| use_shards | False | if True, connect directly to shards to retrieve data |
| use_chunks | True | if True, use the chunks already split by MongoDB as the splits |
| input_key | None | Unknown!!! |
| slave_ok | False | same as slave_okay |
| query | {} | same as the spec parameter of find |
| fields | None | same as the fields parameter of find |
| sort | None | same as the sort parameter of find |
| limit | 0 | same as the limit parameter of find |
| skip | 0 | same as the skip parameter of find |
| job_output_key | "_id" | field name for the output key |
| job_output_value | "value" | field name for the output value |
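
As an illustration of how some of these options combine (the values below are examples, not recommendations), a job that reads only a filtered subset of the input and also prints its results to stdout could be configured like this:

    config = {
        "input_uri": "mongodb://localhost/test.in",
        "output_uri": "mongodb://localhost/test.out",
        "print_to_stdout": True,               # print results to stdout
        "query": {"word": {"$exists": True}},  # only documents that have a "word" field
        "fields": {"word": 1},                 # fetch only the "word" field
        "limit": 1000,                         # cap the number of input documents
    }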
