Skip to content

Commit

Permalink
Merge pull request #70 from kazu-yamamoto/debug-msg
Browse files Browse the repository at this point in the history
debugging flow control
  • Loading branch information
kazu-yamamoto committed Apr 20, 2023
2 parents a41dd74 + 6eccd7f commit d20024d
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 69 deletions.
4 changes: 3 additions & 1 deletion Imports.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ module Imports (
, module Data.List
, module Data.Foldable
, module Data.Int
, module Data.Maybe
, module Data.Monoid
, module Data.Ord
, module Data.String
, module Data.Word
, module Data.Maybe
, module Numeric
, GCBuffer
, withForeignPtr
Expand All @@ -29,6 +30,7 @@ import Data.Int
import Data.List
import Data.Maybe
import Data.Monoid
import Data.String
import Data.Ord
import Data.Word
import Foreign.ForeignPtr
Expand Down
9 changes: 5 additions & 4 deletions Network/HTTP2/Arch/HPACK.hs
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,16 @@ hpackDecodeHeader hdrblk sid ctx = do
if isClient ctx || checkRequestHeader vt then
return tbl
else
E.throwIO $ StreamErrorIsSent ProtocolError sid
E.throwIO $ StreamErrorIsSent ProtocolError sid "illegal header"

hpackDecodeTrailer :: HeaderBlockFragment -> StreamId -> Context -> IO HeaderTable
hpackDecodeTrailer hdrblk sid Context{..} = decodeTokenHeader decodeDynamicTable hdrblk `E.catch` handl
where
handl IllegalHeaderName =
E.throwIO $ StreamErrorIsSent ProtocolError sid
handl _ =
E.throwIO $ StreamErrorIsSent CompressionError sid
E.throwIO $ StreamErrorIsSent ProtocolError sid "illegal trailer"
handl e = do
let msg = fromString $ show e
E.throwIO $ StreamErrorIsSent CompressionError sid msg

{-# INLINE checkRequestHeader #-}
checkRequestHeader :: ValueTable -> Bool
Expand Down
38 changes: 17 additions & 21 deletions Network/HTTP2/Arch/Receiver.hs
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ frameReceiver ctx@Context{..} conf@Config{..} = loop 0 `E.catch` sendGoaway
let frame = goawayFrame sid err $ Short.fromShort msg
enqueueControl controlQ $ CFrames Nothing [frame]
enqueueControl controlQ $ CFinish e
| Just e@(StreamErrorIsSent err sid) <- E.fromException se = do
| Just e@(StreamErrorIsSent err sid msg) <- E.fromException se = do
let frame = resetFrame err sid
enqueueControl controlQ $ CFrames Nothing [frame]
let frame' = goawayFrame sid err "closing a connection after sending a stream error"
let frame' = goawayFrame sid err $ Short.fromShort msg
enqueueControl controlQ $ CFrames Nothing [frame']
enqueueControl controlQ $ CFinish e
| Just e@(StreamErrorIsReceived err sid) <- E.fromException se = do
Expand Down Expand Up @@ -174,7 +174,7 @@ controlOrStream ctx@Context{..} conf@Config{..} ftyp header@FrameHeader{streamId
processState :: StreamState -> Context -> Stream -> StreamId -> IO Bool
processState (Open (NoBody tbl@(_,reqvt))) ctx@Context{..} strm@Stream{streamInput} streamId = do
let mcl = fst <$> (getHeaderValue tokenContentLength reqvt >>= C8.readInt)
when (just mcl (/= (0 :: Int))) $ E.throwIO $ StreamErrorIsSent ProtocolError streamId
when (just mcl (/= (0 :: Int))) $ E.throwIO $ StreamErrorIsSent ProtocolError streamId "no body but content-length is not zero"
halfClosedRemote ctx strm
tlr <- newIORef Nothing
let inpObj = InpObj tbl (Just 0) (return "") tlr
Expand All @@ -184,14 +184,14 @@ processState (Open (NoBody tbl@(_,reqvt))) ctx@Context{..} strm@Stream{streamInp
else
putMVar streamInput inpObj
return False
processState (Open (HasBody tbl@(_,reqvt))) ctx@Context{..} strm@Stream{streamInput} streamId = do
processState (Open (HasBody tbl@(_,reqvt))) ctx@Context{..} strm@Stream{streamInput} _streamId = do
let mcl = fst <$> (getHeaderValue tokenContentLength reqvt >>= C8.readInt)
bodyLength <- newIORef 0
tlr <- newIORef Nothing
q <- newTQueueIO
setStreamState ctx strm $ Open (Body q mcl bodyLength tlr)
incref <- newIORef 0
bodySource <- mkSource q $ informWindowUpdate controlQ streamId incref
bodySource <- mkSource q $ informWindowUpdate ctx strm incref
let inpObj = InpObj tbl mcl (readSource bodySource) tlr
if isServer ctx then do
let si = toServerInfo roleInfo
Expand Down Expand Up @@ -245,8 +245,8 @@ getStream' ctx@Context{..} ftyp streamId Nothing
mMaxConc <- maxConcurrentStreams <$> readIORef mySettings
case mMaxConc of
Nothing -> return ()
Just maxConc -> when (cnt >= maxConc) $
E.throwIO $ StreamErrorIsSent RefusedStream streamId
Just maxConc -> when (cnt >= maxConc) $
E.throwIO $ StreamErrorIsSent RefusedStream streamId "exceeds max concurrent"
Just <$> openStream ctx streamId ftyp
| otherwise = undefined -- never reach

Expand Down Expand Up @@ -300,10 +300,9 @@ control FrameGoAway header bs _ _ = do
else
E.throwIO $ ConnectionErrorIsReceived err sid $ Short.toShort msg

control FrameWindowUpdate header@FrameHeader{streamId} bs ctx _ = do
control FrameWindowUpdate header bs ctx _ = do
WindowUpdateFrame n <- guardIt $ decodeWindowUpdateFrame header bs
w <- increaseConnectionWindowSize ctx n
when (isWindowOverflow w) $ E.throwIO $ ConnectionErrorIsSent FlowControlError streamId "control window should be less than 2^31"
increaseConnectionWindowSize ctx n

control _ _ _ _ _ =
-- must not reach here
Expand All @@ -321,7 +320,7 @@ guardIt x = case x of
{-# INLINE checkPriority #-}
checkPriority :: Priority -> StreamId -> IO ()
checkPriority p me
| dep == me = E.throwIO $ StreamErrorIsSent ProtocolError me
| dep == me = E.throwIO $ StreamErrorIsSent ProtocolError me "priority depends on itself"
| otherwise = return ()
where
dep = streamDependency p
Expand Down Expand Up @@ -367,11 +366,10 @@ stream FrameHeaders header@FrameHeader{flags,streamId} bs ctx (Open (Body q _ _

-- ignore data-frame except for flow-control when we're done locally
stream FrameData
FrameHeader{flags,payloadLength}
FrameHeader{flags}
_bs
ctx s@(HalfClosedLocal _)
_ctx s@(HalfClosedLocal _)
_ = do
informConnectionWindowUpdate ctx payloadLength
let endOfStream = testEndStream flags
if endOfStream then do
return HalfClosedRemote
Expand All @@ -383,7 +381,6 @@ stream FrameData
bs
ctx@Context{emptyFrameRate} s@(Open (Body q mcl bodyLength _))

Check warning on line 382 in Network/HTTP2/Arch/Receiver.hs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 8.10)

Defined but not used: ‘ctx’

Check warning on line 382 in Network/HTTP2/Arch/Receiver.hs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 8.10)

Defined but not used: ‘ctx’

Check warning on line 382 in Network/HTTP2/Arch/Receiver.hs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 9.0)

Defined but not used: ‘ctx’

Check warning on line 382 in Network/HTTP2/Arch/Receiver.hs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 9.0)

Defined but not used: ‘ctx’

Check warning on line 382 in Network/HTTP2/Arch/Receiver.hs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 9.2)

Defined but not used: ‘ctx’

Check warning on line 382 in Network/HTTP2/Arch/Receiver.hs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 9.2)

Defined but not used: ‘ctx’

Check warning on line 382 in Network/HTTP2/Arch/Receiver.hs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 9.4)

Defined but not used: ‘ctx’

Check warning on line 382 in Network/HTTP2/Arch/Receiver.hs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 9.4)

