Browse files

Second attempt at implementing `fold`

This solves all the issues mentioned in bce0d38

This should be the first implementation in the right ballpark with regard
to correctness.  However, there is still room for improvement:

1. Error messages
     A. errors appear to come from "query", "execute", etc. rather than from "fold"
     B. queries appear with parameters substituted

2. Network performance
     A. eliminate a round trip on start up when the fold is auto-wrapped in
          a transaction
     B. eliminate a round trip on start up by fetching the first rows at the
          same time the cursor is opened
     C. often eliminate a round trip at the end by detecting when there
          are fewer rows than requested.
     D. interleave processing and network IO to a better degree

3. Haskell performance
     A.  No need to marshal a block of rows into a list before feeding them
          to the folding function.
  • Loading branch information...
1 parent 7235b54 commit 6457e41491a256ffbbfefd470039e2fd3310a931 @lpsmith committed Jan 24, 2012
Showing with 95 additions and 73 deletions.
  1. +95 −73 src/Database/PostgreSQL/Simple.hs
View
168 src/Database/PostgreSQL/Simple.hs
@@ -69,8 +69,13 @@ module Database.PostgreSQL.Simple
, query
, query_
-- * Queries that stream results
+ , FetchQuantity(..)
+ , FoldOptions
+ , defaultFoldOptions
, fold
+ , foldWithOptions
, fold_
+ , foldWithOptions_
, forEach
, forEach_
-- * Statements that do not return results
@@ -93,7 +98,7 @@ import Blaze.ByteString.Builder (Builder, fromByteString, toByteString)
import Blaze.ByteString.Builder.Char8 (fromChar)
import Control.Applicative ((<$>), pure)
import Control.Concurrent.MVar
-import Control.Exception (Exception, bracket, onException, throw, throwIO)
+import Control.Exception (Exception, bracket, onException, throw, throwIO, finally, mask, try)
import Control.Monad (foldM)
import Control.Monad.Fix (fix)
import Data.ByteString (ByteString)
@@ -313,22 +318,38 @@ query_ conn q@(Query que) = do
-- using 'execute' instead of 'query').
--
-- * 'ResultError': result conversion failed.
-fold :: (QueryParams q, QueryResults r) =>
- Connection
- -> Query -- ^ Query template.
- -> q -- ^ Query parameters.
- -> a -- ^ Initial state for result consumer.
- -> (a -> r -> IO a) -- ^ Result consumer.
- -> IO a
-fold conn template qs z f = withConnection conn $ \c -> do
- success <- PQ.sendQuery c =<< formatQuery conn template qs
- if success
- then finishFold conn c template z f
- else do
- msg <- maybe "fold error" id <$> PQ.errorMessage c
- throwIO $ SqlError { sqlNativeError = -1
- , sqlErrorMsg = msg
- , sqlState = "" }
+
+fold :: ( QueryResults row, QueryParams params )
+     => Connection
+     -> Query                -- ^ Query template.
+     -> params               -- ^ Query parameters.
+     -> a                    -- ^ Initial state for result consumer.
+     -> (a -> row -> IO a)   -- ^ Result consumer.
+     -> IO a
+fold conn template qs z f =
+    -- 'fold' is 'foldWithOptions' run with the default options.
+    foldWithOptions defaultFoldOptions conn template qs z f
+
+-- | How many rows a fold asks the server for in each fetch.
+data FetchQuantity
+    = Automatic     -- ^ Let the library pick the number of rows per fetch.
+    | Fixed !Int    -- ^ Ask for the given number of rows per fetch.
+
+-- | Options that tune how a fold retrieves its rows.
+newtype FoldOptions = FoldOptions
+    { fetchQuantity :: FetchQuantity
+      -- ^ Number of rows requested from the server per fetch.
+    }
+
+-- | The options used by 'fold': the fetch quantity is 'Automatic'.
+defaultFoldOptions :: FoldOptions
+defaultFoldOptions = FoldOptions { fetchQuantity = Automatic }
+
+-- | A version of 'fold' that takes explicit 'FoldOptions'.
+--
+-- The parameters are substituted into the template first; the resulting
+-- query text is then handed to the shared fold implementation together
+-- with the original template.
+foldWithOptions :: ( QueryResults row, QueryParams params )
+                => FoldOptions
+                -> Connection
+                -> Query                -- ^ Query template.
+                -> params               -- ^ Query parameters.
+                -> a                    -- ^ Initial state for result consumer.
+                -> (a -> row -> IO a)   -- ^ Result consumer.
+                -> IO a
+foldWithOptions opts conn template qs z f =
+    formatQuery conn template qs >>= \rendered ->
+        doFold opts conn template (Query rendered) z f
-- | A version of 'fold' that does not perform query substitution.
fold_ :: (QueryResults r) =>
@@ -337,15 +358,63 @@ fold_ :: (QueryResults r) =>
-> a -- ^ Initial state for result consumer.
-> (a -> r -> IO a) -- ^ Result consumer.
-> IO a
-fold_ conn q@(Query que) z f = withConnection conn $ \c -> do
- success <- PQ.sendQuery c que
- if success
- then finishFold conn c q z f
- else do
- msg <- maybe "fold_ error" id <$> PQ.errorMessage c
- throwIO $ SqlError { sqlNativeError = -1
- , sqlErrorMsg = msg
- , sqlState = "" }
+fold_ = foldWithOptions_ defaultFoldOptions
+
+
+-- | A version of 'foldWithOptions' that does not perform query substitution.
+foldWithOptions_ :: (QueryResults r) =>
+                    FoldOptions
+                 -> Connection
+                 -> Query             -- ^ Query.
+                 -> a                 -- ^ Initial state for result consumer.
+                 -> (a -> r -> IO a)  -- ^ Result consumer.
+                 -> IO a
+foldWithOptions_ opts conn q z f =
+    -- No substitution happens, so the same query serves as both the
+    -- template and the text to run.
+    doFold opts conn q q z f
+
+
+-- | Shared implementation behind 'foldWithOptions' and 'foldWithOptions_'.
+--
+-- Declares a cursor named @fold@ for the query, repeatedly fetches a block
+-- of rows from it and feeds each block to the consumer until a fetch
+-- returns no rows, then closes the cursor.  When the connection is idle
+-- the whole operation is wrapped in 'withTransaction'; when a transaction
+-- is already open it runs inside that transaction.  Any other transaction
+-- status is reported with 'fail'.
+doFold :: ( QueryResults row )
+       => FoldOptions
+       -> Connection
+       -> Query                -- ^ Query template (currently unused).
+       -> Query                -- ^ Query text the cursor is declared for.
+       -> a                    -- ^ Initial state for result consumer.
+       -> (a -> row -> IO a)   -- ^ Result consumer.
+       -> IO a
+doFold opts conn _template q a f = do
+    stat <- withConnection conn PQ.transactionStatus
+    case stat of
+      PQ.TransIdle    -> withTransaction conn go
+      PQ.TransInTrans -> go
+      PQ.TransActive  -> fail "foldWithOpts FIXME: PQ.TransActive"
+         -- This _shouldn't_ occur in the current incarnation of
+         -- the library, as we aren't using libpq asynchronously.
+         -- However, it could occur in future incarnations of
+         -- this library or if client code uses the Internal module
+         -- to use raw libpq commands on postgresql-simple connections.
+      PQ.TransInError -> fail "foldWithOpts FIXME: PQ.TransInError"
+         -- This should be turned into a better error message.
+         -- It is probably a bad idea to automatically roll
+         -- back the transaction and start another.
+      PQ.TransUnknown -> fail "foldWithOpts FIXME: PQ.TransUnknown"
+         -- Not sure what this means.
+  where
+    go = do
+        -- FIXME: what about name clashes with already-declared cursors?
+        _ <- execute_ conn ("DECLARE fold NO SCROLL CURSOR FOR "
+                            `mappend` q)
+        -- Same shape as 'finally', except that on the failure path a
+        -- failing CLOSE is not allowed to replace the exception that
+        -- actually aborted the fold.
+        mask $ \restore -> do
+            r <- restore (loop a) `onException` closeQuietly
+            _ <- execute_ conn "CLOSE fold"
+            return r
+
+    -- Best-effort cleanup used only while another exception is already
+    -- propagating: a 'SqlError' raised by the CLOSE itself (for example
+    -- because the failure left the transaction aborted) is dropped so the
+    -- original error reaches the caller.  Other exception types still
+    -- propagate.
+    closeQuietly = do
+        r <- try (execute_ conn "CLOSE fold")
+        case r of
+          Left SqlError{} -> return ()
+          Right _         -> return ()
+
+    -- FIXME: choose the Automatic chunkSize more intelligently
+    --   One possibility is to use the type of the results, although this
+    --   still isn't a perfect solution, given that common types (e.g. text)
+    --   are of highly variable size.
+    --   A refinement of this technique is to pick this number adaptively
+    --   as results are read in from the database.
+    chunkSize = case fetchQuantity opts of
+                  Automatic -> 256
+                  -- A count below 1 would make the first fetch return no
+                  -- rows (or be rejected), silently ending the fold, so
+                  -- always request at least one row.
+                  Fixed n   -> max 1 n
+
+    -- Fetch one block, fold it into the accumulator, and repeat until a
+    -- fetch comes back empty.
+    loop acc = do
+        rs <- query conn "FETCH FORWARD ? FROM fold" (Only chunkSize)
+        if null rs then return acc else foldM f acc rs >>= loop
-- | A version of 'fold' that does not transform a state value.
forEach :: (QueryParams q, QueryResults r) =>
@@ -404,53 +473,6 @@ finishQuery conn q result = do
, sqlErrorMsg = B.concat [ "query: ", statusmsg
, ": ", errormsg ]}
-
-finishFold :: (QueryResults r) =>
- Connection
- -> PQ.Connection
- -> Query -- ^ Query.
- -> a -- ^ Initial state for result consumer.
- -> (a -> r -> IO a) -- ^ Result consumer.
- -> IO a
-finishFold conn c q a_ f = loop a_
- where
- loop a = do
- mres <- PQ.getResult c
- case mres of
- Nothing -> return a
- Just result -> do
- stat <- PQ.resultStatus result
- case stat of
- PQ.TuplesOk -> do
- ncols <- PQ.nfields result
- fields <- forM' 0 (ncols-1) $ \column -> do
- type_oid <- PQ.ftype result column
- typename <- getTypename conn type_oid
- return Field{..}
- nrows <- PQ.ntuples result
- a' <- foldM (\a row -> do
- values <- forM' 0 (ncols-1) (PQ.getvalue result row)
- case convertResults fields values of
- Left err -> clear c >> throwIO err
- Right r -> f a r)
- a [0..nrows-1]
- loop a'
- _ -> do
- errormsg <- maybe "" id <$> PQ.resultErrorMessage result
- statusmsg <- PQ.resStatus stat
- state <- maybe "" id <$> PQ.resultErrorField result PQ.DiagSqlstate
- clear c
- throwIO $ SqlError {
- sqlState = state,
- sqlNativeError = fromEnum stat,
- sqlErrorMsg = B.concat [ "fold: ", statusmsg
- , ": ", errormsg ]}
- clear c = do
- mres <- PQ.getResult c
- case mres of
- Nothing -> return ()
- Just _ -> clear c
-
-- | Execute an action inside a SQL transaction.
--
-- This function initiates a transaction with a \"@begin

0 comments on commit 6457e41

Please sign in to comment.