From 823eca5efd3e2441b7e72a1c38ad1dcaaf10c6b1 Mon Sep 17 00:00:00 2001 From: Bryan O'Sullivan Date: Mon, 2 May 2011 20:09:26 -0700 Subject: [PATCH] Initial commit. Code taken from the riak client. --- .hgignore | 10 +++ Data/Pool.hs | 192 ++++++++++++++++++++++++++++++++++++++++++++++++ LICENSE | 30 ++++++++ README.markdown | 27 +++++++ Setup.lhs | 3 + pool.cabal | 50 +++++++++++++ 6 files changed, 312 insertions(+) create mode 100644 .hgignore create mode 100644 Data/Pool.hs create mode 100644 LICENSE create mode 100644 README.markdown create mode 100755 Setup.lhs create mode 100644 pool.cabal diff --git a/.hgignore b/.hgignore new file mode 100644 index 0000000..51da823 --- /dev/null +++ b/.hgignore @@ -0,0 +1,10 @@ +.*\.(?:aux|h[ip]|o|orig|out|pdf|prof|ps|rej)$ +^(?:dist|\.DS_Store)$ +^tests/(?:qc) + +syntax: glob +cabal-dev +*~ +.*.swp +.\#* +\#* diff --git a/Data/Pool.hs b/Data/Pool.hs new file mode 100644 index 0000000..03df4ce --- /dev/null +++ b/Data/Pool.hs @@ -0,0 +1,192 @@ +{-# LANGUAGE NamedFieldPuns, RecordWildCards, ScopedTypeVariables #-} + +-- | +-- Module: Data.Pool +-- Copyright: (c) 2011 MailRank, Inc. +-- License: BSD3 +-- Maintainer: Bryan O'Sullivan +-- Stability: experimental +-- Portability: portable +-- +-- A high-performance striped pooling abstraction for managing +-- flexibly-sized collections of resources such as database +-- connections. +-- +-- \"Striped\" means that a single 'Pool' consists of several +-- sub-pools, each managed independently. A stripe size of 1 is fine +-- for many applications, and probably what you should choose by +-- default. Larger stripe sizes will lead to reduced contention in +-- high-performance multicore applications, at a trade-off of causing +-- the maximum number of simultaneous resources in use to grow. +module Data.Pool + ( + Pool(idleTime, maxResources, numStripes) + , createPool + , withResource + ) where + +import Control.Applicative ((<$>)) +import Control.Concurrent (forkIO, killThread, myThreadId, threadDelay) +import Control.Concurrent.STM +import Control.Exception (SomeException, catch, onException) +import Control.Monad (forM_, forever, join, liftM2, unless, when) +import Data.Hashable (hash) +import Data.List (partition) +import Data.Time.Clock (NominalDiffTime, UTCTime, diffUTCTime, getCurrentTime) +import Prelude hiding (catch) +import System.Mem.Weak (addFinalizer) +import qualified Data.Vector as V + +-- | A single resource pool entry. +data Entry a = Entry { + entry :: a + , lastUse :: UTCTime + -- ^ Time of last return. + } + +-- | A single striped pool. +data LocalPool a = LocalPool { + inUse :: TVar Int + -- ^ Count of open entries (both idle and in use). + , entries :: TVar [Entry a] + -- ^ Idle entries. + } + +data Pool a = Pool { + create :: IO a + -- ^ Action for creating a new entry to add to the pool. + , destroy :: a -> IO () + -- ^ Action for destroying an entry that is now done with. + , numStripes :: Int + -- ^ Stripe count. The number of distinct sub-pools to maintain. + -- The smallest acceptable value is 1. + , idleTime :: NominalDiffTime + -- ^ Amount of time for which an unused resource is kept alive. + -- The smallest acceptable value is 0.5 seconds. + -- + -- The elapsed time before closing may be a little longer than + -- requested, as the reaper thread wakes at 2-second intervals. + , maxResources :: Int + -- ^ Maximum number of resources to maintain per stripe. The + -- smallest acceptable value is 1. + -- + -- Requests for resources will block if this limit is reached on a + -- single stripe, even if other stripes have idle resources + -- available. + , localPools :: V.Vector (LocalPool a) + -- ^ Per-capability resource pools. + } + +instance Show (Pool a) where + show Pool{..} = "Pool {numStripes = " ++ show numStripes ++ ", " ++ + "idleTime = " ++ show idleTime ++ ", " ++ + "maxResources = " ++ show maxResources ++ "}" + +createPool + :: IO a + -- ^ Action that creates a new resource. + -> (a -> IO ()) + -- ^ Action that destroys an existing resource. + -> Int + -- ^ Stripe count. The number of distinct sub-pools to maintain. + -- The smallest acceptable value is 1. + -> NominalDiffTime + -- ^ Amount of time for which an unused resource is kept open. + -- The smallest acceptable value is 0.5 seconds. + -- + -- The elapsed time before destroying a resource may be a little + -- longer than requested, as the reaper thread wakes at 1-second + -- intervals. + -> Int + -- ^ Maximum number of resources to keep open per stripe. The + -- smallest acceptable value is 1. + -- + -- Requests for resources will block if this limit is reached on a + -- single stripe, even if other stripes have idle resources + -- available. + -> IO (Pool a) +createPool create destroy numStripes idleTime maxResources = do + when (numStripes < 1) $ + modError "pool " $ "invalid stripe count " ++ show numStripes + when (idleTime < 0.5) $ + modError "pool " $ "invalid idle time " ++ show idleTime + when (maxResources < 1) $ + modError "pool " $ "invalid maximum resource count " ++ show maxResources + localPools <- atomically . V.replicateM numStripes $ + liftM2 LocalPool (newTVar 0) (newTVar []) + reaperId <- forkIO $ reaper destroy idleTime localPools + let p = Pool { + create + , destroy + , numStripes + , idleTime + , maxResources + , localPools + } + addFinalizer p $ killThread reaperId + return p + +-- | Periodically go through all pools, closing any resources that +-- have been left idle for too long. +reaper :: (a -> IO ()) -> NominalDiffTime -> V.Vector (LocalPool a) -> IO () +reaper destroy idleTime pools = forever $ do + threadDelay (1 * 1000000) + now <- getCurrentTime + let isStale Entry{..} = now `diffUTCTime` lastUse > idleTime + V.forM_ pools $ \LocalPool{..} -> do + resources <- atomically $ do + (stale,fresh) <- partition isStale <$> readTVar entries + unless (null stale) $ do + writeTVar entries fresh + modifyTVar_ inUse (subtract (length stale)) + return (map entry stale) + forM_ resources $ \resource -> do + -- debug "reaper" "destroying idle resource" + destroy resource `catch` \(_::SomeException) -> return () + +-- | Temporarily take a resource from a 'Pool', perform an action with +-- it, and return it to the pool afterwards. +-- +-- * If the pool has an idle resource available, it is used +-- immediately. +-- +-- * Otherwise, if the maximum number of resources has not yet been +-- reached, a new resource is created and used. +-- +-- * If the maximum number of resources has been reached, this +-- function blocks until a resource becomes available. +-- +-- If the action throws an exception of any type, the resource is +-- destroyed, and not returned to the pool. +-- +-- It probably goes without saying that you should never manually +-- destroy a pooled resource, as doing so will almost certainly cause +-- a subsequent user (who expects the resource to be valid) to throw +-- an exception. +withResource :: Pool a -> (a -> IO b) -> IO b +withResource Pool{..} act = do + i <- ((`mod` numStripes) . hash) <$> myThreadId + let LocalPool{..} = localPools V.! i + resource <- join . atomically $ do + ents <- readTVar entries + case ents of + (Entry{..}:es) -> writeTVar entries es >> return create + [] -> do + used <- readTVar inUse + when (used == maxResources) retry + writeTVar inUse $! used + 1 + return $ do + create `onException` atomically (modifyTVar_ inUse (subtract 1)) + ret <- act resource `onException` do + destroy resource `catch` \(_::SomeException) -> return () + atomically (modifyTVar_ inUse (subtract 1)) + now <- getCurrentTime + atomically $ modifyTVar_ entries (Entry resource now:) + return ret + +modifyTVar_ :: TVar a -> (a -> a) -> STM () +modifyTVar_ v f = readTVar v >>= \a -> writeTVar v $! f a + +modError :: String -> String -> a +modError func msg = + error $ "Data.Pool." ++ func ++ ": " ++ msg diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..a6fb08a --- /dev/null +++ b/LICENSE @@ -0,0 +1,30 @@ +Copyright (c) 2011, MailRank, Inc. + +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + +1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + +3. Neither the name of the author nor the names of his contributors + may be used to endorse or promote products derived from this software + without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE CONTRIBUTORS ``AS IS'' AND ANY EXPRESS +OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. diff --git a/README.markdown b/README.markdown new file mode 100644 index 0000000..1e8e228 --- /dev/null +++ b/README.markdown @@ -0,0 +1,27 @@ +# Welcome to aeson + +aeson is a fast Haskell library for working with JSON data. + +# Join in! + +We are happy to receive bug reports, fixes, documentation enhancements, +and other improvements. + +Please report bugs via the +[github issue tracker](http://github.com/mailrank/aeson/issues). + +Master [git repository](http://github.com/mailrank/aeson): + +* `git clone git://github.com/mailrank/aeson.git` + +There's also a [Mercurial mirror](http://bitbucket.org/bos/aeson): + +* `hg clone http://bitbucket.org/bos/aeson` + +(You can create and contribute changes using either git or Mercurial.) + +Authors +------- + +This library is written and maintained by Bryan O'Sullivan, +. diff --git a/Setup.lhs b/Setup.lhs new file mode 100755 index 0000000..5bde0de --- /dev/null +++ b/Setup.lhs @@ -0,0 +1,3 @@ +#!/usr/bin/env runhaskell +> import Distribution.Simple +> main = defaultMain diff --git a/pool.cabal b/pool.cabal new file mode 100644 index 0000000..7f799f4 --- /dev/null +++ b/pool.cabal @@ -0,0 +1,50 @@ +name: pool +version: 0.1.0.0 +synopsis: A high-performance striped resource pooling implementation +description: + A high-performance striped pooling abstraction for managing + flexibly-sized collections of resources such as database + connections. + +homepage: http://github.com/mailrank/pool +license: BSD3 +license-file: LICENSE +author: Bryan O'Sullivan +maintainer: Bryan O'Sullivan +copyright: Copyright 2011 MailRank, Inc. +category: Data +build-type: Simple +extra-source-files: + README.markdown + +cabal-version: >=1.8 + +flag developer + description: operate in developer mode + default: False + +library + exposed-modules: + Data.Pool + + build-depends: + base == 4.*, + hashable, + stm, + time, + vector >= 0.7 + + if flag(developer) + ghc-options: -Werror + ghc-prof-options: -auto-all + cpp-options: -DASSERTS -DDEBUG + + ghc-options: -Wall + +source-repository head + type: git + location: http://github.com/mailrank/pool + +source-repository head + type: mercurial + location: http://bitbucket.org/bos/pool