Skip to content
Browse files

Single param Network instance. NetworkIO instance of IO that yields I…

…nternet so user does not have to supply it
  • Loading branch information...
1 parent 111d9a2 commit 73012bb430cfeacdbfd947a99e182a178a033af3 Tony Hannan committed Dec 27, 2010
View
13 Database/MongoDB.hs
@@ -6,20 +6,20 @@ Simple example below. Use with language extension /OvererloadedStrings/.
> {-# LANGUAGE OverloadedStrings #-}
>
> import Database.MongoDB
-> import Data.UString (u)
+> import Data.CompactString () -- Show and IsString instances of UString
> import Control.Monad.Trans (liftIO)
>
> main = do
-> pool <- newConnPool Internet 1 (host "127.0.0.1")
+> pool <- newConnPool 1 (host "127.0.0.1")
> e <- access safe Master pool run
> print e
>
> run = use (Database "baseball") $ do
> clearTeams
> insertTeams
-> print' "All Teams" =<< allTeams
-> print' "National League Teams" =<< nationalLeagueTeams
-> print' "New York Teams" =<< newYorkTeams
+> printDocs "All Teams" =<< allTeams
+> printDocs "National League Teams" =<< nationalLeagueTeams
+> printDocs "New York Teams" =<< newYorkTeams
>
> clearTeams = delete (select [] "team")
>
@@ -35,7 +35,8 @@ Simple example below. Use with language extension /OvererloadedStrings/.
>
> newYorkTeams = rest =<< find (select ["home.state" =: u"NY"] "team") {project = ["name" =: (1 :: Int), "league" =: (1 :: Int)]}
>
-> print' title docs = liftIO $ putStrLn title >> mapM_ (print . exclude ["_id"]) docs
+> printDocs title docs = liftIO $ putStrLn title >> mapM_ (print . exclude ["_id"]) docs
+>
-}
module Database.MongoDB (
View
10 Database/MongoDB/Admin.hs
@@ -31,7 +31,7 @@ import Database.MongoDB.Internal.Protocol (pwHash, pwKey)
import Database.MongoDB.Connection (Host, showHostPort)
import Database.MongoDB.Query
import Data.Bson
-import Data.UString (pack, unpack, append, intercalate)
+import Data.UString (pack, append, intercalate)
import Control.Monad.Reader
import qualified Data.HashTable as T
import Data.IORef
@@ -183,8 +183,8 @@ addUser :: (DbAccess m) => Bool -> Username -> Password -> m ()
-- ^ Add user with password with read-only access if bool is True or read-write access if bool is False
addUser readOnly user pass = do
mu <- findOne (select ["user" =: user] "system.users")
- let u = merge ["readOnly" =: readOnly, "pwd" =: pwHash user pass] (maybe ["user" =: user] id mu)
- save "system.users" u
+ let usr = merge ["readOnly" =: readOnly, "pwd" =: pwHash user pass] (maybe ["user" =: user] id mu)
+ save "system.users" usr
removeUser :: (DbAccess m) => Username -> m ()
removeUser user = delete (select ["user" =: user] "system.users")
@@ -205,9 +205,9 @@ copyDatabase (Database fromDb) fromHost mup (Database toDb) = do
let c = ["copydb" =: (1 :: Int), "fromhost" =: showHostPort fromHost, "fromdb" =: fromDb, "todb" =: toDb]
use admin $ case mup of
Nothing -> runCommand c
- Just (u, p) -> do
+ Just (usr, pss) -> do
n <- at "nonce" <$> runCommand ["copydbgetnonce" =: (1 :: Int), "fromhost" =: showHostPort fromHost]
- runCommand $ c ++ ["username" =: u, "nonce" =: n, "key" =: pwKey n u p]
+ runCommand $ c ++ ["username" =: usr, "nonce" =: n, "key" =: pwKey n usr pss]
dropDatabase :: (Access m) => Database -> m Document
-- ^ Delete the given database!
View
47 Database/MongoDB/Connection.hs
@@ -3,28 +3,28 @@
{-# LANGUAGE OverloadedStrings, ScopedTypeVariables, RecordWildCards, NamedFieldPuns, MultiParamTypeClasses, FlexibleContexts, TypeFamilies, DoRec, RankNTypes, FlexibleInstances #-}
module Database.MongoDB.Connection (
- -- * Network
- Network', ANetwork', Internet(..),
+ -- * Pipe
+ Pipe,
-- * Host
Host(..), PortID(..), host, showHostPort, readHostPort, readHostPortM,
-- * ReplicaSet
ReplicaSet(..), Name,
-- * MasterOrSlaveOk
MasterOrSlaveOk(..),
-- * Connection Pool
- Server(..), newConnPool',
+ Service(..),
connHost, replicaSet
) where
import Database.MongoDB.Internal.Protocol as X
-import Network.Abstract (IOE, connect, ANetwork(..))
+import qualified Network.Abstract as C
+import Network.Abstract (IOE, NetworkIO, ANetwork)
import Data.Bson ((=:), at, UString)
import Control.Pipeline as P
import Control.Applicative ((<$>))
import Control.Exception (assert)
import Control.Monad.Error
import Control.Monad.MVar
-import Control.Monad.Context
import Network (HostName, PortID(..))
import Data.Bson (Document, look)
import Text.ParserCombinators.Parsec as T (parse, many1, letter, digit, char, eof, spaces, try, (<|>))
@@ -148,26 +148,23 @@ isMS SlaveOk i = isSecondary i || isPrimary i -}
type Pool' = Pool IOError
--- | A Server is a single server ('Host') or a replica set of servers ('ReplicaSet')
-class Server t where
+-- | A Service is a single server ('Host') or a replica set of servers ('ReplicaSet')
+class Service t where
data ConnPool t
-- ^ A pool of TCP connections ('Pipe's) to a host or a replica set of hosts
- newConnPool :: (Network' n, MonadIO' m) => n -> Int -> t -> m (ConnPool t)
+ newConnPool :: (NetworkIO m) => Int -> t -> m (ConnPool t)
-- ^ Create a ConnectionPool to a host or a replica set of hosts. Actual TCP connection is not attempted until 'getPipe' request, so no IOError can be raised here. Up to N TCP connections will be established to each host.
getPipe :: MasterOrSlaveOk -> ConnPool t -> IOE Pipe
-- ^ Return a TCP connection (Pipe) to the master or a slave in the server. Master must connect to the master, SlaveOk may connect to a slave or master. To spread the load, SlaveOk requests are distributed amongst all hosts in the server. Throw IOError if failed to connect to right type of host (Master/SlaveOk).
killPipes :: ConnPool t -> IO ()
-- ^ Kill all open pipes (TCP Connections). Will cause any users of them to fail. Alternatively you can let them die on their own when they get garbage collected.
-newConnPool' :: (Server t, MonadIO' m, Context ANetwork' m) => Int -> t -> m (ConnPool t)
-newConnPool' poolSize' host' = context >>= \(ANetwork net :: ANetwork') -> newConnPool net poolSize' host'
-
-- ** ConnectionPool Host
-instance Server Host where
+instance Service Host where
data ConnPool Host = HostConnPool {connHost :: Host, connPool :: Pool' Pipe}
-- ^ A pool of TCP connections ('Pipe's) to a server, handed out in round-robin style.
- newConnPool net poolSize' host' = liftIO $ newHostConnPool (ANetwork net) poolSize' host'
+ newConnPool poolSize' host' = liftIO . newHostConnPool poolSize' host' =<< C.network
-- ^ Create a connection pool to server (host or replica set)
getPipe _ = getHostPipe
-- ^ Return a TCP connection (Pipe). If SlaveOk, connect to a slave if available. Round-robin if multiple slaves are available. Throw IOError if failed to connect.
@@ -176,9 +173,9 @@ instance Server Host where
instance Show (ConnPool Host) where
show HostConnPool{connHost} = "ConnPool " ++ show connHost
-newHostConnPool :: ANetwork' -> Int -> Host -> IO (ConnPool Host)
+newHostConnPool :: Int -> Host -> ANetwork -> IO (ConnPool Host)
-- ^ Create a pool of N 'Pipe's (TCP connections) to server. 'getHostPipe' will return one of those pipes, round-robin style.
-newHostConnPool net poolSize' host' = HostConnPool host' <$> newPool Factory{..} poolSize' where
+newHostConnPool poolSize' host' net = HostConnPool host' <$> newPool Factory{..} poolSize' where
newResource = tcpConnect net host'
killResource = P.close
isExpired = P.isClosed
@@ -187,18 +184,18 @@ getHostPipe :: ConnPool Host -> IOE Pipe
-- ^ Return next pipe (TCP connection) in connection pool, round-robin style. Throw IOError if can't connect to host.
getHostPipe (HostConnPool _ pool) = aResource pool
-tcpConnect :: ANetwork' -> Host -> IOE Pipe
+tcpConnect :: ANetwork -> Host -> IOE Pipe
-- ^ Create a TCP connection (Pipe) to the given host. Throw IOError if can't connect.
-tcpConnect net (Host hostname port) = newPipeline =<< connect net (hostname, port)
+tcpConnect net (Host hostname port) = newPipeline =<< C.connect net (C.Server hostname port)
-- ** Connection ReplicaSet
-instance Server ReplicaSet where
+instance Service ReplicaSet where
data ConnPool ReplicaSet = ReplicaSetConnPool {
- network :: ANetwork',
+ network :: ANetwork,
repsetName :: Name,
currentMembers :: MVar [ConnPool Host] } -- master at head after a refresh
- newConnPool net poolSize' repset = liftIO $ newSetConnPool (ANetwork net) poolSize' repset
+ newConnPool poolSize' repset = liftIO . newSetConnPool poolSize' repset =<< C.network
getPipe = getSetPipe
killPipes ReplicaSetConnPool{..} = withMVar currentMembers (mapM_ killPipes)
@@ -209,24 +206,24 @@ replicaSet :: (MonadIO' m) => ConnPool ReplicaSet -> m ReplicaSet
-- ^ Return replicas set name with current members as seed list
replicaSet ReplicaSetConnPool{..} = ReplicaSet repsetName . map connHost <$> readMVar currentMembers
-newSetConnPool :: ANetwork' -> Int -> ReplicaSet -> IO (ConnPool ReplicaSet)
+newSetConnPool :: Int -> ReplicaSet -> ANetwork -> IO (ConnPool ReplicaSet)
-- ^ Create a connection pool to each member of the replica set.
-newSetConnPool net poolSize' repset = assert (not . null $ seedHosts repset) $ do
- currentMembers <- newMVar =<< mapM (newHostConnPool net poolSize') (seedHosts repset)
+newSetConnPool poolSize' repset net = assert (not . null $ seedHosts repset) $ do
+ currentMembers <- newMVar =<< mapM (\h -> newHostConnPool poolSize' h net) (seedHosts repset)
return $ ReplicaSetConnPool net (setName repset) currentMembers
getMembers :: Name -> [ConnPool Host] -> IOE [Host]
-- ^ Get members of replica set, master first. Query supplied connections until config found.
-- TODO: Verify config for request replica set name and not some other replica set. ismaster config should include replica set name in result but currently does not.
getMembers _repsetName connections = hosts <$> untilSuccess (getReplicaInfo <=< getHostPipe) connections
-refreshMembers :: ANetwork' -> Name -> [ConnPool Host] -> IOE [ConnPool Host]
+refreshMembers :: ANetwork -> Name -> [ConnPool Host] -> IOE [ConnPool Host]
-- ^ Update current members with master at head. Reuse unchanged members. Throw IOError if can't connect to any and fetch config. Dropped connections are not closed in case they still have users; they will be closed when garbage collected.
refreshMembers net repsetName connections = do
n <- liftIO . poolSize . connPool $ head connections
mapM (liftIO . connection n) =<< getMembers repsetName connections
where
- connection n host' = maybe (newHostConnPool net n host') return mc where
+ connection n host' = maybe (newHostConnPool n host' net) return mc where
mc = find ((host' ==) . connHost) connections
View
82 Database/MongoDB/Internal/Protocol.hs
@@ -5,8 +5,6 @@ This module is not intended for direct use. Use the high-level interface at "Dat
{-# LANGUAGE RecordWildCards, StandaloneDeriving, OverloadedStrings, FlexibleContexts, TupleSections, TypeSynonymInstances, MultiParamTypeClasses, FlexibleInstances, UndecidableInstances #-}
module Database.MongoDB.Internal.Protocol (
- -- * Network
- Network', ANetwork', Internet(..),
-- * Pipe
Pipe, send, call,
-- * Message
@@ -24,7 +22,6 @@ module Database.MongoDB.Internal.Protocol (
import Prelude as X
import Control.Applicative ((<$>))
import Control.Arrow ((***))
-import System.IO (Handle)
import Data.ByteString.Lazy as B (length, hPut)
import qualified Control.Pipeline as P
import Data.Bson (Document, UString)
@@ -40,58 +37,13 @@ import Data.UString as U (pack, append, toByteString)
import System.IO.Error as E (try)
import Control.Monad.Error
import Control.Monad.Util (whenJust)
-import Network.Abstract (IOE, ANetwork, Network(..), Connection(Connection))
-import Network (connectTo)
-import System.IO (hFlush, hClose)
+import Network.Abstract hiding (send)
+import System.IO (hFlush)
import Database.MongoDB.Internal.Util (hGetN, bitOr)
--- * Network
-
-- Network -> Server -> (Sink, Source)
-- (Sink, Source) -> Pipeline
-type Message = ([Notice], Maybe (Request, RequestId))
--- ^ Write notice(s), write notice(s) with getLastError request, or just query request
--- Note, that requestId will be out of order because request ids will be generated for notices, after the request id supplied was generated. This is ok because the mongo server does not care about order they are just used as unique identifiers.
-
-type Response = (ResponseTo, Reply)
-
-class (Network n Message Response) => Network' n
-instance (Network n Message Response) => Network' n
-
-type ANetwork' = ANetwork Message Response
-
-data Internet = Internet
--- ^ Normal Network instance, i.e. no logging or replay
-
--- | Connect to server. Write messages and receive replies; not thread-safe!
-instance Network Internet Message Response where
- connect _ (hostname, portid) = ErrorT . E.try $ do
- handle <- connectTo hostname portid
- return $ Connection (sink handle) (source handle) (hClose handle)
- where
- sink h (notices, mRequest) = ErrorT . E.try $ do
- forM_ notices $ \n -> writeReq h . (Left n,) =<< genRequestId
- whenJust mRequest $ writeReq h . (Right *** id)
- hFlush h
- source h = ErrorT . E.try $ readResp h
-
-writeReq :: Handle -> (Either Notice Request, RequestId) -> IO ()
-writeReq handle (e, requestId) = do
- hPut handle lenBytes
- hPut handle bytes
- where
- bytes = runPut $ (either putNotice putRequest e) requestId
- lenBytes = encodeSize . toEnum . fromEnum $ B.length bytes
- encodeSize = runPut . putInt32 . (+ 4)
-
-readResp :: Handle -> IO (ResponseTo, Reply)
-readResp handle = do
- len <- fromEnum . decodeSize <$> hGetN handle 4
- runGet getReply <$> hGetN handle len
- where
- decodeSize = subtract 4 . runGet getInt32
-
-- * Pipe
type Pipe = P.Pipeline Message Response
@@ -111,7 +63,35 @@ call pipe notices request = do
check requestId (responseTo, reply) = if requestId == responseTo then reply else
error $ "expected response id (" ++ show responseTo ++ ") to match request id (" ++ show requestId ++ ")"
--- * Messages
+-- * Message
+
+type Message = ([Notice], Maybe (Request, RequestId))
+-- ^ A write notice(s), write notice(s) with getLastError request, or just query request.
+-- Note, that requestId will be out of order because request ids will be generated for notices after the request id supplied was generated. This is ok because the mongo server does not care about order just uniqueness.
+
+instance WriteMessage Message where
+ writeMessage handle (notices, mRequest) = ErrorT . E.try $ do
+ forM_ notices $ \n -> writeReq . (Left n,) =<< genRequestId
+ whenJust mRequest $ writeReq . (Right *** id)
+ hFlush handle
+ where
+ writeReq (e, requestId) = do
+ hPut handle lenBytes
+ hPut handle bytes
+ where
+ bytes = runPut $ (either putNotice putRequest e) requestId
+ lenBytes = encodeSize . toEnum . fromEnum $ B.length bytes
+ encodeSize = runPut . putInt32 . (+ 4)
+
+type Response = (ResponseTo, Reply)
+-- ^ Message received from a Mongo server in response to a Request
+
+instance ReadMessage Response where
+ readMessage handle = ErrorT . E.try $ readResp where
+ readResp = do
+ len <- fromEnum . decodeSize <$> hGetN handle 4
+ runGet getReply <$> hGetN handle len
+ decodeSize = subtract 4 . runGet getInt32
type FullCollection = UString
-- ^ Database name and collection name with period (.) in between. Eg. \"myDb.myCollection\"
View
10 Database/MongoDB/Query.hs
@@ -49,12 +49,12 @@ import Control.Monad.Throw
import Control.Monad.MVar
import qualified Database.MongoDB.Internal.Protocol as P
import Database.MongoDB.Internal.Protocol hiding (Query, QueryOption(..), send, call)
-import Database.MongoDB.Connection (MasterOrSlaveOk(..), Server(..))
+import Database.MongoDB.Connection (MasterOrSlaveOk(..), Service(..))
import Data.Bson
import Data.Word
import Data.Int
import Data.Maybe (listToMaybe, catMaybes)
-import Data.UString as U (dropWhile, any, tail, unpack)
+import Data.UString as U (dropWhile, any, tail)
import Control.Monad.Util (MonadIO', loop)
import Database.MongoDB.Internal.Util ((<.>), true1)
@@ -63,7 +63,7 @@ mapErrorIO f = throwLeft' f . liftIO . runErrorT
-- * Mongo Monad
-access :: (Server s, MonadIO m) => WriteMode -> MasterOrSlaveOk -> ConnPool s -> Action m a -> m (Either Failure a)
+access :: (Service s, MonadIO m) => WriteMode -> MasterOrSlaveOk -> ConnPool s -> Action m a -> m (Either Failure a)
-- ^ Run action under given write and read mode against the server or replicaSet behind given connection pool. Return Left Failure if there is a connection failure or read/write error.
access w mos pool act = do
ePipe <- liftIO . runErrorT $ getPipe mos pool
@@ -124,9 +124,9 @@ thisDatabase = context
auth :: (DbAccess m) => Username -> Password -> m Bool
-- ^ Authenticate with the database (if server is running in secure mode). Return whether authentication was successful or not. Reauthentication is required for every new pipe.
-auth u p = do
+auth usr pss = do
n <- at "nonce" <$> runCommand ["getnonce" =: (1 :: Int)]
- true1 "ok" <$> runCommand ["authenticate" =: (1 :: Int), "user" =: u, "nonce" =: n, "key" =: pwKey n u p]
+ true1 "ok" <$> runCommand ["authenticate" =: (1 :: Int), "user" =: usr, "nonce" =: n, "key" =: pwKey n usr pss]
-- * Collection
View
46 Network/Abstract.hs
@@ -1,32 +1,62 @@
-- | Generalize a network connection to a sink and source
-{-# LANGUAGE MultiParamTypeClasses, ExistentialQuantification, FlexibleContexts, FlexibleInstances, UndecidableInstances #-}
+{-# LANGUAGE MultiParamTypeClasses, ExistentialQuantification, FlexibleContexts, FlexibleInstances, OverlappingInstances, UndecidableInstances #-}
module Network.Abstract where
-import Network (HostName, PortID)
+import System.IO (Handle, hClose)
+import Network (HostName, PortID, connectTo)
import Control.Monad.Error
+import System.IO.Error (try)
+import Control.Monad.Context
+import Control.Monad.Util (MonadIO')
type IOE = ErrorT IOError IO
+-- ^ Be explicit about exception that may be raised.
-type Server = (HostName, PortID)
+data Server i o = Server HostName PortID
+-- ^ A server receives messages of type i and returns messages of type o.
+
+-- | Serialize message over handle
+class WriteMessage i where
+ writeMessage :: Handle -> i -> IOE ()
+
+-- | Deserialize message from handle
+class ReadMessage o where
+ readMessage :: Handle -> IOE o
-- | A network controls connections to other hosts. It may want to overide to log messages or play them back.
--- A server in the network accepts messages of type i and generates messages of type o.
-class Network n i o where
- connect :: n -> Server -> IOE (Connection i o)
+class Network n where
+ connect :: (WriteMessage i, ReadMessage o) => n -> Server i o -> IOE (Connection i o)
-- ^ Connect to Server returning the send sink and receive source, throw IOError if can't connect.
data Connection i o = Connection {
send :: i -> IOE (),
receive :: IOE o,
close :: IO () }
-data ANetwork i o = forall n. (Network n i o) => ANetwork n
+data ANetwork = forall n. (Network n) => ANetwork n
-instance Network (ANetwork i o) i o where
+instance Network (ANetwork) where
connect (ANetwork n) = connect n
+data Internet = Internet
+-- ^ Normal Network instance, i.e. no logging or replay
+
+-- | Connect to server. Write messages and receive replies. Not thread-safe, must be wrapped in Pipeline or something.
+instance Network Internet where
+ connect _ (Server hostname portid) = ErrorT . try $ do
+ handle <- connectTo hostname portid
+ return $ Connection (writeMessage handle) (readMessage handle) (hClose handle)
+
+class (MonadIO' m) => NetworkIO m where
+ network :: m ANetwork
+
+instance (Context ANetwork m, MonadIO' m) => NetworkIO m where
+ network = context
+
+instance NetworkIO IO where
+ network = return (ANetwork Internet)
{- Authors: Tony Hannan <tony@10gen.com>
Copyright 2010 10gen Inc.
View
1 TODO
@@ -10,6 +10,7 @@ Bson
MongoDB
-------
++ Support MapReduce 1.8 version
+ When one connection in a pool fails, close all other since they will likely fail too
+ on insert/update: reject keys that start with "$" or "."
+ dereference dbref
View
3 map-reduce-example.md
@@ -19,7 +19,8 @@ map/reduce queries on:
Prelude> :set prompt "> "
> :set -XOverloadedStrings
> import Database.MongoDB
- > conn <- newConnPool Internet 1 (host "localhost")
+ > import Data.CompactString ()
+ > conn <- newConnPool 1 (host "localhost")
> let run act = access safe Master conn $ use (Database "test") act
> :{
run $ insertMany "mr1" [
View
5 mongoDB.cabal
@@ -1,5 +1,5 @@
name: mongoDB
-version: 0.9
+version: 0.9.1
build-type: Simple
license: OtherLicense
license-file: LICENSE
@@ -16,7 +16,8 @@ build-depends:
nano-md5 -any,
network -any,
parsec -any,
- random -any
+ random -any,
+ compact-string-fix -any
stability: alpha
homepage: http://github.com/TonyGen/mongoDB-haskell
package-url:
View
11 tutorial.md
@@ -41,26 +41,23 @@ Import the MongoDB driver library, and set
OverloadedStrings so literal strings are converted to UTF-8 automatically.
> import Database.MongoDB
+ > import Data.CompactString ()
> :set -XOverloadedStrings
Making A Connection
-------------------
Create a connection pool for your mongo server, using the standard port (27017):
- > pool <- newConnPool Internet 1 $ host "127.0.0.1"
+ > pool <- newConnPool 1 $ host "127.0.0.1"
or for a non-standard port
- > pool <- newConnPool Internet 1 $ Host "127.0.0.1" (PortNumber 30000)
+ > pool <- newConnPool 1 $ Host "127.0.0.1" (PortNumber 30000)
-*newConnPool* takes the *network*, the connection pool size, and the host to connect to. It returns
+*newConnPool* takes the connection pool size, and the host to connect to. It returns
a *ConnPool*, which is a potential pool of TCP connections. They are not created until first
access, so it is not possible to get a connection error here.
-The network parameter allows you to override normal communications to, for example, log
-or replay messages sent and received from servers. *Internet* is the normal communication mode
-with no logging/replay.
-
Note, plain IO code in this driver never raises an exception unless it invokes third party IO
code that does. Driver code that may throw an exception says so in its Monad type,
for example, *ErrorT IOError IO a*.

0 comments on commit 73012bb

Please sign in to comment.
Something went wrong with that request. Please try again.