Skip to content

Commit

Permalink
Synchronize user- and evalation thread with a BoundedChan.
Browse files Browse the repository at this point in the history
This is much cleaner and slightly faster but costs some memory.
  • Loading branch information
informatikr committed Mar 28, 2012
1 parent e953e05 commit 6aa198e
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 29 deletions.
1 change: 1 addition & 0 deletions hedis.cabal
Expand Up @@ -66,6 +66,7 @@ library
exposed-modules: Database.Redis
build-depends: attoparsec == 0.10.*,
base == 4.*,
BoundedChan == 1.0.*,
bytestring == 0.9.*,
bytestring-lexing == 0.4.*,
mtl == 2.*,
Expand Down
45 changes: 18 additions & 27 deletions src/Database/Redis/Core.hs
Expand Up @@ -16,7 +16,8 @@ module Database.Redis.Core (
import Prelude hiding (catch)
import Control.Applicative
import Control.Monad.Reader
import Control.Concurrent
import Control.Concurrent (ThreadId, forkIO, killThread)
import Control.Concurrent.BoundedChan
import Control.Concurrent.STM
import Control.Exception
import qualified Data.Attoparsec as P
Expand Down Expand Up @@ -85,45 +86,35 @@ runRedisInternal env (Redis redis) = runReaderT redis env
--
-- Create with 'newEnv'. Modified by 'recv' and 'send'.
data RedisEnv = Env
{ envHandle :: Handle -- ^ Connection socket-handle.
, envReplies :: TVar [Reply] -- ^ Reply thunks.
, envThunkCnt :: TVar Integer -- ^ Number of thunks in 'envThunkChan'.
, envEvalTId :: ThreadId -- ^ 'ThreadID' of the evaluator thread.
{ envHandle :: Handle -- ^ Connection socket-handle.
, envReplies :: TVar [Reply] -- ^ Reply thunks.
, envThunks :: BoundedChan Reply -- ^ Syncs user and eval threads.
, envEvalTId :: ThreadId -- ^ 'ThreadID' of the evaluator thread.
}

-- |Create a new 'RedisEnv'
newEnv :: Handle -> IO RedisEnv
newEnv envHandle = do
replies <- lazify <$> hGetReplies envHandle
envReplies <- newTVarIO replies
envThunkCnt <- newTVarIO 0
envEvalTId <- forkIO $ forceThunks envThunkCnt replies
envThunks <- newBoundedChan 1000
envEvalTId <- forkIO $ forceThunks envThunks
return Env{..}
where
lazify rs = head rs : lazify (tail rs)

forceThunks :: TVar Integer -> [Reply] -> IO ()
forceThunks thunkCnt = go
where
go [] = return ()
go (r:rs) = do
-- wait for a thunk
atomically $ do
cnt <- readTVar thunkCnt
guard (cnt > 0)
writeTVar thunkCnt (cnt-1)
r `seq` go rs
lazify rs = head rs : lazify (tail rs)
forceThunks thunks = do
t <- readChan thunks
t `seq` forceThunks thunks

recv :: (MonadRedis m) => m Reply
recv = liftRedis $ Redis $ do
Env{..} <- ask
liftIO $ atomically $ do
-- limit the amount of reply-thunks per connection.
cnt <- readTVar envThunkCnt
guard $ cnt < 1000
writeTVar envThunkCnt (cnt+1)
r:rs <- readTVar envReplies
writeTVar envReplies rs
liftIO $ do
r <- atomically $ do
r:rs <- readTVar envReplies
writeTVar envReplies rs
return r
writeChan envThunks r
return r

send :: (MonadRedis m) => [B.ByteString] -> m ()
Expand Down
2 changes: 1 addition & 1 deletion test/Test.hs
Expand Up @@ -59,7 +59,7 @@ testConstantSpacePipelining :: Test
testConstantSpacePipelining = testCase "constant-space pipelining" $ do
-- This testcase should not exceed the maximum heap size, as set in
-- the run-test.sh script.
replicateM_ 10000 ping
replicateM_ 100000 ping
-- If the program didn't crash, pipelining takes constant memory.
assert True

Expand Down
2 changes: 1 addition & 1 deletion test/run-test.sh
Expand Up @@ -4,7 +4,7 @@ echo "---------------"
echo "program output:"
echo "---------------"
# The -M argument limits heap size for 'testConstantSpacePipelining'.
./dist/build/hedis-test/hedis-test +RTS -M1m
./dist/build/hedis-test/hedis-test +RTS -M5m

echo "---------------"
hpc markup --destdir=test/coverage hedis-test.tix
Expand Down

0 comments on commit 6aa198e

Please sign in to comment.