Skip to content

Commit

Permalink
Merging the fix for "Weak ThreadId still leaks threads" (#488)
Browse files Browse the repository at this point in the history
  • Loading branch information
kazu-yamamoto committed Dec 16, 2015
2 parents aabf2d4 + 9ab44cf commit 0cd3664
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 67 deletions.
63 changes: 41 additions & 22 deletions auto-update/Control/Reaper.hs
@@ -1,4 +1,5 @@
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE BangPatterns #-}

-- | This module provides the ability to create reapers: dedicated cleanup
-- threads. These threads will automatically spawn and die based on the
Expand All @@ -22,10 +23,9 @@ module Control.Reaper (
) where

import Control.AutoUpdate.Util (atomicModifyIORef')
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent (forkIO, threadDelay, killThread, ThreadId)
import Control.Exception (mask_)
import Control.Monad (join, void)
import Data.IORef (IORef, newIORef, readIORef)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)

-- | Settings for creating a reaper. This type has two parameters:
-- @workload@ gives the entire workload, whereas @item@ gives an
Expand Down Expand Up @@ -96,6 +96,8 @@ data Reaper workload item = Reaper {
-- | Stopping the reaper thread if exists.
-- The current workload is returned.
, reaperStop :: IO workload
-- | Killing the reaper thread immediately if exists.
, reaperKill :: IO ()
}

-- | State of reaper.
Expand All @@ -110,10 +112,12 @@ data State workload = NoReaper -- ^ No reaper thread
mkReaper :: ReaperSettings workload item -> IO (Reaper workload item)
mkReaper settings@ReaperSettings{..} = do
stateRef <- newIORef NoReaper
tidRef <- newIORef Nothing
return Reaper {
reaperAdd = update settings stateRef
reaperAdd = add settings stateRef tidRef
, reaperRead = readRef stateRef
, reaperStop = stop stateRef
, reaperKill = kill tidRef
}
where
readRef stateRef = do
Expand All @@ -125,41 +129,56 @@ mkReaper settings@ReaperSettings{..} = do
case mx of
NoReaper -> (NoReaper, reaperEmpty)
Workload x -> (Workload reaperEmpty, x)
kill tidRef = do
mtid <- readIORef tidRef
case mtid of
Nothing -> return ()
Just tid -> killThread tid

update :: ReaperSettings workload item -> IORef (State workload) -> item
-> IO ()
update settings@ReaperSettings{..} stateRef item =
mask_ $ join $ atomicModifyIORef' stateRef cons
add :: ReaperSettings workload item
-> IORef (State workload) -> IORef (Maybe ThreadId)
-> item -> IO ()
add settings@ReaperSettings{..} stateRef tidRef item =
mask_ $ do
next <- atomicModifyIORef' stateRef cons
next
where
cons NoReaper = (Workload $ reaperCons item reaperEmpty
,spawn settings stateRef)
cons (Workload wl) = (Workload $ reaperCons item wl
,return ())
cons NoReaper = let !wl = reaperCons item reaperEmpty
in (Workload wl, spawn settings stateRef tidRef)
cons (Workload wl) = let wl' = reaperCons item wl
in (Workload wl', return ())

spawn :: ReaperSettings workload item -> IORef (State workload) -> IO ()
spawn settings stateRef = void . forkIO $ reaper settings stateRef
spawn :: ReaperSettings workload item
-> IORef (State workload) -> IORef (Maybe ThreadId)
-> IO ()
spawn settings stateRef tidRef = do
tid <- forkIO $ reaper settings stateRef tidRef
writeIORef tidRef $ Just tid

reaper :: ReaperSettings workload item -> IORef (State workload) -> IO ()
reaper settings@ReaperSettings{..} stateRef = do
reaper :: ReaperSettings workload item
-> IORef (State workload) -> IORef (Maybe ThreadId)
-> IO ()
reaper settings@ReaperSettings{..} stateRef tidRef = do
threadDelay reaperDelay
-- Getting the current jobs. Push an empty job to the reference.
wl <- atomicModifyIORef' stateRef swapWithEmpty
-- Do the jobs. A function to merge the left jobs and
-- new jobs is returned.
merge <- reaperAction wl
!merge <- reaperAction wl
-- Merging the left jobs and new jobs.
-- If there is no jobs, this thread finishes.
join $ atomicModifyIORef' stateRef (check merge)
next <- atomicModifyIORef' stateRef (check merge)
next
where
swapWithEmpty NoReaper = error "Control.Reaper.reaper: unexpected NoReaper (1)"
swapWithEmpty (Workload wl) = (Workload reaperEmpty, wl)

check _ NoReaper = error "Control.Reaper.reaper: unexpected NoReaper (2)"
check merge (Workload wl)
-- If there is no job, reaper is terminated.
| reaperNull wl' = (NoReaper, return ())
| reaperNull wl' = (NoReaper, writeIORef tidRef Nothing)
-- If there are jobs, carry them out.
| otherwise = (Workload wl', reaper settings stateRef)
| otherwise = (Workload wl', reaper settings stateRef tidRef)
where
wl' = merge wl

Expand All @@ -175,8 +194,8 @@ mkListAction :: (item -> IO (Maybe item'))
mkListAction f =
go id
where
go front [] = return front
go front (x:xs) = do
go !front [] = return front
go !front (x:xs) = do
my <- f x
let front' =
case my of
Expand Down
79 changes: 34 additions & 45 deletions warp/Network/Wai/Handler/Warp/Timeout.hs
@@ -1,7 +1,4 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE UnboxedTuples #-} -- for GHC 7.4 or earlier

module Network.Wai.Handler.Warp.Timeout (
-- ** Types
Expand All @@ -11,6 +8,7 @@ module Network.Wai.Handler.Warp.Timeout (
-- ** Manager
, initialize
, stopManager
, killManager
, withManager
-- ** Registration
, register
Expand All @@ -24,25 +22,12 @@ module Network.Wai.Handler.Warp.Timeout (
, TimeoutThread (..)
) where

#ifndef MIN_VERSION_base
#define MIN_VERSION_base(x,y,z) 1
#endif

#if MIN_VERSION_base(4,6,0)
import Control.Concurrent (mkWeakThreadId, ThreadId)
#else
import GHC.Conc (ThreadId(..))
import GHC.Exts (mkWeak#)
import GHC.IO (IO (IO))
#endif
import Control.Concurrent (myThreadId)
import qualified Control.Exception as E
import GHC.Weak (Weak (..))
import Control.Reaper
import Data.Typeable (Typeable)
import Network.Wai.Handler.Warp.IORef (IORef)
import qualified Network.Wai.Handler.Warp.IORef as I
import System.Mem.Weak (deRefWeak)
import Data.Typeable (Typeable)
import Control.Reaper

----------------------------------------------------------------

Expand All @@ -53,7 +38,7 @@ type Manager = Reaper [Handle] Handle
type TimeoutAction = IO ()

-- | A handle used by 'Manager'
data Handle = Handle TimeoutAction (IORef State)
data Handle = Handle !(IORef TimeoutAction) !(IORef State)

data State = Active -- Manager turns it to Inactive.
| Inactive -- Manager removes it with timeout action.
Expand All @@ -70,10 +55,11 @@ initialize timeout = mkReaper defaultReaperSettings
, reaperDelay = timeout
}
where
prune m@(Handle onTimeout iactive) = do
state <- I.atomicModifyIORef' iactive (\x -> (inactivate x, x))
prune m@(Handle actionRef stateRef) = do
state <- I.atomicModifyIORef' stateRef (\x -> (inactivate x, x))
case state of
Inactive -> do
onTimeout <- I.readIORef actionRef
onTimeout `E.catch` ignoreAll
return Nothing
Canceled -> return Nothing
Expand All @@ -84,68 +70,71 @@ initialize timeout = mkReaper defaultReaperSettings

----------------------------------------------------------------

-- | Stopping timeout manager.
-- | Stopping timeout manager with onTimeout fired.
stopManager :: Manager -> IO ()
stopManager mgr = E.mask_ (reaperStop mgr >>= mapM_ fire)
where
fire (Handle onTimeout _) = onTimeout `E.catch` ignoreAll
fire (Handle actionRef _) = do
onTimeout <- I.readIORef actionRef
onTimeout `E.catch` ignoreAll

ignoreAll :: E.SomeException -> IO ()
ignoreAll _ = return ()

-- | Killing timeout manager immediately without firing onTimeout.
killManager :: Manager -> IO ()
killManager mgr = E.mask_ (reaperStop mgr >>= mapM_ fire)
where
fire (Handle actionRef _) = do
onTimeout <- I.readIORef actionRef
onTimeout `E.catch` ignoreAll

----------------------------------------------------------------

-- | Registering a timeout action.
register :: Manager -> TimeoutAction -> IO Handle
register mgr onTimeout = do
iactive <- I.newIORef Active
let h = Handle onTimeout iactive
actionRef <- I.newIORef onTimeout
stateRef <- I.newIORef Active
let h = Handle actionRef stateRef
reaperAdd mgr h
return h

-- | Registering a timeout action of killing this thread.
registerKillThread :: Manager -> IO Handle
registerKillThread m = do
wtid <- myThreadId >>= mkWeakThreadId
register m $ killIfExist wtid

-- If ThreadId is hold referred by a strong reference,
-- it leaks even after the thread is killed.
-- So, let's use a weak reference so that CG can throw ThreadId away.
-- deRefWeak checks if ThreadId referenced by the weak reference
-- exists. If exists, it means that the thread is alive.
killIfExist :: Weak ThreadId -> TimeoutAction
killIfExist wtid = deRefWeak wtid >>= maybe (return ()) (`E.throwTo` TimeoutThread)
-- If we hold ThreadId, the stack and data of the thread is leaked.
-- If we hold Weak ThreadId, the stack is released. However, its
-- data is still leaked probably because of a bug of GHC.
-- So, let's just use ThreadId and release ThreadId by
-- overriding the timeout action by "cancel".
tid <- myThreadId
register m $ E.throwTo tid TimeoutThread

data TimeoutThread = TimeoutThread
deriving Typeable
instance E.Exception TimeoutThread
instance Show TimeoutThread where
show TimeoutThread = "Thread killed by Warp's timeout reaper"

#if !MIN_VERSION_base(4,6,0)
mkWeakThreadId :: ThreadId -> IO (Weak ThreadId)
mkWeakThreadId t@(ThreadId t#) = IO $ \s ->
case mkWeak# t# t Nothing s of
(# s1, w #) -> (# s1, Weak w #)
#endif

----------------------------------------------------------------

-- | Setting the state to active.
-- 'Manager' turns active to inactive repeatedly.
tickle :: Handle -> IO ()
tickle (Handle _ iactive) = I.writeIORef iactive Active
tickle (Handle _ stateRef) = I.writeIORef stateRef Active

-- | Setting the state to canceled.
-- 'Manager' eventually removes this without timeout action.
cancel :: Handle -> IO ()
cancel (Handle _ iactive) = I.writeIORef iactive Canceled
cancel (Handle actionRef stateRef) = do
I.writeIORef actionRef (return ()) -- ensuring to release ThreadId
I.writeIORef stateRef Canceled

-- | Setting the state to paused.
-- 'Manager' does not change the value.
pause :: Handle -> IO ()
pause (Handle _ iactive) = I.writeIORef iactive Paused
pause (Handle _ stateRef) = I.writeIORef stateRef Paused

-- | Setting the paused state to active.
-- This is an alias to 'tickle'.
Expand Down

0 comments on commit 0cd3664

Please sign in to comment.