Skip to content

Commit

Permalink
Handle response flags correctly, plus some comment changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Tony Hannan committed Jul 3, 2010
1 parent b03763b commit 6435bc3
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 41 deletions.
40 changes: 39 additions & 1 deletion Database/MongoDB.hs
@@ -1,4 +1,42 @@
-- | Client interface to MongoDB server(s)
{- |
Client interface to MongoDB server(s).
Simple example:
>
> {-# LANGUAGE OverloadedStrings #-}
>
> import Database.MongoDB
>
> main = do
> e <- connect (server "127.0.0.1")
> conn <- either (fail . show) return e
> e <- runConn run conn
> either (fail . show) return e
>
> run = useDb "baseball" $ do
> clearTeams
> insertTeams
> print' "All Teams" =<< allTeams
> print' "National League Teams" =<< nationalLeagueTeams
> print' "New York Teams" =<< newYorkTeams
>
> clearTeams = delete (select [] "team")
>
> insertTeams = insertMany "team" [
> ["name" =: "Yankees", "home" =: ["city" =: "New York", "state" =: "NY"], "league" =: "American"],
> ["name" =: "Mets", "home" =: ["city" =: "New York", "state" =: "NY"], "league" =: "National"],
> ["name" =: "Phillies", "home" =: ["city" =: "Philadelphia", "state" =: "PA"], "league" =: "National"],
> ["name" =: "Red Sox", "home" =: ["city" =: "Boston", "state" =: "MA"], "league" =: "American"] ]
>
> allTeams = rest =<< find (select [] "team") {sort = ["city" =: 1]}
>
> nationalLeagueTeams = rest =<< find (select ["league" =: "National"] "team")
>
> newYorkTeams = rest =<< find (select ["home.state" =: "NY"] "team") {project = ["name" =: 1, "league" =: 1]}
>
> print' title docs = liftIO $ putStrLn title >> mapM_ print docs
-}

module Database.MongoDB (
module Data.Bson,
Expand Down
20 changes: 17 additions & 3 deletions Database/MongoDB/Internal/Protocol.hs
Expand Up @@ -15,7 +15,7 @@ module Database.MongoDB.Internal.Protocol (
-- ** Request
Request(..), QueryOption(..),
-- ** Reply
Reply(..),
Reply(..), ResponseFlag(..),
-- * Authentication
Username, Password, Nonce, pwHash, pwKey
) where
Expand Down Expand Up @@ -238,12 +238,18 @@ qBits = bitOr . map qBit

-- | A reply is a message received in response to a 'Request'
data Reply = Reply {
rResponseFlag :: Int32, -- ^ 0 = success, non-zero = failure
rResponseFlags :: [ResponseFlag],
rCursorId :: CursorId, -- ^ 0 = cursor finished
rStartingFrom :: Int32,
rDocuments :: [Document]
} deriving (Show, Eq)

data ResponseFlag =
CursorNotFound -- ^ Set when getMore is called but the cursor id is not valid at the server. Returned with zero results.
| QueryError -- ^ Server error. Results contains one document containing an "$err" field holding the error message.
| AwaitCapable -- ^ For backward compatability: Set when the server supports the AwaitData query option. if it doesn't, a replica slave client should sleep a little between getMore's
deriving (Show, Eq, Enum)

-- * Binary format

replyOpcode :: Opcode
Expand All @@ -253,13 +259,21 @@ getReply :: Get (ResponseTo, Reply)
getReply = do
(opcode, responseTo) <- getHeader
unless (opcode == replyOpcode) $ fail $ "expected reply opcode (1) but got " ++ show opcode
rResponseFlag <- getInt32
rResponseFlags <- rFlags <$> getInt32
rCursorId <- getInt64
rStartingFrom <- getInt32
numDocs <- fromIntegral <$> getInt32
rDocuments <- replicateM numDocs getDocument
return (responseTo, Reply{..})

rFlags :: Int32 -> [ResponseFlag]
rFlags bits = filter (testBit bits . rBit) [CursorNotFound ..]

rBit :: ResponseFlag -> Int
rBit CursorNotFound = 0
rBit QueryError = 1
rBit AwaitCapable = 3

-- * Authentication

type Username = UString
Expand Down
82 changes: 48 additions & 34 deletions Database/MongoDB/Query.hs
@@ -1,10 +1,10 @@
-- | Query and update documents residing on a MongoDB server(s)

{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, GeneralizedNewtypeDeriving #-}

module Database.MongoDB.Query (
-- * Connection
Failure(..), Conn, Connected, runConn,
Connected, runConn, Conn, Failure(..),
-- * Database
Database, allDatabases, DbConn, useDb, thisDatabase,
-- ** Authentication
Expand Down Expand Up @@ -52,11 +52,22 @@ import Database.MongoDB.Internal.Protocol hiding (Query, send, call)
import Data.Bson
import Data.Word
import Data.Int
import Data.Maybe (listToMaybe, catMaybes)
import Data.Maybe (listToMaybe, catMaybes, mapMaybe)
import Data.UString as U (dropWhile, any, tail)
import Database.MongoDB.Internal.Util (loop, (<.>), true1) -- plus Applicative instances of ErrorT & ReaderT

-- * Connection
-- * Connected

newtype Connected m a = Connected (ErrorT Failure (ReaderT WriteMode (ReaderT Connection m)) a)
deriving (Context Connection, Context WriteMode, MonadError Failure, MonadIO, Monad, Applicative, Functor)
-- ^ Monad with access to a 'Connection' and 'WriteMode' and throws a 'Failure' on connection or server failure

instance MonadTrans Connected where
lift = Connected . lift . lift . lift

runConn :: Connected m a -> Connection -> m (Either Failure a)
-- ^ Run action with access to connection. Return Left Failure if connection or server fails during execution.
runConn (Connected action) = runReaderT (runReaderT (runErrorT action) Unsafe)

-- | A monad with access to a 'Connection' and 'WriteMode' and throws a 'Failure' on connection or server failure
class (Context Connection m, Context WriteMode m, MonadError Failure m, MonadIO m, Applicative m, Functor m) => Conn m
Expand All @@ -72,12 +83,6 @@ data Failure =

instance Error Failure where strMsg = ServerFailure

type Connected m = ErrorT Failure (ReaderT WriteMode (ReaderT Connection m))

runConn :: Connected m a -> Connection -> m (Either Failure a)
-- ^ Run action with access to connection. Return Left Failure if connection or server fails during execution.
runConn action = runReaderT (runReaderT (runErrorT action) Unsafe)

send :: (Conn m) => [Notice] -> m ()
-- ^ Send notices as a contiguous batch to server with no reply. Raise Failure if connection fails.
send ns = do
Expand Down Expand Up @@ -298,7 +303,7 @@ type Limit = Word32
-- ^ Maximum number of documents to return, i.e. cursor will close after iterating over this number of documents. 0 means no limit.

type Order = Document
-- ^ Fields to sort by. Each one is associated with 1 or -1. Eg. @[x =: 1, y =: (-1)]@ means sort by @x@ ascending then @y@ descending
-- ^ Fields to sort by. Each one is associated with 1 or -1. Eg. @[x =: 1, y =: -1]@ means sort by @x@ ascending then @y@ descending

type BatchSize = Word32
-- ^ The number of document to return in each batch response from the server. 0 means use Mongo default.
Expand All @@ -310,10 +315,12 @@ query sel col = Query [] (Select sel col) [] 0 0 [] False 0 []
batchSizeRemainingLimit :: BatchSize -> Limit -> (Int32, Limit)
-- ^ Given batchSize and limit return P.qBatchSize and remaining limit
batchSizeRemainingLimit batchSize limit = if limit == 0
then (fromIntegral batchSize, 0) -- no limit
else if 0 < batchSize && batchSize < limit
then (fromIntegral batchSize, limit - batchSize)
then (fromIntegral batchSize', 0) -- no limit
else if 0 < batchSize' && batchSize' < limit
then (fromIntegral batchSize', limit - batchSize')
else (- fromIntegral limit, 1)
where batchSize' = if batchSize == 1 then 2 else batchSize
-- batchSize 1 is broken because server converts 1 to -1 meaning limit 1

queryRequest :: Bool -> Query -> Database -> (Request, Limit)
-- ^ Translate Query to Protocol.Query. If first arg is true then add special $explain attribute.
Expand All @@ -334,6 +341,12 @@ runQuery :: (DbConn m) => Bool -> [Notice] -> Query -> m CursorState'
-- ^ Send query request and return cursor state
runQuery isExplain ns q = call' ns . queryRequest isExplain q =<< thisDatabase

call' :: (Conn m) => [Notice] -> (Request, Limit) -> m CursorState'
-- ^ Send notices and request and return promised cursor state
call' ns (req, remainingLimit) = do
promise <- call ns req
return $ Delayed (fmap (fromReply remainingLimit =<<) promise)

find :: (DbConn m) => Query -> m Cursor
-- ^ Fetch documents satisfying query
find q@Query{selection, batchSize} = do
Expand Down Expand Up @@ -397,15 +410,16 @@ data CursorState = CS Limit CursorId [Document]

fromReply :: Limit -> Reply -> Either Failure CursorState
-- ^ Convert Reply to CursorState or Failure
fromReply limit Reply{..} = if rResponseFlag == 0
then Right (CS limit rCursorId rDocuments)
else Left . ServerFailure $ "Query failure " ++ show rResponseFlag ++ " " ++ show rDocuments

call' :: (Conn m) => [Notice] -> (Request, Limit) -> m CursorState'
-- ^ Send notices and request and return promised cursor state
call' ns (req, remainingLimit) = do
promise <- call ns req
return $ Delayed (fmap (fromReply remainingLimit =<<) promise)
fromReply limit Reply{..} = case mapMaybe fromResponseFlag rResponseFlags of
[] -> Right (CS limit rCursorId rDocuments)
err : _ -> Left err
where
fromResponseFlag :: ResponseFlag -> Maybe Failure
-- ^ If response flag indicate failure then Just Failure, otherwise Nothing
fromResponseFlag x = case x of
AwaitCapable -> Nothing
CursorNotFound -> Just . ServerFailure $ "Cursor " ++ show rCursorId ++ " not found"
QueryError -> Just . ServerFailure $ "Query failure " ++ show rDocuments

newCursor :: (Conn m) => Database -> Collection -> BatchSize -> CursorState' -> m Cursor
-- ^ Create new cursor. If you don't read all results then close it. Cursor will be closed automatically when all results are read from it or when eventually garbage collected.
Expand All @@ -432,9 +446,8 @@ next cursor = modifyCursorState' cursor nextState where
[] -> if cid == 0
then return (CursorState $ CS 0 0 [], Nothing) -- finished
else error $ "server returned empty batch but says more results on server"
nextBatch fcol batch limit cid = let
(batchSize, remLimit) = batchSizeRemainingLimit batch limit
in call' [] (GetMore fcol batchSize cid, remLimit)
nextBatch fcol batch limit cid = call' [] (GetMore fcol batchSize cid, remLimit)
where (batchSize, remLimit) = batchSizeRemainingLimit batch limit

nextN :: (Conn m) => Int -> Cursor -> m [Document]
-- ^ Return next N documents or less if end is reached
Expand All @@ -454,17 +467,18 @@ instance (Conn m) => Resource m Cursor where

-- ** Group

-- | Groups documents in collection by key then reduces (aggregates) each group
data Group = Group {
gColl :: Collection,
gKey :: GroupKey, -- ^ Fields to group by
gReduce :: Javascript, -- ^ The reduce function aggregates (reduces) the objects iterated. Typical operations of a reduce function include summing and counting. reduce takes two arguments: the current document being iterated over and the aggregation value.
gInitial :: Document, -- ^ Initial aggregation value supplied to reduce
gReduce :: Javascript, -- ^ @(doc, agg) -> ()@. The reduce function reduces (aggregates) the objects iterated. Typical operations of a reduce function include summing and counting. It takes two arguments, the current document being iterated over and the aggregation value, and updates the aggregate value.
gInitial :: Document, -- ^ @agg@. Initial aggregation value supplied to reduce
gCond :: Selector, -- ^ Condition that must be true for a row to be considered. [] means always true.
gFinalize :: Maybe Javascript -- ^ An optional function to be run on each item in the result set just before the item is returned. Can either modify the item (e.g., add an average field given a count and a total) or return a replacement object (returning a new object with just _id and average fields).
gFinalize :: Maybe Javascript -- ^ @agg -> () | result@. An optional function to be run on each item in the result set just before the item is returned. Can either modify the item (e.g., add an average field given a count and a total) or return a replacement object (returning a new object with just _id and average fields).
} deriving (Show, Eq)

data GroupKey = Key [Label] | KeyF Javascript deriving (Show, Eq)
-- ^ Fields to group by, or function returning a "key object" to be used as the grouping key. Use this instead of key to specify a key that is not an existing member of the object (or, to access embedded members).
-- ^ Fields to group by, or function (@doc -> key@) returning a "key object" to be used as the grouping key. Use KeyF instead of Key to specify a key that is not an existing member of the object (or, to access embedded members).

groupDocument :: Group -> Document
-- ^ Translate Group data into expected document form
Expand All @@ -482,12 +496,12 @@ group g = at "retval" <$> runCommand ["group" =: groupDocument g]

-- ** MapReduce

-- | Maps every document in collection to a (key, value) pair, then for each unique key reduces all its associated values to a result. Therefore, the final output is a list of (key, result) pairs, where every key is unique. This is the basic description. There are additional nuances that may be used. See <http://www.mongodb.org/display/DOCS/MapReduce> for details.
-- | Maps every document in collection to a list of (key, value) pairs, then for each unique key reduces all its associated values from all lists to a single result. There are additional parameters that may be set to tweak this basic operation.
data MapReduce = MapReduce {
rColl :: Collection,
rMap :: MapFun,
rReduce :: ReduceFun,
rSelect :: Selector, -- ^ Default is []
rSelect :: Selector, -- ^ Operate on only those documents selected. Default is [] meaning all documents.
rSort :: Order, -- ^ Default is [] meaning no sort
rLimit :: Limit, -- ^ Default is 0 meaning no limit
rOut :: Maybe Collection, -- ^ Output to given permanent collection, otherwise output to a new temporary collection whose name is returned.
Expand All @@ -498,10 +512,10 @@ data MapReduce = MapReduce {
} deriving (Show, Eq)

type MapFun = Javascript
-- ^ @() -> void@. The map function references the variable this to inspect the current object under consideration. A map function must call @emit(key,value)@ at least once, but may be invoked any number of times, as may be appropriate.
-- ^ @() -> void@. The map function references the variable @this@ to inspect the current object under consideration. The function must call @emit(key,value)@ at least once, but may be invoked any number of times, as may be appropriate.

type ReduceFun = Javascript
-- ^ @(key, value_array) -> value@. The reduce function receives a key and an array of values. To use, reduce the received values, and return a result. The MapReduce engine may invoke reduce functions iteratively; thus, these functions must be idempotent. That is, the following must hold for your reduce function: @for all k, vals : reduce(k, [reduce(k,vals)]) == reduce(k,vals)@. If you need to perform an operation only once, use a finalize function. The output of emit (the 2nd param) and reduce should be the same format to make iterative reduce possible.
-- ^ @(key, value_array) -> value@. The reduce function receives a key and an array of values and returns an aggregate result value. The MapReduce engine may invoke reduce functions iteratively; thus, these functions must be idempotent. That is, the following must hold for your reduce function: @for all k, vals : reduce(k, [reduce(k,vals)]) == reduce(k,vals)@. If you need to perform an operation only once, use a finalize function. The output of emit (the 2nd param) and reduce should be the same format to make iterative reduce possible.

type FinalizeFun = Javascript
-- ^ @(key, value) -> final_value@. A finalize function may be run after reduction. Such a function is optional and is not necessary for many map/reduce cases. The finalize function takes a key and a value, and returns a finalized value.
Expand Down
2 changes: 1 addition & 1 deletion mongoDB.cabal
@@ -1,5 +1,5 @@
Name: mongoDB
Version: 0.6
Version: 0.6.1
License: OtherLicense
License-file: LICENSE
Maintainer: Tony Hannan <tony@10gen.com>
Expand Down
4 changes: 2 additions & 2 deletions tutorial.md
Expand Up @@ -55,12 +55,12 @@ Open up a connection to your DB instance, using the standard port:

or for a non-standard port

> Right con <- connect $ server "127.0.0.1" (PortNumber 666)
> Right con <- connect $ Server "127.0.0.1" (PortNumber 666)

*connect* returns Left IOError if connection fails. We are assuming above
it won't fail. If it does you will get a pattern match error.

Task and Db monad
Connected monad
-------------------

The current connection is held in a Connected monad, and the current database
Expand Down

0 comments on commit 6435bc3

Please sign in to comment.