Skip to content

Commit

Permalink
refactoring window update
Browse files Browse the repository at this point in the history
window update 0 is sent only when stream's bodies are comsumed
  • Loading branch information
kazu-yamamoto committed Apr 12, 2023
1 parent 3e615cc commit 6eccd7f
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 43 deletions.
22 changes: 8 additions & 14 deletions Network/HTTP2/Arch/Receiver.hs
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,14 @@ processState (Open (NoBody tbl@(_,reqvt))) ctx@Context{..} strm@Stream{streamInp
else
putMVar streamInput inpObj
return False
processState (Open (HasBody tbl@(_,reqvt))) ctx@Context{..} strm@Stream{streamInput} streamId = do
processState (Open (HasBody tbl@(_,reqvt))) ctx@Context{..} strm@Stream{streamInput} _streamId = do
let mcl = fst <$> (getHeaderValue tokenContentLength reqvt >>= C8.readInt)
bodyLength <- newIORef 0
tlr <- newIORef Nothing
q <- newTQueueIO
setStreamState ctx strm $ Open (Body q mcl bodyLength tlr)
incref <- newIORef 0
bodySource <- mkSource q $ informWindowUpdate controlQ streamId incref
bodySource <- mkSource q $ informWindowUpdate ctx strm incref
let inpObj = InpObj tbl mcl (readSource bodySource) tlr
if isServer ctx then do
let si = toServerInfo roleInfo
Expand Down Expand Up @@ -300,10 +300,9 @@ control FrameGoAway header bs _ _ = do
else
E.throwIO $ ConnectionErrorIsReceived err sid $ Short.toShort msg

control FrameWindowUpdate header@FrameHeader{streamId} bs ctx _ = do
control FrameWindowUpdate header bs ctx _ = do
WindowUpdateFrame n <- guardIt $ decodeWindowUpdateFrame header bs
w <- increaseConnectionWindowSize ctx n
when (isWindowOverflow w) $ E.throwIO $ ConnectionErrorIsSent FlowControlError streamId "control window should be less than 2^31"
increaseConnectionWindowSize ctx n

control _ _ _ _ _ =
-- must not reach here
Expand Down Expand Up @@ -367,11 +366,10 @@ stream FrameHeaders header@FrameHeader{flags,streamId} bs ctx (Open (Body q _ _

-- ignore data-frame except for flow-control when we're done locally
stream FrameData
FrameHeader{flags,payloadLength}
FrameHeader{flags}
_bs
ctx s@(HalfClosedLocal _)
_ctx s@(HalfClosedLocal _)
_ = do
informConnectionWindowUpdate ctx payloadLength
let endOfStream = testEndStream flags
if endOfStream then do
return HalfClosedRemote
Expand All @@ -383,7 +381,6 @@ stream FrameData
bs
ctx@Context{emptyFrameRate} s@(Open (Body q mcl bodyLength _))
_ = do
informConnectionWindowUpdate ctx payloadLength
DataFrame body <- guardIt $ decodeDataFrame header bs
len0 <- readIORef bodyLength
let len = len0 + payloadLength
Expand Down Expand Up @@ -434,12 +431,9 @@ stream FrameContinuation FrameHeader{flags,streamId} frag ctx s@(Open (Continued
else
return $ Open $ Continued rfrags' siz' n' endOfStream

stream FrameWindowUpdate header@FrameHeader{streamId} bs _ s strm = do
stream FrameWindowUpdate header bs _ s strm = do
WindowUpdateFrame n <- guardIt $ decodeWindowUpdateFrame header bs
w <- increaseStreamWindowSize strm n
when (isWindowOverflow w) $ do
let msg = fromString ("window update for " ++ show streamId ++ " is overflow")
E.throwIO $ StreamErrorIsSent FlowControlError streamId msg
increaseStreamWindowSize strm n
return s

stream FrameRSTStream header@FrameHeader{streamId} bs ctx _ strm = do
Expand Down
65 changes: 36 additions & 29 deletions Network/HTTP2/Arch/Window.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
module Network.HTTP2.Arch.Window where

import Data.IORef
import qualified UnliftIO.Exception as E
import UnliftIO.STM

import Imports
Expand Down Expand Up @@ -34,19 +35,26 @@ waitConnectionWindowSize Context{txConnectionWindow} = do
----------------------------------------------------------------
-- Receiving window update

increaseStreamWindowSize :: Stream -> Int -> IO WindowSize
increaseStreamWindowSize Stream{streamWindow} n = atomically $ do
w0 <- readTVar streamWindow
let w1 = w0 + n
writeTVar streamWindow w1
return w1

increaseConnectionWindowSize :: Context -> Int -> IO WindowSize
increaseConnectionWindowSize Context{txConnectionWindow} n = atomically $ do
w0 <- readTVar txConnectionWindow
let w1 = w0 + n
writeTVar txConnectionWindow w1
return w1
increaseWindowSize :: StreamId -> TVar WindowSize -> WindowSize -> IO ()
increaseWindowSize sid tvar n = do
w <- atomically $ do
w0 <- readTVar tvar
let w1 = w0 + n
writeTVar tvar w1
return w1
when (isWindowOverflow w) $ do
let msg = fromString ("window update for stream " ++ show sid ++ " is overflow")
err = if isControl sid then ConnectionErrorIsSent
else StreamErrorIsSent
E.throwIO $ err FlowControlError sid msg

increaseStreamWindowSize :: Stream -> WindowSize -> IO ()
increaseStreamWindowSize Stream{streamNumber,streamWindow} n =
increaseWindowSize streamNumber streamWindow n

increaseConnectionWindowSize :: Context -> Int -> IO ()
increaseConnectionWindowSize Context{txConnectionWindow} n =
increaseWindowSize 0 txConnectionWindow n

decreaseWindowSize :: Context -> Stream -> WindowSize -> IO ()
decreaseWindowSize Context{txConnectionWindow} Stream{streamWindow} siz = do
Expand All @@ -56,22 +64,21 @@ decreaseWindowSize Context{txConnectionWindow} Stream{streamWindow} siz = do
----------------------------------------------------------------
-- Sending window update

informWindowUpdate :: TQueue Control -> StreamId -> IORef Int -> Int -> IO ()
informWindowUpdate _ _ _ 0 = return ()
informWindowUpdate controlQ sid incref len = do
-- incref is occupied by the receiver thread
w0 <- readIORef incref
let w1 = w0 + len
if w1 >= defaultWindowSize then do -- fixme
let frame = windowUpdateFrame sid w1
enqueueControl controlQ $ CFrames Nothing [frame]
writeIORef incref 0
else
writeIORef incref w1

informConnectionWindowUpdate :: Context -> Int -> IO ()
informConnectionWindowUpdate Context{..} =
informWindowUpdate controlQ 0 rxConnectionInc
informWindowUpdate :: Context -> Stream -> IORef Int -> Int -> IO ()
informWindowUpdate _ _ _ 0 = return ()
informWindowUpdate Context{controlQ,rxConnectionInc} Stream{streamNumber} streamInc len = do
join $ atomicModifyIORef rxConnectionInc $ modify 0
join $ atomicModifyIORef streamInc $ modify streamNumber
where
modify sid w0
| w1 < thresh = (w1, return ())
| otherwise = let frame = windowUpdateFrame sid w1
cframe = CFrames Nothing [frame]
action = enqueueControl controlQ cframe
in (0, action)
where
thresh = defaultWindowSize -- fixme
w1 = w0 + len

----------------------------------------------------------------

Expand Down

0 comments on commit 6eccd7f

Please sign in to comment.