Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce forkManaged #74

Merged
merged 2 commits into from
Apr 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion Network/HTTP2/Arch/Manager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ module Network.HTTP2.Arch.Manager (
, setAction
, stop
, spawnAction
, addMyId
, forkManaged
, deleteMyId
, timeoutKillThread
, timeoutClose
) where

import Control.Exception
import Data.Foldable
import Data.IORef
import Data.Set (Set)
Expand Down Expand Up @@ -77,13 +78,34 @@ stop (Manager q _ _) = atomically $ writeTQueue q Stop
spawnAction :: Manager -> IO ()
spawnAction (Manager q _ _) = atomically $ writeTQueue q Spawn

----------------------------------------------------------------

-- | Fork managed thread
--
-- This guarantees that the thread ID is added to the manager's queue before
-- the thread starts, and is removed again when the thread terminates
-- (normally or abnormally).
forkManaged :: Manager -> IO () -> IO ()
forkManaged mgr io =
void $ mask_ $ forkIOWithUnmask $ \unmask -> do
addMyId mgr
r <- unmask io `onException` deleteMyId mgr
deleteMyId mgr
return r

-- | Adding my thread id to the kill-thread list on stopping.
--
-- This is not part of the public API; see 'forkManaged' instead.
addMyId :: Manager -> IO ()
addMyId (Manager q _ _) = do
tid <- myThreadId
atomically $ writeTQueue q $ Add tid

-- | Deleting my thread id from the kill-thread list on stopping.
--
-- This is /only/ necessary when you want to remove the thread's ID from
-- the manager /before/ the thread terminates (thereby assuming responsibility
-- for thread cleanup yourself).
deleteMyId :: Manager -> IO ()
deleteMyId (Manager q _ _) = do
tid <- myThreadId
Expand Down
10 changes: 2 additions & 8 deletions Network/HTTP2/Arch/Queue.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,17 @@

module Network.HTTP2.Arch.Queue where

import UnliftIO.Concurrent (forkIO)
import UnliftIO.Exception (bracket)
import UnliftIO.STM

import Imports
import Network.HTTP2.Arch.Manager
import Network.HTTP2.Arch.Types

{-# INLINE forkAndEnqueueWhenReady #-}
forkAndEnqueueWhenReady :: IO () -> TQueue (Output Stream) -> Output Stream -> Manager -> IO ()
forkAndEnqueueWhenReady wait outQ out mgr = bracket setup teardown $ \_ ->
void . forkIO $ do
forkAndEnqueueWhenReady wait outQ out mgr =
forkManaged mgr $ do
wait
enqueueOutput outQ out
where
setup = addMyId mgr
teardown _ = deleteMyId mgr

{-# INLINE enqueueOutput #-}
enqueueOutput :: TQueue (Output Stream) -> Output Stream -> IO ()
Expand Down
4 changes: 1 addition & 3 deletions Network/HTTP2/Client/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@ sendRequest ctx@Context{..} mgr scheme auth (Request req) processResponse = do
OutBodyStreaming strmbdy -> do
tbq <- newTBQueueIO 10 -- fixme: hard coding: 10
tbqNonMmpty <- newTVarIO False
let setup = addMyId mgr
let teardown _ = deleteMyId mgr
E.bracket setup teardown $ \_ -> void $ forkIO $ do
forkManaged mgr $ do
let push b = atomically $ do
writeTBQueue tbq (StreamingBuilder b)
writeTVar tbqNonMmpty True
Expand Down
3 changes: 3 additions & 0 deletions Network/HTTP2/Server/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ response wc@WorkerConf{..} mgr th tconf strm (Request req) (Response rsp) pps =
flush = atomically $ writeTBQueue tbq StreamingFlush
strmbdy push flush
atomically $ writeTBQueue tbq StreamingFinished
-- Remove the thread's ID from the manager's queue, to ensure the that the
-- manager will not terminate it before we are done. (The thread ID was
-- added implicitly when the worker was spawned by the manager).
deleteMyId mgr
where
(_,reqvt) = inpObjHeaders req
Expand Down
3 changes: 2 additions & 1 deletion test/HTTP2/ClientSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ runClient sc au hd = runTCPClient host port $ runHTTP2Client
cliconf = ClientConfig sc au 20
runHTTP2Client s = E.bracket (allocSimpleConfig s 4096)
freeSimpleConfig
(\conf -> run cliconf conf client)
(\conf -> run cliconf conf $ \sendRequest ->
client sendRequest)
client sendRequest = do
let req = requestNoBody methodGet "/" hd
sendRequest req $ \rsp -> do
Expand Down