Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Notification handling #43

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions hasql.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ description:
.
The API is completely disinfected from exceptions. All error-reporting is explicit and is presented using the 'Either' type.
homepage:
https://github.com/nikita-volkov/hasql
https://github.com/nikita-volkov/hasql
bug-reports:
https://github.com/nikita-volkov/hasql/issues
https://github.com/nikita-volkov/hasql/issues
author:
Nikita Volkov <nikita.y.volkov@mail.ru>
maintainer:
Expand Down Expand Up @@ -67,6 +67,7 @@ library
Hasql.Settings
Hasql.Connection
Hasql.Query
Hasql.Notification
build-depends:
-- parsing:
attoparsec >= 0.10 && < 0.14,
Expand Down Expand Up @@ -145,7 +146,7 @@ test-suite tasty


benchmark benchmark
type:
type:
exitcode-stdio-1.0
hs-source-dirs:
benchmark
Expand Down Expand Up @@ -190,4 +191,3 @@ benchmark benchmark
transformers,
base-prelude,
base

5 changes: 5 additions & 0 deletions library/Hasql/Connection/Impl.hs
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,8 @@ release (Connection pqConnectionRef _ _) =
withConnectionRef :: MVar LibPQ.Connection -> (LibPQ.Connection -> IO a) -> IO a
withConnectionRef =
withMVar

{-# INLINE withConnection #-}
withConnection :: Connection -> (LibPQ.Connection -> IO a) -> IO a
withConnection (Connection connectionRef _ _) =
withConnectionRef connectionRef
167 changes: 167 additions & 0 deletions library/Hasql/Notification.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}

-----------------------------------------------------------------------------
-- |
-- Module : Hasql.Notification
-- Copyright : (c) 2011-2015 Leon P Smith
-- (c) 2012 Joey Adams
-- License : MIT
--
-- Maintainer : Nikita Volkov <nikita.y.volkov@mail.ru>
--
-- Support for receiving asynchronous notifications via PostgreSQL's
-- Listen/Notify mechanism. See
-- <http://www.postgresql.org/docs/9.4/static/sql-notify.html> for more
-- information.
--
-- Note that on Windows, @getNotification@ currently uses a polling loop
-- of 1 second to check for more notifications, due to some inadequacies
-- in GHC's IO implementation and interface on that platform. See GHC
-- issue #7353 for more information. While this workaround is less than
-- ideal, notifications are still better than polling the database directly.
-- Notifications do not create any extra work for the backend, and are
-- likely cheaper on the client side as well.
--
-- <http://hackage.haskell.org/trac/ghc/ticket/7353>
--
-----------------------------------------------------------------------------

module Hasql.Notification
( Notification(..)
, getNotification
, getNotificationNonBlocking
, getBackendPID
) where

import Hasql.Prelude
import Hasql.Connection.Impl

import Control.Exception ( throwIO )
import qualified Data.ByteString as B
import qualified Database.PostgreSQL.LibPQ as PQ
import GHC.IO.Exception ( IOError(..) )
import System.Posix.Types ( CPid )

#if defined(mingw32_HOST_OS)
import Control.Concurrent ( threadDelay )
#elif !MIN_VERSION_base(4,7,0)
import Control.Concurrent ( threadWaitRead )
#else
import GHC.Conc ( atomically )
import Control.Concurrent ( threadWaitReadSTM )
#endif

data Notification = Notification
{ notificationPid :: !CPid
, notificationChannel :: !B.ByteString
, notificationData :: !B.ByteString
}

convertNotice :: PQ.Notify -> Notification
convertNotice PQ.Notify{..}
= Notification { notificationPid = notifyBePid
, notificationChannel = notifyRelname
, notificationData = notifyExtra }

-- | Returns a single notification. If no notifications are available,
-- 'getNotification' blocks until one arrives.
--
-- It is safe to call 'getNotification' on a connection that is concurrently
-- being used for other purposes, note however that PostgreSQL does not
-- deliver notifications while a connection is inside a transaction.

getNotification :: Connection -> IO (Either IOError Notification)
getNotification conn = join $ withConnection conn fetch
where
funcName = "Hasql.Notification.getNotification"

fetch c = do
PQ.notifies c >>= \case
Just msg -> return (return $! (Right $! convertNotice msg))
Nothing -> do
PQ.socket c >>= \case
Nothing -> return (return (Left fdError))
#if defined(mingw32_HOST_OS)
-- threadWaitRead doesn't work for sockets on Windows, so just
-- poll for input every second (PQconsumeInput is non-blocking).
--
-- We could call select(), but FFI calls can't be interrupted
-- with async exceptions, whereas threadDelay can.
Just _fd -> do
return (threadDelay 1000000 >> loop)
#elif !MIN_VERSION_base(4,7,0)
-- Technically there's a race condition that is usually benign.
-- If the connection is closed or reset after we drop the
-- lock, and then the fd index is reallocated to a new
-- descriptor before we call threadWaitRead, then
-- we could end up waiting on the wrong descriptor.
--
-- Now, if the descriptor becomes readable promptly, then
-- it's no big deal as we'll wake up and notice the change
-- on the next iteration of the loop. But if are very
-- unlucky, then we could end up waiting a long time.
Just fd -> do
return $ try (threadWaitRead fd) >>= \case
Left err -> return (Left (setLoc err))
Right _ -> loop
#else
-- This case fixes the race condition above. By registering
-- our interest in the descriptor before we drop the lock,
-- there is no opportunity for the descriptor index to be
-- reallocated on us.
--
-- (That is, assuming there isn't concurrently executing
-- code that manipulates the descriptor without holding
-- the lock... but such a major bug is likely to exhibit
-- itself in an at least somewhat more dramatic fashion.)
Just fd -> do
(waitRead, _) <- threadWaitReadSTM fd
return $ try (atomically waitRead) >>= \case
Left err -> return (Left (setLoc err))
Right _ -> loop

#endif

loop = join $ withConnection conn $ \c -> do
void $ PQ.consumeInput c
fetch c

setLoc :: IOError -> IOError
setLoc err = err { ioe_location = funcName }

fdError :: IOError
fdError = IOError {
ioe_handle = Nothing,
ioe_type = ResourceVanished,
ioe_location = funcName,
ioe_description = "failed to fetch file descriptor (did the connection time out?)",
ioe_errno = Nothing,
ioe_filename = Nothing
}

-- | Non-blocking variant of 'getNotification'. Returns a single notification,
-- if available. If no notifications are available, returns 'Nothing'.

getNotificationNonBlocking :: Connection -> IO (Maybe Notification)
getNotificationNonBlocking conn =
withConnection conn $ \c -> do
PQ.notifies c >>= \case
Just msg -> return $! Just $! convertNotice msg
Nothing -> do
void $ PQ.consumeInput c
PQ.notifies c >>= \case
Just msg -> return $! Just $! convertNotice msg
Nothing -> return Nothing

-- | Returns the process 'CPid' of the backend server process
-- handling this connection.
--
-- The backend PID is useful for debugging purposes and for comparison
-- to NOTIFY messages (which include the PID of the notifying backend
-- process). Note that the PID belongs to a process executing on the
-- database server host, not the local host!

getBackendPID :: Connection -> IO CPid
getBackendPID conn = withConnection conn PQ.backendPID