MapReduce updated to work with MongoDB version >= 1.7.4

commit f7ae5b7235597478df421c80afcf62012a02a95a 1 parent 3db3cc9
Tony Hannan authored
50 Database/MongoDB/Query.hs
@@ -33,7 +33,7 @@ module Database.MongoDB.Query (
33 33 -- ** Group
34 34 Group(..), GroupKey(..), group,
35 35 -- ** MapReduce
36   - MapReduce(..), MapFun, ReduceFun, FinalizeFun, mapReduce, runMR, runMR',
  36 + MapReduce(..), MapFun, ReduceFun, FinalizeFun, MROut(..), MRMerge(..), MRResult, mapReduce, runMR, runMR',
37 37 -- * Command
38 38 Command, runCommand, runCommand1,
39 39 eval,
@@ -531,6 +531,7 @@ group g = at "retval" <$> runCommand ["group" =: groupDocument g]
531 531 -- ** MapReduce
532 532
533 533 -- | Maps every document in collection to a list of (key, value) pairs, then for each unique key reduces all its associated values to a single result. There are additional parameters that may be set to tweak this basic operation.
  534 +-- This implements the latest version of map-reduce that requires MongoDB 1.7.4 or greater. To map-reduce against an older server use runCommand directly as described in http://www.mongodb.org/display/DOCS/MapReduce.
534 535 data MapReduce = MapReduce {
535 536 rColl :: Collection,
536 537 rMap :: MapFun,
@@ -538,8 +539,7 @@ data MapReduce = MapReduce {
538 539 rSelect :: Selector, -- ^ Operate on only those documents selected. Default is [] meaning all documents.
539 540 rSort :: Order, -- ^ Default is [] meaning no sort
540 541 rLimit :: Limit, -- ^ Default is 0 meaning no limit
541   - rOut :: Maybe Collection, -- ^ Output to given permanent collection, otherwise output to a new temporary collection whose name is returned.
542   - rKeepTemp :: Bool, -- ^ If True, the temporary output collection is made permanent. If False, the temporary output collection persists for the life of the current pipe only, however, other pipes may read from it while the original one is still alive. Note, reading from a temporary collection after its original pipe dies returns an empty result (not an error). The default for this attribute is False, unless 'rOut' is specified, then the collection permanent.
  542 + rOut :: MROut, -- ^ Output to a collection with a certain merge policy. Default is no collection (Inline). Note, you don't want this default if your result set is large.
543 543 rFinalize :: Maybe FinalizeFun, -- ^ Function to apply to all the results when finished. Default is Nothing.
544 544 rScope :: Document, -- ^ Variables (environment) that can be accessed from map/reduce/finalize. Default is [].
545 545 rVerbose :: Bool -- ^ Provide statistics on job execution time. Default is False.
@@ -554,32 +554,60 @@ type ReduceFun = Javascript
554 554 type FinalizeFun = Javascript
555 555 -- ^ @(key, value) -> final_value@. A finalize function may be run after reduction. Such a function is optional and is not necessary for many map/reduce cases. The finalize function takes a key and a value, and returns a finalized value.
556 556
  557 +data MROut =
  558 + Inline -- ^ Return results directly instead of writing them to an output collection. Results must fit within 16MB limit of a single document
  559 + | Output MRMerge Collection (Maybe Database) -- ^ Write results to given collection, in other database if specified. Follow merge policy when entry already exists
  560 + deriving (Show, Eq)
  561 +
  562 +data MRMerge =
  563 + Replace -- ^ Clear all old data and replace it with new data
  564 + | Merge -- ^ Leave old data but overwrite entries with the same key with new data
  565 + | Reduce -- ^ Leave old data but combine entries with the same key via MR's reduce function
  566 + deriving (Show, Eq)
  567 +
  568 +type MRResult = Document
  569 +-- ^ Result of running a MapReduce has some stats besides the output. See http://www.mongodb.org/display/DOCS/MapReduce#MapReduce-Resultobject
  570 +
557 571 mrDocument :: MapReduce -> Document
558 572 -- ^ Translate MapReduce data into expected document form
559 573 mrDocument MapReduce{..} =
560 574 ("mapreduce" =: rColl) :
561   - ("out" =? rOut) ++
  575 + ("out" =: mrOutDoc rOut) :
562 576 ("finalize" =? rFinalize) ++ [
563 577 "map" =: rMap,
564 578 "reduce" =: rReduce,
565 579 "query" =: rSelect,
566 580 "sort" =: rSort,
567 581 "limit" =: (fromIntegral rLimit :: Int),
568   - "keeptemp" =: rKeepTemp,
569 582 "scope" =: rScope,
570 583 "verbose" =: rVerbose ]
571 584
  585 +mrOutDoc :: MROut -> Document
  586 +-- ^ Translate MROut into expected document form
  587 +mrOutDoc Inline = ["inline" =: (1 :: Int)]
  588 +mrOutDoc (Output mrMerge coll mDB) = (mergeName mrMerge =: coll) : mdb mDB where
  589 + mergeName Replace = "replace"
  590 + mergeName Merge = "merge"
  591 + mergeName Reduce = "reduce"
  592 + mdb Nothing = []
  593 + mdb (Just (Database db)) = ["db" =: db]
  594 +
572 595 mapReduce :: Collection -> MapFun -> ReduceFun -> MapReduce
573 596 -- ^ MapReduce on collection with given map and reduce functions. Remaining attributes are set to their defaults, which are stated in their comments.
574   -mapReduce col map' red = MapReduce col map' red [] [] 0 Nothing False Nothing [] False
  597 +mapReduce col map' red = MapReduce col map' red [] [] 0 Inline Nothing [] False
575 598
576 599 runMR :: (DbAccess m) => MapReduce -> m Cursor
577 600 -- ^ Run MapReduce and return cursor of results. Error if map/reduce fails (because of bad Javascript)
578   --- TODO: Delete temp result collection when cursor closes. Until then, it will be deleted by the server when pipe closes.
579   -runMR mr = find . query [] =<< (at "result" <$> runMR' mr)
580   -
581   -runMR' :: (DbAccess m) => MapReduce -> m Document
582   --- ^ Run MapReduce and return a result document containing a "result" field holding the output Collection and additional statistic fields. Error if the map/reduce failed (because of bad Javascript).
  601 +runMR mr = do
  602 + res <- runMR' mr
  603 + case look "result" res of
  604 + Just (String coll) -> find $ query [] coll
  605 + Just (Doc doc) -> use (Database $ at "db" doc) $ find $ query [] (at "collection" doc)
  606 + Just x -> error $ "unexpected map-reduce result field: " ++ show x
  607 + Nothing -> newCursor (Database "") "" 0 $ return $ CS 0 0 (at "results" res)
  608 +
  609 +runMR' :: (DbAccess m) => MapReduce -> m MRResult
  610 +-- ^ Run MapReduce and return a MR result document containing stats and the results if Inlined. Error if the map/reduce failed (because of bad Javascript).
583 611 runMR' mr = do
584 612 doc <- runCommand (mrDocument mr)
585 613 return $ if true1 "ok" doc then doc else error $ "mapReduce error:\n" ++ show doc ++ "\nin:\n" ++ show mr
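
The translation that mrOutDoc performs can be tried without a server. What follows is a minimal standalone sketch, not the driver's code: Collection and Database are plain String stand-ins, a document is modelled as a list of (label, value) string pairs rather than a BSON Document, and the name outDoc is hypothetical. It only illustrates which "out" document each MROut value is expected to produce.

```haskell
module Main where

-- Stand-in types (assumptions): the driver uses its own Collection,
-- Database and BSON Document types instead of plain strings and pairs.
type Collection = String
type Database = String
type Doc = [(String, String)]

data MRMerge = Replace | Merge | Reduce
  deriving (Show, Eq)

data MROut
  = Inline
  | Output MRMerge Collection (Maybe Database)
  deriving (Show, Eq)

-- Hypothetical mirror of mrOutDoc: Inline becomes {inline: 1}; Output
-- becomes {<policy>: <collection>} plus an optional {db: <database>}.
outDoc :: MROut -> Doc
outDoc Inline = [("inline", "1")]
outDoc (Output policy coll mDB) =
  (mergeName policy, coll) : maybe [] (\db -> [("db", db)]) mDB
  where
    mergeName Replace = "replace"
    mergeName Merge = "merge"
    mergeName Reduce = "reduce"

main :: IO ()
main = do
  print (outDoc Inline)
  print (outDoc (Output Replace "mr1out" Nothing))
  print (outDoc (Output Reduce "totals" (Just "reports")))
```

The collection and database names "totals" and "reports" are made up for the example; "mr1out" is the one used in map-reduce-example.md.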
24 map-reduce-example.md
@@ -13,14 +13,13 @@ Setup
13 13 To start, we'll insert some example data which we can perform
14 14 map/reduce queries on:
15 15
16   - $ ghci -package mongoDB
17   - GHCi, version 6.12.1: http://www.haskell.org/ghc/ :? for help
  16 + $ ghci
18 17 ...
19 18 Prelude> :set prompt "> "
20 19 > :set -XOverloadedStrings
21 20 > import Database.MongoDB
22 21 > import Data.CompactString ()
23   - > conn <- newConnPool 1 (host "localhost")
  22 + > conn <- newConnPool 1 (host "127.0.0.1")
24 23 > let run act = access safe Master conn $ use (Database "test") act
25 24 > :{
26 25 run $ insertMany "mr1" [
@@ -68,18 +67,17 @@ key:
68 67 Note: We can't just return values.length as the reduce function might
69 68 be called iteratively on the results of other reduce steps.
70 69
71   -Finally, we run mapReduce and iterate over the result collection:
  70 +Finally, we run mapReduce. By default, the results are returned in an array in the result document (inlined):
72 71
73   - > run $ runMR (mapReduce "mr1" mapFn reduceFn) >>= rest
74   - Right [[ _id: "cat", value: 3.0],[ _id: "dog", value: 2.0],[ _id: "mouse", value: 1.0]]
  72 + > run $ runMR' (mapReduce "mr1" mapFn reduceFn)
  73 + Right [ results: [[ _id: "cat", value: 3.0],[ _id: "dog", value: 2.0],[ _id: "mouse", value: 1.0]], timeMillis: 379, counts: [ input: 4, emit: 6, reduce: 2, output: 3], ok: 1.0]
75 74
76   -Advanced Map/Reduce
77   --------------------
  75 +Inlining only works if the result set is smaller than 16MB. An alternative to inlining is outputting to a collection. But what should happen if the collection already holds data from a previous run of the same MapReduce? You have three alternatives in the MRMerge data type: Replace, Merge, and Reduce. See its documentation for details. To output to a collection, set the rOut field in MapReduce.
78 76
79   -MongoDB returns additional statistics in the map/reduce results. To
80   -obtain them, use *runMR'* instead:
  77 + > run $ runMR' (mapReduce "mr1" mapFn reduceFn) {rOut = Output Replace "mr1out" Nothing}
  78 + Right [ result: "mr1out", timeMillis: 379, counts: [ input: 4, emit: 6, reduce: 2, output: 3], ok: 1.0]
81 79
82   - > run $ runMR' (mapReduce "mr1" mapFn reduceFn)
83   - Right [ result: "tmp.mr.mapreduce_1276482643_7", timeMillis: 379, counts: [ input: 4, emit: 6, output: 3], ok: 1.0]
  80 +You can now query the mr1out collection to see the result, or run another MapReduce on it! A shortcut for running the map-reduce then querying the result collection right away is `runMR`.
84 81
85   -You can then obtain the results from here by quering the result collection yourself. *runMR* (above) does this for you but discards the statistics.
  82 + > run $ rest =<< runMR (mapReduce "mr1" mapFn reduceFn) {rOut = Output Replace "mr1out" Nothing}
  83 + Right [[ _id: "cat", value: 3.0],[ _id: "dog", value: 2.0],[ _id: "mouse", value: 1.0]]
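
The three MRMerge policies described in the example text can be illustrated with an in-memory model. This is a rough sketch under stated assumptions, not how the server implements them: the output collection is modelled as a Data.Map from key to value, the JavaScript reduce function is approximated by a binary Haskell function, and applyPolicy is a hypothetical name. The "bird" entry and the new counts are invented for the illustration.

```haskell
module Main where

import qualified Data.Map.Strict as Map

data MRMerge = Replace | Merge | Reduce
  deriving (Show, Eq)

-- Combine the existing output (old) with a fresh run (new).
-- The function argument stands in for the job's reduce function.
applyPolicy
  :: Ord k
  => MRMerge -> (v -> v -> v) -> Map.Map k v -> Map.Map k v -> Map.Map k v
applyPolicy Replace _ _ new = new                      -- drop all old data
applyPolicy Merge _ old new = Map.union new old        -- new wins on equal keys
applyPolicy Reduce f old new = Map.unionWith f old new -- combine equal keys

main :: IO ()
main = do
  let old = Map.fromList [("cat", 3), ("dog", 2), ("mouse", 1)] :: Map.Map String Int
      new = Map.fromList [("cat", 1), ("bird", 4)]
  print (Map.toList (applyPolicy Replace (+) old new))
  print (Map.toList (applyPolicy Merge (+) old new))
  print (Map.toList (applyPolicy Reduce (+) old new))
```

With addition as the reducer, Replace keeps only bird and cat, Merge keeps the old dog and mouse entries while cat takes the new value 1, and Reduce sums the two cat counts to 4.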
2  mongoDB.cabal
... ... @@ -1,5 +1,5 @@
1 1 name: mongoDB
2   -version: 0.9.5
  2 +version: 0.10.0
3 3 build-type: Simple
4 4 license: OtherLicense
5 5 license-file: LICENSE
