Skip to content

Commit

Permalink
Merge PR #119
Browse files Browse the repository at this point in the history
  • Loading branch information
kazu-yamamoto committed Jun 7, 2024
2 parents f777d5e + 597261d commit 398a5c5
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 36 deletions.
13 changes: 2 additions & 11 deletions Network/HTTP2/Client/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,6 @@ sendRequest ctx@Context{..} mgr scheme auth (Request req) = do
-- the ordering of responses can be out-of-order.
-- But for clients, the ordering must be maintained.
-- To implement this, 'outputQStreamID' is used.
-- Also, for 'OutBodyStreaming', TBQ must not be empty
-- when its 'Output' is enqueued into 'outputQ'.
-- Otherwise, it would be re-enqueue because of empty
-- resulting in out-of-order.
-- To implement this, 'tbqNonEmpty' is used.
let hdr1, hdr2 :: [Header]
hdr1
| scheme /= "" = (":scheme", scheme) : hdr0
Expand Down Expand Up @@ -208,19 +203,15 @@ sendStreaming
-> IO ()
sendStreaming Context{..} mgr req sid newstrm strmbdy = do
tbq <- newTBQueueIO 10 -- fixme: hard coding: 10
tbqNonEmpty <- newTVarIO False
forkManagedUnmask mgr $ \unmask -> do
let push b = atomically $ do
writeTBQueue tbq (StreamingBuilder b)
writeTVar tbqNonEmpty True
let push b = atomically $ writeTBQueue tbq (StreamingBuilder b)
flush = atomically $ writeTBQueue tbq StreamingFlush
finished = atomically $ writeTBQueue tbq $ StreamingFinished (decCounter mgr)
incCounter mgr
strmbdy unmask push flush `finally` finished
atomically $ do
sidOK <- readTVar outputQStreamID
ready <- readTVar tbqNonEmpty
check (sidOK == sid && ready)
check (sidOK == sid)
writeTVar outputQStreamID (sid + 2)
writeTQueue outputQ $ Output newstrm req OObj (Just tbq) (return ())

Expand Down
4 changes: 3 additions & 1 deletion Network/HTTP2/H2/Receiver.hs
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,9 @@ data Source
(IORef Bool)

mkSource
:: TQueue (Either E.SomeException (ByteString, Bool)) -> (Int -> IO ()) -> IO Source
:: TQueue (Either E.SomeException (ByteString, Bool))
-> (Int -> IO ())
-> IO Source
mkSource q inform = Source inform q <$> newIORef "" <*> newIORef False

readSource :: Source -> IO (ByteString, Bool)
Expand Down
64 changes: 40 additions & 24 deletions Network/HTTP2/H2/Sender.hs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,26 @@ frameSender
sentinel
out
reqflush
output out@(Output strm (OutObj hdr body tlrmkr) OObj mtbq _) off0 lim = do
output (Output strm obj OObj mtbq sentinel) off0 _lim = do
outputObj strm obj mtbq sentinel off0
output out@(Output strm _ (OPush ths pid) _ _) off0 lim = do
-- Creating a push promise header
-- Frame id should be associated stream id from the client.
let sid = streamNumber strm
len <- pushPromise pid sid ths off0
off <- flushIfNecessary $ off0 + frameHeaderLength + len
output out{outputType = OObj} off lim
output _ _ _ = undefined -- never reach

----------------------------------------------------------------
outputObj
:: Stream
-> OutObj
-> Maybe (TBQueue StreamingChunk)
-> IO ()
-> Offset
-> IO Offset
outputObj strm obj@(OutObj hdr body tlrmkr) mtbq sentinel off0 = do
-- Header frame and Continuation frame
let sid = streamNumber strm
endOfStream = case body of
Expand All @@ -180,6 +199,7 @@ frameSender
-- the stream from stream table.
when endOfStream $ halfClosedLocal ctx strm Finished
off <- flushIfNecessary off'
let setOutputType otyp = Output strm obj otyp mtbq sentinel
case body of
OutBodyNone -> return off
OutBodyFile (FileSpec path fileoff bytecount) -> do
Expand All @@ -188,40 +208,33 @@ frameSender
Closer closer -> timeoutClose mgr closer
Refresher refresher -> return refresher
let next = fillFileBodyGetNext pread fileoff bytecount refresh
out' = out{outputType = ONext next tlrmkr}
output out' off lim
out' = setOutputType $ ONext next tlrmkr
outputOrEnqueueAgain out' off
OutBodyBuilder builder -> do
let next = fillBuilderBodyGetNext builder
out' = out{outputType = ONext next tlrmkr}
output out' off lim
OutBodyStreaming _ ->
output (setNextForStreaming mtbq tlrmkr out) off lim
OutBodyStreamingUnmask _ ->
output (setNextForStreaming mtbq tlrmkr out) off lim
output out@(Output strm _ (OPush ths pid) _ _) off0 lim = do
-- Creating a push promise header
-- Frame id should be associated stream id from the client.
let sid = streamNumber strm
len <- pushPromise pid sid ths off0
off <- flushIfNecessary $ off0 + frameHeaderLength + len
output out{outputType = OObj} off lim
output _ _ _ = undefined -- never reach
out' = setOutputType $ ONext next tlrmkr
outputOrEnqueueAgain out' off
OutBodyStreaming _ -> do
let out' = setOutputType $ nextForStreaming mtbq tlrmkr
outputOrEnqueueAgain out' off
OutBodyStreamingUnmask _ -> do
let out' = setOutputType $ nextForStreaming mtbq tlrmkr
outputOrEnqueueAgain out' off

----------------------------------------------------------------
setNextForStreaming
nextForStreaming
:: Maybe (TBQueue StreamingChunk)
-> TrailersMaker
-> Output Stream
-> Output Stream
setNextForStreaming mtbq tlrmkr out =
-> OutputType
nextForStreaming mtbq tlrmkr =
let tbq = fromJust mtbq
takeQ = atomically $ tryReadTBQueue tbq
next = fillStreamBodyGetNext takeQ
in out{outputType = ONext next tlrmkr}
in ONext next tlrmkr

----------------------------------------------------------------
outputOrEnqueueAgain :: Output Stream -> Offset -> IO Offset
outputOrEnqueueAgain out@(Output strm _ otyp _ _) off = E.handle resetStream $ do
outputOrEnqueueAgain out@(Output strm obj otyp mtbq sentinel) off = E.handle resetStream $ do
state <- readStreamState strm
if isHalfClosedLocal state
then return off
Expand All @@ -230,11 +243,14 @@ frameSender
-- Checking if all push are done.
forkAndEnqueueWhenReady wait outputQ out{outputType = OObj} mgr
return off
OObj ->
-- Send headers immediately, without waiting for data
-- No need to check the streaming window (applies to DATA frames only)
outputObj strm obj mtbq sentinel off
_ -> case mtbq of
Just tbq -> checkStreaming tbq
_ -> checkStreamWindowSize
where
mtbq = outputStrmQ out
checkStreaming tbq = do
isEmpty <- atomically $ isEmptyTBQueue tbq
if isEmpty
Expand Down

0 comments on commit 398a5c5

Please sign in to comment.