Permalink
Browse files

Handle response flags correctly, plus some comment changes

  • Loading branch information...
1 parent b03763b commit 6435bc3cd99447c6ff6b4b1f00c5189ad5c6fbfe Tony Hannan committed Jul 3, 2010
Showing with 107 additions and 41 deletions.
  1. +39 −1 Database/MongoDB.hs
  2. +17 −3 Database/MongoDB/Internal/Protocol.hs
  3. +48 −34 Database/MongoDB/Query.hs
  4. +1 −1 mongoDB.cabal
  5. +2 −2 tutorial.md
View
@@ -1,4 +1,42 @@
--- | Client interface to MongoDB server(s)
+{- |
+Client interface to MongoDB server(s).
+
+Simple example:
+
+>
+> {-# LANGUAGE OverloadedStrings #-}
+>
+> import Database.MongoDB
+>
+> main = do
+> e <- connect (server "127.0.0.1")
+> conn <- either (fail . show) return e
+> e <- runConn run conn
+> either (fail . show) return e
+>
+> run = useDb "baseball" $ do
+> clearTeams
+> insertTeams
+> print' "All Teams" =<< allTeams
+> print' "National League Teams" =<< nationalLeagueTeams
+> print' "New York Teams" =<< newYorkTeams
+>
+> clearTeams = delete (select [] "team")
+>
+> insertTeams = insertMany "team" [
+> ["name" =: "Yankees", "home" =: ["city" =: "New York", "state" =: "NY"], "league" =: "American"],
+> ["name" =: "Mets", "home" =: ["city" =: "New York", "state" =: "NY"], "league" =: "National"],
+> ["name" =: "Phillies", "home" =: ["city" =: "Philadelphia", "state" =: "PA"], "league" =: "National"],
+> ["name" =: "Red Sox", "home" =: ["city" =: "Boston", "state" =: "MA"], "league" =: "American"] ]
+>
+> allTeams = rest =<< find (select [] "team") {sort = ["city" =: 1]}
+>
+> nationalLeagueTeams = rest =<< find (select ["league" =: "National"] "team")
+>
+> newYorkTeams = rest =<< find (select ["home.state" =: "NY"] "team") {project = ["name" =: 1, "league" =: 1]}
+>
+> print' title docs = liftIO $ putStrLn title >> mapM_ print docs
+-}
module Database.MongoDB (
module Data.Bson,
@@ -15,7 +15,7 @@ module Database.MongoDB.Internal.Protocol (
-- ** Request
Request(..), QueryOption(..),
-- ** Reply
- Reply(..),
+ Reply(..), ResponseFlag(..),
-- * Authentication
Username, Password, Nonce, pwHash, pwKey
) where
@@ -238,12 +238,18 @@ qBits = bitOr . map qBit
-- | A reply is a message received in response to a 'Request'
data Reply = Reply {
- rResponseFlag :: Int32, -- ^ 0 = success, non-zero = failure
+ rResponseFlags :: [ResponseFlag],
rCursorId :: CursorId, -- ^ 0 = cursor finished
rStartingFrom :: Int32,
rDocuments :: [Document]
} deriving (Show, Eq)
+data ResponseFlag =
+ CursorNotFound -- ^ Set when getMore is called but the cursor id is not valid at the server. Returned with zero results.
+ | QueryError -- ^ Server error. Results contains one document containing an "$err" field holding the error message.
+ | AwaitCapable -- ^ For backward compatability: Set when the server supports the AwaitData query option. if it doesn't, a replica slave client should sleep a little between getMore's
+ deriving (Show, Eq, Enum)
+
-- * Binary format
replyOpcode :: Opcode
@@ -253,13 +259,21 @@ getReply :: Get (ResponseTo, Reply)
getReply = do
(opcode, responseTo) <- getHeader
unless (opcode == replyOpcode) $ fail $ "expected reply opcode (1) but got " ++ show opcode
- rResponseFlag <- getInt32
+ rResponseFlags <- rFlags <$> getInt32
rCursorId <- getInt64
rStartingFrom <- getInt32
numDocs <- fromIntegral <$> getInt32
rDocuments <- replicateM numDocs getDocument
return (responseTo, Reply{..})
+rFlags :: Int32 -> [ResponseFlag]
+rFlags bits = filter (testBit bits . rBit) [CursorNotFound ..]
+
+rBit :: ResponseFlag -> Int
+rBit CursorNotFound = 0
+rBit QueryError = 1
+rBit AwaitCapable = 3
+
-- * Authentication
type Username = UString
@@ -1,10 +1,10 @@
-- | Query and update documents residing on a MongoDB server(s)
-{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses #-}
+{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, GeneralizedNewtypeDeriving #-}
module Database.MongoDB.Query (
-- * Connection
- Failure(..), Conn, Connected, runConn,
+ Connected, runConn, Conn, Failure(..),
-- * Database
Database, allDatabases, DbConn, useDb, thisDatabase,
-- ** Authentication
@@ -52,11 +52,22 @@ import Database.MongoDB.Internal.Protocol hiding (Query, send, call)
import Data.Bson
import Data.Word
import Data.Int
-import Data.Maybe (listToMaybe, catMaybes)
+import Data.Maybe (listToMaybe, catMaybes, mapMaybe)
import Data.UString as U (dropWhile, any, tail)
import Database.MongoDB.Internal.Util (loop, (<.>), true1) -- plus Applicative instances of ErrorT & ReaderT
--- * Connection
+-- * Connected
+
+newtype Connected m a = Connected (ErrorT Failure (ReaderT WriteMode (ReaderT Connection m)) a)
+ deriving (Context Connection, Context WriteMode, MonadError Failure, MonadIO, Monad, Applicative, Functor)
+-- ^ Monad with access to a 'Connection' and 'WriteMode' and throws a 'Failure' on connection or server failure
+
+instance MonadTrans Connected where
+ lift = Connected . lift . lift . lift
+
+runConn :: Connected m a -> Connection -> m (Either Failure a)
+-- ^ Run action with access to connection. Return Left Failure if connection or server fails during execution.
+runConn (Connected action) = runReaderT (runReaderT (runErrorT action) Unsafe)
-- | A monad with access to a 'Connection' and 'WriteMode' and throws a 'Failure' on connection or server failure
class (Context Connection m, Context WriteMode m, MonadError Failure m, MonadIO m, Applicative m, Functor m) => Conn m
@@ -72,12 +83,6 @@ data Failure =
instance Error Failure where strMsg = ServerFailure
-type Connected m = ErrorT Failure (ReaderT WriteMode (ReaderT Connection m))
-
-runConn :: Connected m a -> Connection -> m (Either Failure a)
--- ^ Run action with access to connection. Return Left Failure if connection or server fails during execution.
-runConn action = runReaderT (runReaderT (runErrorT action) Unsafe)
-
send :: (Conn m) => [Notice] -> m ()
-- ^ Send notices as a contiguous batch to server with no reply. Raise Failure if connection fails.
send ns = do
@@ -298,7 +303,7 @@ type Limit = Word32
-- ^ Maximum number of documents to return, i.e. cursor will close after iterating over this number of documents. 0 means no limit.
type Order = Document
--- ^ Fields to sort by. Each one is associated with 1 or -1. Eg. @[x =: 1, y =: (-1)]@ means sort by @x@ ascending then @y@ descending
+-- ^ Fields to sort by. Each one is associated with 1 or -1. Eg. @[x =: 1, y =: -1]@ means sort by @x@ ascending then @y@ descending
type BatchSize = Word32
-- ^ The number of document to return in each batch response from the server. 0 means use Mongo default.
@@ -310,10 +315,12 @@ query sel col = Query [] (Select sel col) [] 0 0 [] False 0 []
batchSizeRemainingLimit :: BatchSize -> Limit -> (Int32, Limit)
-- ^ Given batchSize and limit return P.qBatchSize and remaining limit
batchSizeRemainingLimit batchSize limit = if limit == 0
- then (fromIntegral batchSize, 0) -- no limit
- else if 0 < batchSize && batchSize < limit
- then (fromIntegral batchSize, limit - batchSize)
+ then (fromIntegral batchSize', 0) -- no limit
+ else if 0 < batchSize' && batchSize' < limit
+ then (fromIntegral batchSize', limit - batchSize')
else (- fromIntegral limit, 1)
+ where batchSize' = if batchSize == 1 then 2 else batchSize
+ -- batchSize 1 is broken because server converts 1 to -1 meaning limit 1
queryRequest :: Bool -> Query -> Database -> (Request, Limit)
-- ^ Translate Query to Protocol.Query. If first arg is true then add special $explain attribute.
@@ -334,6 +341,12 @@ runQuery :: (DbConn m) => Bool -> [Notice] -> Query -> m CursorState'
-- ^ Send query request and return cursor state
runQuery isExplain ns q = call' ns . queryRequest isExplain q =<< thisDatabase
+call' :: (Conn m) => [Notice] -> (Request, Limit) -> m CursorState'
+-- ^ Send notices and request and return promised cursor state
+call' ns (req, remainingLimit) = do
+ promise <- call ns req
+ return $ Delayed (fmap (fromReply remainingLimit =<<) promise)
+
find :: (DbConn m) => Query -> m Cursor
-- ^ Fetch documents satisfying query
find q@Query{selection, batchSize} = do
@@ -397,15 +410,16 @@ data CursorState = CS Limit CursorId [Document]
fromReply :: Limit -> Reply -> Either Failure CursorState
-- ^ Convert Reply to CursorState or Failure
-fromReply limit Reply{..} = if rResponseFlag == 0
- then Right (CS limit rCursorId rDocuments)
- else Left . ServerFailure $ "Query failure " ++ show rResponseFlag ++ " " ++ show rDocuments
-
-call' :: (Conn m) => [Notice] -> (Request, Limit) -> m CursorState'
--- ^ Send notices and request and return promised cursor state
-call' ns (req, remainingLimit) = do
- promise <- call ns req
- return $ Delayed (fmap (fromReply remainingLimit =<<) promise)
+fromReply limit Reply{..} = case mapMaybe fromResponseFlag rResponseFlags of
+ [] -> Right (CS limit rCursorId rDocuments)
+ err : _ -> Left err
+ where
+ fromResponseFlag :: ResponseFlag -> Maybe Failure
+ -- ^ If response flag indicate failure then Just Failure, otherwise Nothing
+ fromResponseFlag x = case x of
+ AwaitCapable -> Nothing
+ CursorNotFound -> Just . ServerFailure $ "Cursor " ++ show rCursorId ++ " not found"
+ QueryError -> Just . ServerFailure $ "Query failure " ++ show rDocuments
newCursor :: (Conn m) => Database -> Collection -> BatchSize -> CursorState' -> m Cursor
-- ^ Create new cursor. If you don't read all results then close it. Cursor will be closed automatically when all results are read from it or when eventually garbage collected.
@@ -432,9 +446,8 @@ next cursor = modifyCursorState' cursor nextState where
[] -> if cid == 0
then return (CursorState $ CS 0 0 [], Nothing) -- finished
else error $ "server returned empty batch but says more results on server"
- nextBatch fcol batch limit cid = let
- (batchSize, remLimit) = batchSizeRemainingLimit batch limit
- in call' [] (GetMore fcol batchSize cid, remLimit)
+ nextBatch fcol batch limit cid = call' [] (GetMore fcol batchSize cid, remLimit)
+ where (batchSize, remLimit) = batchSizeRemainingLimit batch limit
nextN :: (Conn m) => Int -> Cursor -> m [Document]
-- ^ Return next N documents or less if end is reached
@@ -454,17 +467,18 @@ instance (Conn m) => Resource m Cursor where
-- ** Group
+-- | Groups documents in collection by key then reduces (aggregates) each group
data Group = Group {
gColl :: Collection,
gKey :: GroupKey, -- ^ Fields to group by
- gReduce :: Javascript, -- ^ The reduce function aggregates (reduces) the objects iterated. Typical operations of a reduce function include summing and counting. reduce takes two arguments: the current document being iterated over and the aggregation value.
- gInitial :: Document, -- ^ Initial aggregation value supplied to reduce
+ gReduce :: Javascript, -- ^ @(doc, agg) -> ()@. The reduce function reduces (aggregates) the objects iterated. Typical operations of a reduce function include summing and counting. It takes two arguments, the current document being iterated over and the aggregation value, and updates the aggregate value.
+ gInitial :: Document, -- ^ @agg@. Initial aggregation value supplied to reduce
gCond :: Selector, -- ^ Condition that must be true for a row to be considered. [] means always true.
- gFinalize :: Maybe Javascript -- ^ An optional function to be run on each item in the result set just before the item is returned. Can either modify the item (e.g., add an average field given a count and a total) or return a replacement object (returning a new object with just _id and average fields).
+ gFinalize :: Maybe Javascript -- ^ @agg -> () | result@. An optional function to be run on each item in the result set just before the item is returned. Can either modify the item (e.g., add an average field given a count and a total) or return a replacement object (returning a new object with just _id and average fields).
} deriving (Show, Eq)
data GroupKey = Key [Label] | KeyF Javascript deriving (Show, Eq)
--- ^ Fields to group by, or function returning a "key object" to be used as the grouping key. Use this instead of key to specify a key that is not an existing member of the object (or, to access embedded members).
+-- ^ Fields to group by, or function (@doc -> key@) returning a "key object" to be used as the grouping key. Use KeyF instead of Key to specify a key that is not an existing member of the object (or, to access embedded members).
groupDocument :: Group -> Document
-- ^ Translate Group data into expected document form
@@ -482,12 +496,12 @@ group g = at "retval" <$> runCommand ["group" =: groupDocument g]
-- ** MapReduce
--- | Maps every document in collection to a (key, value) pair, then for each unique key reduces all its associated values to a result. Therefore, the final output is a list of (key, result) pairs, where every key is unique. This is the basic description. There are additional nuances that may be used. See <http://www.mongodb.org/display/DOCS/MapReduce> for details.
+-- | Maps every document in collection to a list of (key, value) pairs, then for each unique key reduces all its associated values from all lists to a single result. There are additional parameters that may be set to tweak this basic operation.
data MapReduce = MapReduce {
rColl :: Collection,
rMap :: MapFun,
rReduce :: ReduceFun,
- rSelect :: Selector, -- ^ Default is []
+ rSelect :: Selector, -- ^ Operate on only those documents selected. Default is [] meaning all documents.
rSort :: Order, -- ^ Default is [] meaning no sort
rLimit :: Limit, -- ^ Default is 0 meaning no limit
rOut :: Maybe Collection, -- ^ Output to given permanent collection, otherwise output to a new temporary collection whose name is returned.
@@ -498,10 +512,10 @@ data MapReduce = MapReduce {
} deriving (Show, Eq)
type MapFun = Javascript
--- ^ @() -> void@. The map function references the variable this to inspect the current object under consideration. A map function must call @emit(key,value)@ at least once, but may be invoked any number of times, as may be appropriate.
+-- ^ @() -> void@. The map function references the variable @this@ to inspect the current object under consideration. The function must call @emit(key,value)@ at least once, but may be invoked any number of times, as may be appropriate.
type ReduceFun = Javascript
--- ^ @(key, value_array) -> value@. The reduce function receives a key and an array of values. To use, reduce the received values, and return a result. The MapReduce engine may invoke reduce functions iteratively; thus, these functions must be idempotent. That is, the following must hold for your reduce function: @for all k, vals : reduce(k, [reduce(k,vals)]) == reduce(k,vals)@. If you need to perform an operation only once, use a finalize function. The output of emit (the 2nd param) and reduce should be the same format to make iterative reduce possible.
+-- ^ @(key, value_array) -> value@. The reduce function receives a key and an array of values and returns an aggregate result value. The MapReduce engine may invoke reduce functions iteratively; thus, these functions must be idempotent. That is, the following must hold for your reduce function: @for all k, vals : reduce(k, [reduce(k,vals)]) == reduce(k,vals)@. If you need to perform an operation only once, use a finalize function. The output of emit (the 2nd param) and reduce should be the same format to make iterative reduce possible.
type FinalizeFun = Javascript
-- ^ @(key, value) -> final_value@. A finalize function may be run after reduction. Such a function is optional and is not necessary for many map/reduce cases. The finalize function takes a key and a value, and returns a finalized value.
View
@@ -1,5 +1,5 @@
Name: mongoDB
-Version: 0.6
+Version: 0.6.1
License: OtherLicense
License-file: LICENSE
Maintainer: Tony Hannan <tony@10gen.com>
View
@@ -55,12 +55,12 @@ Open up a connection to your DB instance, using the standard port:
or for a non-standard port
- > Right con <- connect $ server "127.0.0.1" (PortNumber 666)
+ > Right con <- connect $ Server "127.0.0.1" (PortNumber 666)
*connect* returns Left IOError if connection fails. We are assuming above
it won't fail. If it does you will get a pattern match error.
-Task and Db monad
+Connected monad
-------------------
The current connection is held in a Connected monad, and the current database

0 comments on commit 6435bc3

Please sign in to comment.