Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Move connection pooling code out to the pool package.

  • Loading branch information...
commit fd0e151d2fba9080ef57a53ccb090c8054bb9de1 1 parent dd029e1
@bos bos authored
Showing with 41 additions and 129 deletions.
  1. +2 −4 riak.cabal
  2. +39 −125 src/Network/Riak/Connection/Pool.hs
View
6 riak.cabal
@@ -86,16 +86,14 @@ library
blaze-builder,
bytestring,
containers,
- hashable >= 1.0.1.2,
network >= 2.3,
+ pool >= 0.1.0.2,
protocol-buffers >= 1.8.0,
pureMD5,
random,
riak-protobuf >= 0.14.0.0,
- stm,
text >= 0.11.0.6,
- time,
- vector >= 0.7
+ time
if flag(debug)
cpp-options: -DASSERTS -DDEBUG
View
164 src/Network/Riak/Connection/Pool.hs
@@ -10,14 +10,8 @@
-- Portability: portable
--
-- A high-performance striped pooling abstraction for managing
--- connections to a Riak cluster.
---
--- \"Striped\" means that a single 'Pool' consists of several
--- sub-pools, each managed independently. A stripe size of 1 is fine
--- for many applications, and probably what you should choose by
--- default. Larger stripe sizes will lead to reduced contention in
--- high-performance multicore applications, at a trade-off of causing
--- the maximum number of simultaneous connections to grow.
+-- connections to a Riak cluster. This is a thin wrapper around
+-- 'Data.Pool'.
module Network.Riak.Connection.Pool
(
Pool
@@ -29,36 +23,11 @@ module Network.Riak.Connection.Pool
, withConnection
) where
-import Control.Applicative ((<$>))
-import Control.Concurrent (forkIO, killThread, myThreadId, threadDelay)
-import Control.Concurrent.STM
-import Control.Exception (SomeException, catch, onException)
-import Control.Monad (forM_, forever, join, liftM2, unless, when)
-import Data.Hashable (hash)
-import Data.List (partition)
-import Data.Time.Clock (NominalDiffTime, UTCTime, diffUTCTime, getCurrentTime)
+import Data.Time.Clock (NominalDiffTime)
import Data.Typeable (Typeable)
-import Network.Riak.Connection.Internal (connect, disconnect, makeClientID)
-import Network.Riak.Debug (debug)
-import Network.Riak.Types (Client(clientID), Connection)
-import Prelude hiding (catch)
-import System.Mem.Weak (addFinalizer)
-import qualified Data.Vector as V
-
--- | A single connection pool entry.
-data Entry = Entry {
- connection :: Connection
- , lastUse :: UTCTime
- -- ^ Time of last return.
- }
-
--- | A single striped pool.
-data LocalPool = LocalPool {
- connected :: TVar Int
- -- ^ Count of open connections (both idle and in use).
- , entries :: TVar [Entry]
- -- ^ Idle entries.
- }
+import Network.Riak (Client(clientID), Connection, connect, disconnect)
+import Network.Riak.Connection (makeClientID)
+import qualified Data.Pool as Pool
-- | A pool of connections to a Riak server.
--
@@ -71,31 +40,14 @@ data Pool = Pool {
client :: Client
-- ^ Client specification. The client ID is ignored, and always
-- regenerated automatically for each new connection.
- , numStripes :: Int
- -- ^ Stripe count. The number of distinct sub-pools to maintain.
- -- The smallest acceptable value is 1.
- , idleTime :: NominalDiffTime
- -- ^ Amount of time for which an unused connection is kept open.
- -- The smallest acceptable value is 0.5 seconds.
- --
- -- The elapsed time before closing may be a little longer than
- -- requested, as the reaper thread wakes at 2-second intervals.
- , maxConnections :: Int
- -- ^ Maximum number of connections to keep open per stripe. The
- -- smallest acceptable value is 1.
- --
- -- Requests for connections will block if this limit is reached on
- -- a single stripe, even if other stripes have idle connections
- -- available.
- , localPools :: V.Vector LocalPool
- -- ^ Per-capability connection pools.
+ , pool :: Pool.Pool Connection
} deriving (Typeable)
instance Show Pool where
- show Pool{..} = "Pool { client = " ++ show client ++ ", " ++
- "numStripes = " ++ show numStripes ++ ", " ++
- "idleTime = " ++ show idleTime ++ ", " ++
- "maxConnections = " ++ show maxConnections ++ "}"
+ show p = "Pool { client = " ++ show (client p) ++ ", " ++
+ "numStripes = " ++ show (numStripes p) ++ ", " ++
+ "idleTime = " ++ show (idleTime p) ++ ", " ++
+ "maxConnections = " ++ show (maxConnections p) ++ "}"
instance Eq Pool where
a == b = client a == client b && numStripes a == numStripes b &&
@@ -122,45 +74,34 @@ create :: Client
-- on a single stripe, even if other stripes have idle
-- connections available.
-> IO Pool
-create client numStripes idleTime maxConnections = do
- when (numStripes < 1) $
- modError "pool " $ "invalid stripe count " ++ show numStripes
- when (idleTime < 0.5) $
- modError "pool " $ "invalid idle time " ++ show idleTime
- when (maxConnections < 1) $
- modError "pool " $ "invalid maximum connection count " ++
- show maxConnections
- localPools <- atomically . V.replicateM numStripes $
- liftM2 LocalPool (newTVar 0) (newTVar [])
- reaperId <- forkIO $ reaper idleTime localPools
- let p = Pool {
- client
- , numStripes
- , idleTime
- , maxConnections
- , localPools
- }
- addFinalizer p $ killThread reaperId
- return p
+create client ns it mc =
+ Pool client `fmap` Pool.createPool c disconnect ns it mc
+ where c = do
+ cid <- makeClientID
+ connect client { clientID = cid }
+
+-- | Stripe count. The number of distinct sub-pools to maintain. The
+-- smallest acceptable value is 1.
+numStripes :: Pool -> Int
+numStripes = Pool.numStripes . pool
+
+-- | Amount of time for which an unused connection is kept open. The
+-- smallest acceptable value is 0.5 seconds.
+--
+-- The elapsed time before closing may be a little longer than
+-- requested, as the reaper thread wakes at 1-second intervals.
+idleTime :: Pool -> NominalDiffTime
+idleTime = Pool.idleTime . pool
+
+-- | Maximum number of connections to keep open per stripe. The
+-- smallest acceptable value is 1.
+--
+-- Requests for connections will block if this limit is reached on a
+-- single stripe, even if other stripes have idle connections
+-- available.
+maxConnections :: Pool -> Int
+maxConnections = Pool.maxResources . pool
--- | Periodically go through all pools, closing any connections that
--- have been left idle for too long.
-reaper :: NominalDiffTime -> V.Vector LocalPool -> IO ()
-reaper idleTime pools = forever $ do
- threadDelay (2 * 1000000)
- now <- getCurrentTime
- let isStale Entry{..} = now `diffUTCTime` lastUse > idleTime
- V.forM_ pools $ \LocalPool{..} -> do
- conns <- atomically $ do
- (stale,fresh) <- partition isStale <$> readTVar entries
- unless (null stale) $ do
- writeTVar entries fresh
- modifyTVar_ connected (subtract (length stale))
- return (map connection stale)
- forM_ conns $ \conn -> do
- debug "reaper" "closing idle connection"
- disconnect conn `catch` \(_::SomeException) -> return ()
-
-- | Temporarily take a connection from a 'Pool', perform an action
-- with it, and return it to the pool afterwards.
--
@@ -181,31 +122,4 @@ reaper idleTime pools = forever $ do
-- 'disconnect' on a connection, as doing so will cause a subsequent
-- user (who expects the connection to be valid) to throw an exception.
withConnection :: Pool -> (Connection -> IO a) -> IO a
-withConnection Pool{..} act = do
- i <- ((`mod` numStripes) . hash) <$> myThreadId
- let LocalPool{..} = localPools V.! i
- conn <- join . atomically $ do
- ents <- readTVar entries
- case ents of
- (Entry{..}:es) -> writeTVar entries es >> return (return connection)
- [] -> do
- inUse <- readTVar connected
- when (inUse == maxConnections) retry
- writeTVar connected $! inUse + 1
- return $ do
- cid <- makeClientID
- connect client { clientID = cid }
- `onException` atomically (modifyTVar_ connected (subtract 1))
- ret <- act conn `onException` do
- disconnect conn `catch` \(_::SomeException) -> return ()
- atomically (modifyTVar_ connected (subtract 1))
- now <- getCurrentTime
- atomically $ modifyTVar_ entries (Entry conn now:)
- return ret
-
-modifyTVar_ :: TVar a -> (a -> a) -> STM ()
-modifyTVar_ v f = readTVar v >>= \a -> writeTVar v $! f a
-
-modError :: String -> String -> a
-modError func msg =
- error $ "Network.Riak.Connection.Pool." ++ func ++ ": " ++ msg
+withConnection = Pool.withResource . pool
Please sign in to comment.
Something went wrong with that request. Please try again.