Defined but not used: ‘ctx’

Check warning on line 382 in Network/HTTP2/Arch/Receiver.hs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 9.6)

Defined but not used: ‘ctx’

Check warning on line 382 in Network/HTTP2/Arch/Receiver.hs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 9.6)

Defined but not used: ‘ctx’

Check warning on line 382 in Network/HTTP2/Arch/Receiver.hs

View workflow job for this annotation

GitHub Actions / build (macOS-latest, 8.10)

Defined but not used: ‘ctx’

Check warning on line 382 in Network/HTTP2/Arch/Receiver.hs

View workflow job for this annotation

GitHub Actions / build (macOS-latest, 9.0)

Defined but not used: ‘ctx’

Check warning on line 382 in Network/HTTP2/Arch/Receiver.hs

View workflow job for this annotation

GitHub Actions / build (macOS-latest, 9.2)

Defined but not used: ‘ctx’

Check warning on line 382 in Network/HTTP2/Arch/Receiver.hs

View workflow job for this annotation

GitHub Actions / build (macOS-latest, 9.4)

Defined but not used: ‘ctx’

Check warning on line 382 in Network/HTTP2/Arch/Receiver.hs

View workflow job for this annotation

GitHub Actions / build (macOS-latest, 9.6)

Defined but not used: ‘ctx’

