Permalink
Browse files

Add support for asynchronous notifications

  • Loading branch information...
1 parent 0605d26 commit f41e260db5b5b712d37f4bf3f1111a8e8cc1337f @lpsmith committed Nov 14, 2011
View
@@ -26,6 +26,7 @@ library
Database.PostgreSQL.Simple.QueryResults
Database.PostgreSQL.Simple.Result
Database.PostgreSQL.Simple.Types
+ Database.PostgreSQL.Simple.Notification
other-modules:
Database.PostgreSQL.Base
Database.PostgreSQL.Base.Types
@@ -89,14 +89,15 @@ connect connectInfo@ConnectInfo{..} = liftIO $ withSocketsDo $ do
throttle <- newMVar ()
request <- newEmptyMVar
(source, sink) <- newEdge
+ notifications <- newSink
handle <- connectTo connectHost (PortNumber $ fromIntegral connectPort)
hSetBuffering handle NoBuffering
_ <- forkIO (connectionRouter event throttle sink handle)
_ <- forkIO (requestListener event throttle request)
- _ <- forkIO (responseListener event handle)
+ _ <- forkIO (responseListener event handle notifications)
_ <- forkIO (connectionSender source handle)
types <- newMVar M.empty
- let conn = Connection request event types
+ let conn = Connection request event notifications types
authenticate conn connectInfo
return conn
@@ -118,16 +119,17 @@ requestListener event throttle request = loop
-- but async notices and async notifications are routed by the
-- responseListener process directly.
-responseListener :: MVar Event -> Handle -> IO a
-responseListener event handle = loop
+responseListener :: MVar Event -> Handle -> Sink Notification -> IO a
+responseListener event handle notifications = loop
where
loop = do
rspMsg <- getMessage_ handle `catch` \(e :: ConnectionClosed) -> do
putMVar event (Disconnected e)
throwIO e
- -- FIXME: detect async notices and notifications here and route
- -- them elsewhere
- putMVar event (Response rspMsg)
+ case rspMsg of
+ RspMsg 'A' msg -> writeSink notifications (getNotification msg)
+ RspMsg 'N' _msg -> return () -- FIXME: do something with async notices
+ RspMsg _ _msg -> putMVar event (Response rspMsg)
loop
-- | the ConnectionSender writes data to the network
@@ -176,7 +178,7 @@ writeAndCloseSinkVector arr idx a e = do
--
-- 1. Hooking up new requests to the connectionSender process
--
--- 2. Throttling the rate of requests, so that we don't send
+-- 2. Throttling the rate of requests, so that we don't send
-- requests faster than the database can respond.
--
-- 3. Tracking new requests so that it can determine where
@@ -196,7 +198,7 @@ writeAndCloseSinkVector arr idx a e = do
-- closes the connection as well. But in most cases, a soft shutdown
-- is what you want.
-connectionRouter :: MVar Event -> MVar () -> Sink (Source ReqMsg) -> Handle
+connectionRouter :: MVar Event -> MVar () -> Sink (Source ReqMsg) -> Handle
-> IO a
connectionRouter event throttle sender handle = do
sinks <- V.new maxActiveRequests
@@ -212,7 +214,7 @@ connectionRouter event throttle sender handle = do
loop reqIdx idx
Response msg@(RspMsg 'Z' _block) -> do
-- print msg
- writeAndCloseSinkVector sinks reqIdx msg
+ writeAndCloseSinkVector sinks reqIdx msg
InternalException
_ <- tryPutMVar throttle ()
loop (nextIdx reqIdx) maxIdx
@@ -221,6 +223,7 @@ connectionRouter event throttle sender handle = do
writeSinkVector sinks reqIdx msg
loop reqIdx maxIdx
Disconnected _ -> do
+ -- FIXME: handle lost connections properly
putStrLn "disconnect"
closed reqIdx maxIdx
closed reqIdx maxIdx = do
@@ -455,6 +458,17 @@ setCommandTag block = do
tag = words . concat . map toString . L.toChunks . runGet getString
cmds = ["DELETE","UPDATE","SELECT","MOVE","FETCH"]
+
+getNotification :: L.ByteString -> Notification
+getNotification = runGet parseMsg
+ where
+ parseMsg = do
+ notificationPid <- fromIntegral `fmap` getInt32
+ notificationChannel <- getString
+ notificationData <- getString
+ return (Notification{..})
+
+
-- | Update the row description of the result.
getRowDesc :: MonadState Result m => Map ObjectId String -> L.ByteString -> m ()
getRowDesc types block =
@@ -668,10 +682,10 @@ getMessage_ h = do
if isEOF
then throwIO ConnectionLost
else fail "getMessage_: the impossible just happened"
-
+
getMessage :: PGHandle -> IO (MessageType,L.ByteString)
getMessage (PGHandle source _) = do
- (RspMsg typ block) <- readSource source
+ (RspMsg typ block) <- readSource source
return (maybe UnknownMessageType id $ typeFromChar typ,block)
--------------------------------------------------------------------------------
@@ -53,6 +53,7 @@ data ConnectInfo = ConnectInfo {
data Connection = Connection {
connectionRequest :: MVar Dialog
, connectionEvent :: MVar Event
+ , connectionNotification :: Sink Notification
, connectionObjects :: MVar (Map ObjectId String)
}
@@ -71,6 +72,14 @@ data Event = Request Dialog
| Response RspMsg
| Disconnected ConnectionClosed
+data Notification = Notification
+ { notificationPid :: Int
+ , notificationChannel :: L.ByteString
+ , notificationData :: L.ByteString
+ }
+
+data NotificationChannel = NotificationChannel Connection (Source Notification)
+
-- | Result of a database query.
data Result =
Result {
@@ -103,7 +112,7 @@ data Field = Field {
,fieldFormatCode :: FormatCode
} deriving Show
-data Type =
+data Type =
Short -- ^ 2 bytes, small-range integer
| Long -- ^ 4 bytes, usual choice for integer
| LongLong -- ^ 8 bytes large-range integer
@@ -115,13 +124,13 @@ data Type =
| CharVarying -- ^ character varying(n), varchar(n), variable-length
| Characters -- ^ character(n), char(n), fixed-length
| Text -- ^ text, variable unlimited length
- --
+ --
-- Lazy. Decoded from UTF-8 into Haskell native encoding.
| Boolean -- ^ boolean, 1 byte, state of true or false
| Timestamp -- ^ timestamp /without/ time zone
- --
+ --
-- More information about PostgreSQL’s dates here:
-- <http://www.postgresql.org/docs/current/static/datatype-datetime.html>
| TimestampWithZone -- ^ timestamp /with/ time zone
@@ -155,12 +164,11 @@ newtype Pool = Pool { unPool :: MVar PoolState }
data ConnectionClosed = ConnectionClosed
- deriving (Show, Typeable)
+ deriving (Show, Typeable)
instance Exception ConnectionClosed
data InternalException = InternalException
- deriving (Show, Typeable)
+ deriving (Show, Typeable)
instance Exception InternalException
-
@@ -0,0 +1,35 @@
+-----------------------------------------------------------------------------
+-- |
+-- Module : Database.PostgreSQL.Simple.Notification
+-- Copyright : (c) 2011 Leon P Smith
+-- License : BSD3
+--
+-- Maintainer : Leon P Smith <leon@melding-monads.com>
+--
+-- Asynchronous notification support for PostgreSQL
+--
+-----------------------------------------------------------------------------
+
+module Database.PostgreSQL.Simple.Notification
+ ( -- * Asynchronous Notifications
+ Notification(..)
+ , NotificationChannel
+ , getNotificationChannel
+ , dupNotificationChannel
+ , getNotification
+ ) where
+
+import Control.Concurrent.Edge
+import Database.PostgreSQL.Base.Types
+
+getNotificationChannel :: Connection -> IO NotificationChannel
+getNotificationChannel conn = do
+ NotificationChannel conn `fmap` getSource (connectionNotification conn)
+
+dupNotificationChannel :: NotificationChannel -> IO NotificationChannel
+dupNotificationChannel (NotificationChannel conn source) = do
+ NotificationChannel conn `fmap` dupSource source
+
+getNotification :: NotificationChannel -> IO Notification
+getNotification (NotificationChannel _conn source) = do
+ readSource source

0 comments on commit f41e260

Please sign in to comment.