From fae7852296db76e7c2aa8f858390bf768bcfd79f Mon Sep 17 00:00:00 2001 From: Kazu Yamamoto Date: Wed, 16 Dec 2015 11:34:33 +0900 Subject: [PATCH 1/6] introducing IORef to onTimeout so that cancel can override onTimeout. --- warp/Network/Wai/Handler/Warp/Timeout.hs | 68 +++++++++--------------- 1 file changed, 24 insertions(+), 44 deletions(-) diff --git a/warp/Network/Wai/Handler/Warp/Timeout.hs b/warp/Network/Wai/Handler/Warp/Timeout.hs index be8191682..5a611530e 100644 --- a/warp/Network/Wai/Handler/Warp/Timeout.hs +++ b/warp/Network/Wai/Handler/Warp/Timeout.hs @@ -1,7 +1,4 @@ -{-# LANGUAGE CPP #-} {-# LANGUAGE DeriveDataTypeable #-} -{-# LANGUAGE MagicHash #-} -{-# LANGUAGE UnboxedTuples #-} -- for GHC 7.4 or earlier module Network.Wai.Handler.Warp.Timeout ( -- ** Types @@ -24,25 +21,12 @@ module Network.Wai.Handler.Warp.Timeout ( , TimeoutThread (..) ) where -#ifndef MIN_VERSION_base -#define MIN_VERSION_base(x,y,z) 1 -#endif - -#if MIN_VERSION_base(4,6,0) -import Control.Concurrent (mkWeakThreadId, ThreadId) -#else -import GHC.Conc (ThreadId(..)) -import GHC.Exts (mkWeak#) -import GHC.IO (IO (IO)) -#endif import Control.Concurrent (myThreadId) import qualified Control.Exception as E -import GHC.Weak (Weak (..)) +import Control.Reaper +import Data.Typeable (Typeable) import Network.Wai.Handler.Warp.IORef (IORef) import qualified Network.Wai.Handler.Warp.IORef as I -import System.Mem.Weak (deRefWeak) -import Data.Typeable (Typeable) -import Control.Reaper ---------------------------------------------------------------- @@ -53,7 +37,7 @@ type Manager = Reaper [Handle] Handle type TimeoutAction = IO () -- | A handle used by 'Manager' -data Handle = Handle TimeoutAction (IORef State) +data Handle = Handle !(IORef TimeoutAction) !(IORef State) data State = Active -- Manager turns it to Inactive. | Inactive -- Manager removes it with timeout action. @@ -70,10 +54,11 @@ initialize timeout = mkReaper defaultReaperSettings , reaperDelay = timeout } where - prune m@(Handle onTimeout iactive) = do - state <- I.atomicModifyIORef' iactive (\x -> (inactivate x, x)) + prune m@(Handle actionRef stateRef) = do + state <- I.atomicModifyIORef' stateRef (\x -> (inactivate x, x)) case state of Inactive -> do + onTimeout <- I.readIORef actionRef onTimeout `E.catch` ignoreAll return Nothing Canceled -> return Nothing @@ -88,7 +73,9 @@ initialize timeout = mkReaper defaultReaperSettings stopManager :: Manager -> IO () stopManager mgr = E.mask_ (reaperStop mgr >>= mapM_ fire) where - fire (Handle onTimeout _) = onTimeout `E.catch` ignoreAll + fire (Handle actionRef _) = do + onTimeout <- I.readIORef actionRef + onTimeout `E.catch` ignoreAll ignoreAll :: E.SomeException -> IO () ignoreAll _ = return () @@ -98,24 +85,22 @@ ignoreAll _ = return () -- | Registering a timeout action. register :: Manager -> TimeoutAction -> IO Handle register mgr onTimeout = do - iactive <- I.newIORef Active - let h = Handle onTimeout iactive + actionRef <- I.newIORef onTimeout + stateRef <- I.newIORef Active + let h = Handle actionRef stateRef reaperAdd mgr h return h -- | Registering a timeout action of killing this thread. registerKillThread :: Manager -> IO Handle registerKillThread m = do - wtid <- myThreadId >>= mkWeakThreadId - register m $ killIfExist wtid - --- If ThreadId is hold referred by a strong reference, --- it leaks even after the thread is killed. --- So, let's use a weak reference so that CG can throw ThreadId away. --- deRefWeak checks if ThreadId referenced by the weak reference --- exists. If exists, it means that the thread is alive. -killIfExist :: Weak ThreadId -> TimeoutAction -killIfExist wtid = deRefWeak wtid >>= maybe (return ()) (`E.throwTo` TimeoutThread) + -- If we hold ThreadId, the stack and data of the thread is leaked. + -- If we hold Weak ThreadId, the stack is released. However, its + -- data is still leaked probably because of a bug of GHC. + -- So, let's just use ThreadId and release ThreadId by + -- overriding the timeout action by "cancel". + tid <- myThreadId + register m $ E.throwTo tid TimeoutThread data TimeoutThread = TimeoutThread deriving Typeable @@ -123,29 +108,24 @@ instance E.Exception TimeoutThread instance Show TimeoutThread where show TimeoutThread = "Thread killed by Warp's timeout reaper" -#if !MIN_VERSION_base(4,6,0) -mkWeakThreadId :: ThreadId -> IO (Weak ThreadId) -mkWeakThreadId t@(ThreadId t#) = IO $ \s -> - case mkWeak# t# t Nothing s of - (# s1, w #) -> (# s1, Weak w #) -#endif - ---------------------------------------------------------------- -- | Setting the state to active. -- 'Manager' turns active to inactive repeatedly. tickle :: Handle -> IO () -tickle (Handle _ iactive) = I.writeIORef iactive Active +tickle (Handle _ stateRef) = I.writeIORef stateRef Active -- | Setting the state to canceled. -- 'Manager' eventually removes this without timeout action. cancel :: Handle -> IO () -cancel (Handle _ iactive) = I.writeIORef iactive Canceled +cancel (Handle actionRef stateRef) = do + I.writeIORef actionRef (return ()) -- ensuring to release ThreadId + I.writeIORef stateRef Canceled -- | Setting the state to paused. -- 'Manager' does not change the value. pause :: Handle -> IO () -pause (Handle _ iactive) = I.writeIORef iactive Paused +pause (Handle _ stateRef) = I.writeIORef stateRef Paused -- | Setting the paused state to active. -- This is an alias to 'tickle'. From b9a6b329f8605072f2b20dae3390bfe6fd7b2e7c Mon Sep 17 00:00:00 2001 From: Kazu Yamamoto Date: Wed, 16 Dec 2015 11:11:23 +0900 Subject: [PATCH 2/6] better naming and cosmetic change. --- auto-update/Control/Reaper.hs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/auto-update/Control/Reaper.hs b/auto-update/Control/Reaper.hs index b045ad8ce..eb7b59bc3 100644 --- a/auto-update/Control/Reaper.hs +++ b/auto-update/Control/Reaper.hs @@ -111,7 +111,7 @@ mkReaper :: ReaperSettings workload item -> IO (Reaper workload item) mkReaper settings@ReaperSettings{..} = do stateRef <- newIORef NoReaper return Reaper { - reaperAdd = update settings stateRef + reaperAdd = add settings stateRef , reaperRead = readRef stateRef , reaperStop = stop stateRef } @@ -126,9 +126,9 @@ mkReaper settings@ReaperSettings{..} = do NoReaper -> (NoReaper, reaperEmpty) Workload x -> (Workload reaperEmpty, x) -update :: ReaperSettings workload item -> IORef (State workload) -> item +add :: ReaperSettings workload item -> IORef (State workload) -> item -> IO () -update settings@ReaperSettings{..} stateRef item = +add settings@ReaperSettings{..} stateRef item = mask_ $ join $ atomicModifyIORef' stateRef cons where cons NoReaper = (Workload $ reaperCons item reaperEmpty @@ -157,7 +157,7 @@ reaper settings@ReaperSettings{..} stateRef = do check _ NoReaper = error "Control.Reaper.reaper: unexpected NoReaper (2)" check merge (Workload wl) -- If there is no job, reaper is terminated. - | reaperNull wl' = (NoReaper, return ()) + | reaperNull wl' = (NoReaper, return ()) -- If there are jobs, carry them out. | otherwise = (Workload wl', reaper settings stateRef) where From 6c36bf1711612311c3a91be4a93caccdddcc6266 Mon Sep 17 00:00:00 2001 From: Kazu Yamamoto Date: Wed, 16 Dec 2015 11:13:00 +0900 Subject: [PATCH 3/6] stopping using join. Probably this is not necessary. But I want to ensure proper tail calls. --- auto-update/Control/Reaper.hs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/auto-update/Control/Reaper.hs b/auto-update/Control/Reaper.hs index eb7b59bc3..134231a0c 100644 --- a/auto-update/Control/Reaper.hs +++ b/auto-update/Control/Reaper.hs @@ -24,7 +24,7 @@ module Control.Reaper ( import Control.AutoUpdate.Util (atomicModifyIORef') import Control.Concurrent (forkIO, threadDelay) import Control.Exception (mask_) -import Control.Monad (join, void) +import Control.Monad (void) import Data.IORef (IORef, newIORef, readIORef) -- | Settings for creating a reaper. This type has two parameters: @@ -129,7 +129,9 @@ mkReaper settings@ReaperSettings{..} = do add :: ReaperSettings workload item -> IORef (State workload) -> item -> IO () add settings@ReaperSettings{..} stateRef item = - mask_ $ join $ atomicModifyIORef' stateRef cons + mask_ $ do + next <- atomicModifyIORef' stateRef cons + next where cons NoReaper = (Workload $ reaperCons item reaperEmpty ,spawn settings stateRef) @@ -149,7 +151,8 @@ reaper settings@ReaperSettings{..} stateRef = do merge <- reaperAction wl -- Merging the left jobs and new jobs. -- If there is no jobs, this thread finishes. - join $ atomicModifyIORef' stateRef (check merge) + next <- atomicModifyIORef' stateRef (check merge) + next where swapWithEmpty NoReaper = error "Control.Reaper.reaper: unexpected NoReaper (1)" swapWithEmpty (Workload wl) = (Workload reaperEmpty, wl) From a3ee2c582a016b76c01ee4afe65e946a57390d45 Mon Sep 17 00:00:00 2001 From: Kazu Yamamoto Date: Wed, 16 Dec 2015 11:16:52 +0900 Subject: [PATCH 4/6] making reaper more strict. --- auto-update/Control/Reaper.hs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/auto-update/Control/Reaper.hs b/auto-update/Control/Reaper.hs index 134231a0c..f28ef62ca 100644 --- a/auto-update/Control/Reaper.hs +++ b/auto-update/Control/Reaper.hs @@ -1,4 +1,5 @@ {-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE BangPatterns #-} -- | This module provides the ability to create reapers: dedicated cleanup -- threads. These threads will automatically spawn and die based on the @@ -133,10 +134,10 @@ add settings@ReaperSettings{..} stateRef item = next <- atomicModifyIORef' stateRef cons next where - cons NoReaper = (Workload $ reaperCons item reaperEmpty - ,spawn settings stateRef) - cons (Workload wl) = (Workload $ reaperCons item wl - ,return ()) + cons NoReaper = let !wl = reaperCons item reaperEmpty + in (Workload wl, spawn settings stateRef) + cons (Workload wl) = let wl' = reaperCons item wl + in (Workload wl', return ()) spawn :: ReaperSettings workload item -> IORef (State workload) -> IO () spawn settings stateRef = void . forkIO $ reaper settings stateRef @@ -148,7 +149,7 @@ reaper settings@ReaperSettings{..} stateRef = do wl <- atomicModifyIORef' stateRef swapWithEmpty -- Do the jobs. A function to merge the left jobs and -- new jobs is returned. - merge <- reaperAction wl + !merge <- reaperAction wl -- Merging the left jobs and new jobs. -- If there is no jobs, this thread finishes. next <- atomicModifyIORef' stateRef (check merge) @@ -178,8 +179,8 @@ mkListAction :: (item -> IO (Maybe item')) mkListAction f = go id where - go front [] = return front - go front (x:xs) = do + go !front [] = return front + go !front (x:xs) = do my <- f x let front' = case my of From 0e7292d3208ea654a80e0473d77338007cbeecfe Mon Sep 17 00:00:00 2001 From: Kazu Yamamoto Date: Wed, 16 Dec 2015 11:46:02 +0900 Subject: [PATCH 5/6] providing a new API - reaperKill. --- auto-update/Control/Reaper.hs | 43 +++++++++++++++++++++++------------ 1 file changed, 29 insertions(+), 14 deletions(-) diff --git a/auto-update/Control/Reaper.hs b/auto-update/Control/Reaper.hs index f28ef62ca..81e3fe7a0 100644 --- a/auto-update/Control/Reaper.hs +++ b/auto-update/Control/Reaper.hs @@ -23,10 +23,9 @@ module Control.Reaper ( ) where import Control.AutoUpdate.Util (atomicModifyIORef') -import Control.Concurrent (forkIO, threadDelay) +import Control.Concurrent (forkIO, threadDelay, killThread, ThreadId) import Control.Exception (mask_) -import Control.Monad (void) -import Data.IORef (IORef, newIORef, readIORef) +import Data.IORef (IORef, newIORef, readIORef, writeIORef) -- | Settings for creating a reaper. This type has two parameters: -- @workload@ gives the entire workload, whereas @item@ gives an @@ -97,6 +96,8 @@ data Reaper workload item = Reaper { -- | Stopping the reaper thread if exists. -- The current workload is returned. , reaperStop :: IO workload + -- | Killing the reaper thread immediately if exists. + , reaperKill :: IO () } -- | State of reaper. @@ -111,10 +112,12 @@ data State workload = NoReaper -- ^ No reaper thread mkReaper :: ReaperSettings workload item -> IO (Reaper workload item) mkReaper settings@ReaperSettings{..} = do stateRef <- newIORef NoReaper + tidRef <- newIORef Nothing return Reaper { - reaperAdd = add settings stateRef + reaperAdd = add settings stateRef tidRef , reaperRead = readRef stateRef , reaperStop = stop stateRef + , reaperKill = kill tidRef } where readRef stateRef = do @@ -126,24 +129,36 @@ mkReaper settings@ReaperSettings{..} = do case mx of NoReaper -> (NoReaper, reaperEmpty) Workload x -> (Workload reaperEmpty, x) + kill tidRef = do + mtid <- readIORef tidRef + case mtid of + Nothing -> return () + Just tid -> killThread tid -add :: ReaperSettings workload item -> IORef (State workload) -> item - -> IO () -add settings@ReaperSettings{..} stateRef item = +add :: ReaperSettings workload item + -> IORef (State workload) -> IORef (Maybe ThreadId) + -> item -> IO () +add settings@ReaperSettings{..} stateRef tidRef item = mask_ $ do next <- atomicModifyIORef' stateRef cons next where cons NoReaper = let !wl = reaperCons item reaperEmpty - in (Workload wl, spawn settings stateRef) + in (Workload wl, spawn settings stateRef tidRef) cons (Workload wl) = let wl' = reaperCons item wl in (Workload wl', return ()) -spawn :: ReaperSettings workload item -> IORef (State workload) -> IO () -spawn settings stateRef = void . forkIO $ reaper settings stateRef +spawn :: ReaperSettings workload item + -> IORef (State workload) -> IORef (Maybe ThreadId) + -> IO () +spawn settings stateRef tidRef = do + tid <- forkIO $ reaper settings stateRef tidRef + writeIORef tidRef $ Just tid -reaper :: ReaperSettings workload item -> IORef (State workload) -> IO () -reaper settings@ReaperSettings{..} stateRef = do +reaper :: ReaperSettings workload item + -> IORef (State workload) -> IORef (Maybe ThreadId) + -> IO () +reaper settings@ReaperSettings{..} stateRef tidRef = do threadDelay reaperDelay -- Getting the current jobs. Push an empty job to the reference. wl <- atomicModifyIORef' stateRef swapWithEmpty @@ -161,9 +176,9 @@ reaper settings@ReaperSettings{..} stateRef = do check _ NoReaper = error "Control.Reaper.reaper: unexpected NoReaper (2)" check merge (Workload wl) -- If there is no job, reaper is terminated. - | reaperNull wl' = (NoReaper, return ()) + | reaperNull wl' = (NoReaper, writeIORef tidRef Nothing) -- If there are jobs, carry them out. - | otherwise = (Workload wl', reaper settings stateRef) + | otherwise = (Workload wl', reaper settings stateRef tidRef) where wl' = merge wl From 9ab44cf4665b71548fd51f54b1d45921605042e0 Mon Sep 17 00:00:00 2001 From: Kazu Yamamoto Date: Wed, 16 Dec 2015 11:49:01 +0900 Subject: [PATCH 6/6] providing killManager. --- warp/Network/Wai/Handler/Warp/Timeout.hs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/warp/Network/Wai/Handler/Warp/Timeout.hs b/warp/Network/Wai/Handler/Warp/Timeout.hs index 5a611530e..075c273a2 100644 --- a/warp/Network/Wai/Handler/Warp/Timeout.hs +++ b/warp/Network/Wai/Handler/Warp/Timeout.hs @@ -8,6 +8,7 @@ module Network.Wai.Handler.Warp.Timeout ( -- ** Manager , initialize , stopManager + , killManager , withManager -- ** Registration , register @@ -69,7 +70,7 @@ initialize timeout = mkReaper defaultReaperSettings ---------------------------------------------------------------- --- | Stopping timeout manager. +-- | Stopping timeout manager with onTimeout fired. stopManager :: Manager -> IO () stopManager mgr = E.mask_ (reaperStop mgr >>= mapM_ fire) where @@ -80,6 +81,14 @@ stopManager mgr = E.mask_ (reaperStop mgr >>= mapM_ fire) ignoreAll :: E.SomeException -> IO () ignoreAll _ = return () +-- | Killing timeout manager immediately without firing onTimeout. +killManager :: Manager -> IO () +killManager mgr = E.mask_ (reaperStop mgr >>= mapM_ fire) + where + fire (Handle actionRef _) = do + onTimeout <- I.readIORef actionRef + onTimeout `E.catch` ignoreAll + ---------------------------------------------------------------- -- | Registering a timeout action.