Permalink
Browse files

Closing a manager is fully atomic

  • Loading branch information...
1 parent 7d67c7d commit 10ae73b53de300086aa7b6f1f6593d2394882fc7 @snoyberg committed Jan 13, 2012
Showing with 35 additions and 24 deletions.
  1. +35 −24 Network/HTTP/Conduit/Manager.hs
@@ -38,9 +38,8 @@ import Control.Monad.Trans.Resource
, newRef, readRef', writeRef
, safeFromIOBase
)
-import Control.Concurrent (forkIO, ThreadId, killThread, threadDelay)
+import Control.Concurrent (forkIO, threadDelay)
import Data.Time (UTCTime, getCurrentTime, addUTCTime)
-import Control.Monad (forever)
import Network (connectTo, PortID (PortNumber))
import Data.Certificate.X509 (X509)
@@ -53,9 +52,10 @@ import Network.HTTP.Conduit.Request
-- | Keeps track of open connections for keep-alive. May be used
-- concurrently by multiple threads.
data Manager = Manager
- { mConns :: !(I.IORef (Map.Map ConnKey (NonEmptyList ConnInfo)))
+ { mConns :: !(I.IORef (Maybe (Map.Map ConnKey (NonEmptyList ConnInfo))))
+ -- ^ @Nothing@ indicates that the manager is closed.
, mMaxConns :: !Int
- , mReaper :: ThreadId
+ -- ^ This is a per-@ConnKey@ value.
}
data NonEmptyList a =
@@ -71,24 +71,26 @@ takeSocket :: Manager -> ConnKey -> IO (Maybe ConnInfo)
takeSocket man key =
I.atomicModifyIORef (mConns man) go
where
- go m =
+ go Nothing = (Nothing, Nothing)
+ go (Just m) =
case Map.lookup key m of
- Nothing -> (m, Nothing)
- Just (One a _) -> (Map.delete key m, Just a)
- Just (Cons a _ _ rest) -> (Map.insert key rest m, Just a)
+ Nothing -> (Just m, Nothing)
+ Just (One a _) -> (Just $ Map.delete key m, Just a)
+ Just (Cons a _ _ rest) -> (Just $ Map.insert key rest m, Just a)
putSocket :: Manager -> ConnKey -> ConnInfo -> IO ()
putSocket man key ci = do
now <- getCurrentTime
msock <- I.atomicModifyIORef (mConns man) (go now)
maybe (return ()) connClose msock
where
- go now m =
+ go _ Nothing = (Nothing, Just ci)
+ go now (Just m) =
case Map.lookup key m of
- Nothing -> (Map.insert key (One ci now) m, Nothing)
+ Nothing -> (Just $ Map.insert key (One ci now) m, Nothing)
Just l ->
let (l', mx) = addToList now (mMaxConns man) ci l
- in (Map.insert key l' m, mx)
+ in (Just $ Map.insert key l' m, mx)
-- | Add a new element to the list, up to the given maximum number. If we're
-- already at the maximum, return the new value as leftover.
@@ -110,19 +112,29 @@ newManagerCount count = snd <$> withIO (newManagerIO count) closeManager
-- | Create a 'Manager' which will never be destroyed.
newManagerIO :: Int -> IO Manager
newManagerIO count = do
- mapRef <- I.newIORef Map.empty
- reaper <- forkIO $ reap mapRef
- return $ Manager mapRef count reaper
+ mapRef <- I.newIORef (Just Map.empty)
+ _ <- forkIO $ reap mapRef
+ return $ Manager mapRef count
-- | Collect and destroy any stale connections.
-reap :: I.IORef (Map.Map ConnKey (NonEmptyList ConnInfo)) -> IO ()
-reap mapRef = forever $ mask_ $ do
- threadDelay (5 * 1000 * 1000)
- now <- getCurrentTime
- let isNotStale time = 30 `addUTCTime` time >= now
- toDestroy <- I.atomicModifyIORef mapRef (findStale isNotStale)
- mapM_ safeConnClose toDestroy
+reap :: I.IORef (Maybe (Map.Map ConnKey (NonEmptyList ConnInfo))) -> IO ()
+reap mapRef =
+ mask_ loop
where
+ loop = do
+ threadDelay (5 * 1000 * 1000)
+ now <- getCurrentTime
+ let isNotStale time = 30 `addUTCTime` time >= now
+ mtoDestroy <- I.atomicModifyIORef mapRef (findStaleWrap isNotStale)
+ case mtoDestroy of
+ Nothing -> return () -- manager is closed
+ Just toDestroy -> do
+ mapM_ safeConnClose toDestroy
+ loop
+ findStaleWrap _ Nothing = (Nothing, Nothing)
+ findStaleWrap isNotStale (Just m) =
+ let (x, y) = findStale isNotStale m
+ in (Just x, Just y)
findStale isNotStale =
findStale' id id . Map.toList
where
@@ -163,9 +175,8 @@ withManager f = runResourceT $ newManager >>= f
-- 'Manager' can be reused if desired.
closeManager :: Manager -> IO ()
closeManager manager = do
@meteficha

meteficha Jan 14, 2012

Contributor

I guess we need mask here to avoid receiving an async exception before closing all connections.

@snoyberg

snoyberg Jan 14, 2012

Owner

Good catch, thanks!

- killThread $ mReaper manager
- m <- I.atomicModifyIORef (mConns manager) $ \x -> (Map.empty, x)
- mapM_ (nonEmptyMapM_ safeConnClose) $ Map.elems m
+ m <- I.atomicModifyIORef (mConns manager) $ \x -> (Nothing, x)
+ mapM_ (nonEmptyMapM_ safeConnClose) $ maybe [] Map.elems m
safeConnClose :: ConnInfo -> IO ()
safeConnClose ci = connClose ci `catch` \(_::SomeException) -> return ()

1 comment on commit 10ae73b

Contributor

meteficha commented on 10ae73b Jan 14, 2012

Good move!

Please sign in to comment.