Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close all streams on termination #83

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions Network/HTTP2/Arch/Manager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ module Network.HTTP2.Arch.Manager (
, Action
, start
, setAction
, stop
, stopAfter
, spawnAction
, forkManaged
, forkManagedUnmask
Expand Down Expand Up @@ -38,7 +38,7 @@ type Action = IO ()
noAction :: Action
noAction = return ()

data Command = Stop | Spawn | Add ThreadId | Delete ThreadId
data Command = Stop (Maybe SomeException) | Spawn | Add ThreadId | Delete ThreadId

-- | Manager to manage the thread pool and the timer.
data Manager = Manager (TQueue Command) (IORef Action) T.Manager
Expand All @@ -57,7 +57,7 @@ start timmgr = do
go q tset0 ref = do
x <- atomically $ readTQueue q
case x of
Stop -> kill tset0
Stop err -> kill tset0 err
Spawn -> next tset0
Add newtid -> let tset = add newtid tset0
in go q tset ref
Expand All @@ -75,8 +75,12 @@ setAction :: Manager -> Action -> IO ()
setAction (Manager _ ref _) action = writeIORef ref action

-- | Stopping the manager.
stop :: Manager -> IO ()
stop (Manager q _ _) = atomically $ writeTQueue q Stop
stopAfter :: Manager -> IO a -> (Either SomeException a -> IO b) -> IO b
stopAfter (Manager q _ _) action cleanup = do
mask $ \unmask -> do
ma <- try $ unmask action
atomically $ writeTQueue q $ Stop (either Just (const Nothing) ma)
cleanup ma

-- | Spawning the action.
spawnAction :: Manager -> IO ()
Expand Down Expand Up @@ -132,8 +136,8 @@ del tid set = set'
where
set' = Set.delete tid set

kill :: Set ThreadId -> IO ()
kill set = traverse_ (\tid -> E.throwTo tid KilledByHttp2ThreadPoolManager) set
kill :: Set ThreadId -> Maybe SomeException -> IO ()
kill set err = traverse_ (\tid -> E.throwTo tid $ KilledByHttp2ThreadPoolManager err) set

-- | Killing the IO action of the second argument on timeout.
timeoutKillThread :: Manager -> (T.Handle -> IO ()) -> IO ()
Expand All @@ -148,7 +152,7 @@ timeoutClose (Manager _ _ tmgr) closer = do
th <- T.register tmgr closer
return $ T.tickle th

data KilledByHttp2ThreadPoolManager = KilledByHttp2ThreadPoolManager
data KilledByHttp2ThreadPoolManager = KilledByHttp2ThreadPoolManager (Maybe SomeException)
deriving Show

instance Exception KilledByHttp2ThreadPoolManager where
Expand Down
22 changes: 14 additions & 8 deletions Network/HTTP2/Arch/Receiver.hs
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ stream FrameHeaders header@FrameHeader{flags,streamId} bs ctx (Open (Body q _ _
if endOfStream then do
tbl <- hpackDecodeTrailer frag streamId ctx
writeIORef tlr (Just tbl)
atomically $ writeTQueue q ""
atomically $ writeTQueue q $ Right ""
return HalfClosedRemote
else
-- we don't support continuation here.
Expand Down Expand Up @@ -412,13 +412,13 @@ stream FrameData
E.throwIO $ ConnectionErrorIsSent ProtocolError streamId "too many empty data"
else do
writeIORef bodyLength len
atomically $ writeTQueue q body
atomically $ writeTQueue q $ Right body
if endOfStream then do
case mcl of
Nothing -> return ()
Just cl -> when (cl /= len) $ E.throwIO $ StreamErrorIsSent ProtocolError streamId "actual body length is not the same as content-length"
-- no trailers
atomically $ writeTQueue q ""
atomically $ writeTQueue q $ Right ""
return HalfClosedRemote
else
return s
Expand Down Expand Up @@ -498,11 +498,11 @@ stream _ FrameHeader{streamId} _ _ _ _ = E.throwIO $ StreamErrorIsSent ProtocolE

-- | Type for input streaming.
data Source = Source (Int -> IO ())
(TQueue ByteString)
(TQueue (Either E.SomeException ByteString))
(IORef ByteString)
(IORef Bool)

mkSource :: TQueue ByteString -> (Int -> IO ()) -> IO Source
mkSource :: TQueue (Either E.SomeException ByteString) -> (Int -> IO ()) -> IO Source
mkSource q inform = Source inform q <$> newIORef "" <*> newIORef False

readSource :: Source -> IO ByteString
Expand All @@ -516,12 +516,18 @@ readSource (Source inform q refBS refEOF) = do
inform len
return bs
where
readBS :: IO ByteString
readBS = do
bs0 <- readIORef refBS
if bs0 == "" then do
bs <- atomically $ readTQueue q
when (bs == "") $ writeIORef refEOF True
return bs
mBS <- atomically $ readTQueue q
case mBS of
Left err -> do
writeIORef refEOF True
E.throwIO err
Right bs -> do
when (bs == "") $ writeIORef refEOF True
return bs
else do
writeIORef refBS ""
return bs0
19 changes: 19 additions & 0 deletions Network/HTTP2/Arch/Stream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

module Network.HTTP2.Arch.Stream where

import Control.Exception
import Data.IORef
import qualified Data.IntMap.Strict as M
import UnliftIO.Concurrent
Expand Down Expand Up @@ -75,3 +76,21 @@ updateAllStreamWindow :: (WindowSize -> WindowSize) -> StreamTable -> IO ()
updateAllStreamWindow adst (StreamTable ref) = do
strms <- M.elems <$> readIORef ref
forM_ strms $ \strm -> atomically $ modifyTVar (streamWindow strm) adst

closeAllStreams :: StreamTable -> Maybe SomeException -> IO ()
closeAllStreams (StreamTable ref) mErr' = do
strms <- atomicModifyIORef' ref $ \m -> (M.empty, m)
forM_ strms $ \strm -> do
st <- readStreamState strm
case st of
Open (Body q _ _ _) ->
atomically $ writeTQueue q $ maybe (Right mempty) Left mErr
_otherwise ->
return ()
where
mErr :: Maybe SomeException
mErr = case mErr' of
Just err | Just ConnectionIsClosed <- fromException err ->
Nothing
_otherwise ->
mErr'
2 changes: 1 addition & 1 deletion Network/HTTP2/Arch/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ data OpenState =
Bool -- End of stream
| NoBody HeaderTable
| HasBody HeaderTable
| Body (TQueue ByteString)
| Body (TQueue (Either SomeException ByteString))
(Maybe Int) -- received Content-Length
-- compared the body length for error checking
(IORef Int) -- actual body length
Expand Down
15 changes: 10 additions & 5 deletions Network/HTTP2/Client/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
module Network.HTTP2.Client.Run where

import Control.Concurrent.STM (check)
import Control.Exception
import UnliftIO.Async
import UnliftIO.Concurrent
import qualified UnliftIO.Exception as E
import UnliftIO.STM

import Imports
Expand Down Expand Up @@ -39,10 +39,15 @@ run ClientConfig{..} conf@Config{..} client = do
let frame = goawayFrame 0 NoError "graceful closing"
enqueueControl (controlQ ctx) $ CFrames Nothing [frame]
return x
ex <- race runBackgroundThreads runClient `E.finally` stop mgr
case ex of
Left () -> undefined -- never reach
Right x -> return x
stopAfter mgr (race runBackgroundThreads runClient) $ \res -> do
closeAllStreams (streamTable ctx) $ either Just (const Nothing) res
case res of
Left err ->
throwIO err
Right (Left ()) ->
undefined -- never reach
Right (Right x) ->
return x

sendRequest :: Context -> Manager -> Scheme -> Authority -> Request -> (Response -> IO a) -> IO a
sendRequest ctx@Context{..} mgr scheme auth (Request req) processResponse = do
Expand Down
10 changes: 8 additions & 2 deletions Network/HTTP2/Server/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
module Network.HTTP2.Server.Run where

import UnliftIO.Async (concurrently_)
import qualified UnliftIO.Exception as E

import Imports
import Network.HTTP2.Arch
import Network.HTTP2.Frame
import Network.HTTP2.Server.Types
import Network.HTTP2.Server.Worker
import Control.Exception

----------------------------------------------------------------

Expand All @@ -33,7 +33,13 @@ run conf@Config{..} server = do
replicateM_ 3 $ spawnAction mgr
let runReceiver = frameReceiver ctx conf
runSender = frameSender ctx conf mgr
concurrently_ runReceiver runSender `E.finally` stop mgr
stopAfter mgr (concurrently_ runReceiver runSender) $ \res -> do
closeAllStreams (streamTable ctx) $ either Just (const Nothing) res
case res of
Left err ->
throwIO err
Right x ->
return x
where
checkPreface = do
preface <- confReadN connectionPrefaceLength
Expand Down
Loading