Check warning on line 382 in Network/HTTP2/Arch/Receiver.hs

View workflow job for this annotation

GitHub Actions / build (windows-latest, 8.10)

Defined but not used: ‘ctx’

Check warning on line 382 in Network/HTTP2/Arch/Receiver.hs

View workflow job for this annotation

GitHub Actions / build (windows-latest, 9.0)

Defined but not used: ‘ctx’

Check warning on line 382 in Network/HTTP2/Arch/Receiver.hs

View workflow job for this annotation

GitHub Actions / build (windows-latest, 9.2)

Defined but not used: ‘ctx’

Check warning on line 382 in Network/HTTP2/Arch/Receiver.hs

View workflow job for this annotation

GitHub Actions / build (windows-latest, 9.4)

Defined but not used: ‘ctx’

Check warning on line 382 in Network/HTTP2/Arch/Receiver.hs

View workflow job for this annotation

GitHub Actions / build (windows-latest, 9.6)

Defined but not used: ‘ctx’
_ = do
informConnectionWindowUpdate ctx payloadLength
DataFrame body <- guardIt $ decodeDataFrame header bs
len0 <- readIORef bodyLength
let len = len0 + payloadLength
Expand All @@ -400,7 +397,7 @@ stream FrameData
if endOfStream then do
case mcl of
Nothing -> return ()
Just cl -> when (cl /= len) $ E.throwIO $ StreamErrorIsSent ProtocolError streamId
Just cl -> when (cl /= len) $ E.throwIO $ StreamErrorIsSent ProtocolError streamId "actual body length is not the same as content-length"
-- no trailers
atomically $ writeTQueue q ""
return HalfClosedRemote
Expand Down Expand Up @@ -434,10 +431,9 @@ stream FrameContinuation FrameHeader{flags,streamId} frag ctx s@(Open (Continued
else
return $ Open $ Continued rfrags' siz' n' endOfStream

stream FrameWindowUpdate header@FrameHeader{streamId} bs _ s strm = do
stream FrameWindowUpdate header bs _ s strm = do
WindowUpdateFrame n <- guardIt $ decodeWindowUpdateFrame header bs
w <- increaseStreamWindowSize strm n
when (isWindowOverflow w) $ E.throwIO $ StreamErrorIsSent FlowControlError streamId
increaseStreamWindowSize strm n
return s

stream FrameRSTStream header@FrameHeader{streamId} bs ctx _ strm = do
Expand All @@ -458,8 +454,8 @@ stream FrameContinuation FrameHeader{streamId} _ _ _ _ = E.throwIO $ ConnectionE
stream _ FrameHeader{streamId} _ _ (Open Continued{}) _ = E.throwIO $ ConnectionErrorIsSent ProtocolError streamId "an illegal frame follows header/continuation frames"
-- Ignore frames to streams we have just reset, per section 5.1.
stream _ _ _ _ st@(Closed (ResetByMe _)) _ = return st
stream FrameData FrameHeader{streamId} _ _ _ _ = E.throwIO $ StreamErrorIsSent StreamClosed streamId
stream _ FrameHeader{streamId} _ _ _ _ = E.throwIO $ StreamErrorIsSent ProtocolError streamId
stream FrameData FrameHeader{streamId} _ _ _ _ = E.throwIO $ StreamErrorIsSent StreamClosed streamId $ fromString ("illegal data frame for " ++ show streamId)
stream _ FrameHeader{streamId} _ _ _ _ = E.throwIO $ StreamErrorIsSent ProtocolError streamId $ fromString ("illegal frame for " ++ show streamId)

----------------------------------------------------------------

Expand Down
2 changes: 1 addition & 1 deletion Network/HTTP2/Arch/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ data HTTP2Error =
| ConnectionErrorIsReceived ErrorCode StreamId ReasonPhrase
| ConnectionErrorIsSent ErrorCode StreamId ReasonPhrase
| StreamErrorIsReceived ErrorCode StreamId
| StreamErrorIsSent ErrorCode StreamId
| StreamErrorIsSent ErrorCode StreamId ReasonPhrase
| BadThingHappen E.SomeException
deriving (Show, Typeable)

Expand Down
91 changes: 49 additions & 42 deletions Network/HTTP2/Arch/Window.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
module Network.HTTP2.Arch.Window where

import Data.IORef
import qualified UnliftIO.Exception as E
import UnliftIO.STM

import Imports
Expand All @@ -15,48 +16,6 @@ import Network.HTTP2.Arch.Stream
import Network.HTTP2.Arch.Types
import Network.HTTP2.Frame

----------------------------------------------------------------
-- Receiving window update

increaseStreamWindowSize :: Stream -> Int -> IO WindowSize
increaseStreamWindowSize Stream{streamWindow} n = atomically $ do
w0 <- readTVar streamWindow
let w1 = w0 + n
writeTVar streamWindow w1
return w1

increaseConnectionWindowSize :: Context -> Int -> IO WindowSize
increaseConnectionWindowSize Context{txConnectionWindow} n = atomically $ do
w0 <- readTVar txConnectionWindow
let w1 = w0 + n
writeTVar txConnectionWindow w1
return w1

----------------------------------------------------------------
-- Sending window update

decreaseWindowSize :: Context -> Stream -> WindowSize -> IO ()
decreaseWindowSize Context{txConnectionWindow} Stream{streamWindow} siz = do
atomically $ modifyTVar' txConnectionWindow (subtract siz)
atomically $ modifyTVar' streamWindow (subtract siz)

informWindowUpdate :: TQueue Control -> StreamId -> IORef Int -> Int -> IO ()
informWindowUpdate _ _ _ 0 = return ()
informWindowUpdate controlQ sid incref len = do
-- incref is occupied by the receiver thread
w0 <- readIORef incref
let w1 = w0 + len
if w1 >= defaultWindowSize then do -- fixme
let frame = windowUpdateFrame sid w1
enqueueControl controlQ $ CFrames Nothing [frame]
writeIORef incref 0
else
writeIORef incref w1

informConnectionWindowUpdate :: Context -> Int -> IO ()
informConnectionWindowUpdate Context{..} =
informWindowUpdate controlQ 0 rxConnectionInc

getStreamWindowSize :: Stream -> IO WindowSize
getStreamWindowSize Stream{streamWindow} = readTVarIO streamWindow

Expand All @@ -73,6 +32,54 @@ waitConnectionWindowSize Context{txConnectionWindow} = do
w <- readTVar txConnectionWindow
checkSTM (w > 0)

----------------------------------------------------------------
-- Receiving window update

increaseWindowSize :: StreamId -> TVar WindowSize -> WindowSize -> IO ()
increaseWindowSize sid tvar n = do
w <- atomically $ do
w0 <- readTVar tvar
let w1 = w0 + n
writeTVar tvar w1
return w1
when (isWindowOverflow w) $ do
let msg = fromString ("window update for stream " ++ show sid ++ " is overflow")
err = if isControl sid then ConnectionErrorIsSent
else StreamErrorIsSent
E.throwIO $ err FlowControlError sid msg

increaseStreamWindowSize :: Stream -> WindowSize -> IO ()
increaseStreamWindowSize Stream{streamNumber,streamWindow} n =
increaseWindowSize streamNumber streamWindow n

increaseConnectionWindowSize :: Context -> Int -> IO ()
increaseConnectionWindowSize Context{txConnectionWindow} n =
increaseWindowSize 0 txConnectionWindow n

decreaseWindowSize :: Context -> Stream -> WindowSize -> IO ()
decreaseWindowSize Context{txConnectionWindow} Stream{streamWindow} siz = do
atomically $ modifyTVar' txConnectionWindow (subtract siz)
atomically $ modifyTVar' streamWindow (subtract siz)

----------------------------------------------------------------
-- Sending window update

informWindowUpdate :: Context -> Stream -> IORef Int -> Int -> IO ()
informWindowUpdate _ _ _ 0 = return ()
informWindowUpdate Context{controlQ,rxConnectionInc} Stream{streamNumber} streamInc len = do
join $ atomicModifyIORef rxConnectionInc $ modify 0
join $ atomicModifyIORef streamInc $ modify streamNumber
where
modify sid w0
| w1 < thresh = (w1, return ())
| otherwise = let frame = windowUpdateFrame sid w1
cframe = CFrames Nothing [frame]
action = enqueueControl controlQ cframe
in (0, action)
where
thresh = defaultWindowSize -- fixme
w1 = w0 + len

----------------------------------------------------------------

-- max: 2,147,483,647 (2^31-1) is too large.
Expand Down

0 comments on commit d20024d

Please sign in to comment.