From 2eff8e0099d5fd039f459077f983057ec6fcefbf Mon Sep 17 00:00:00 2001 From: Kazu Yamamoto Date: Wed, 12 Apr 2023 11:57:37 +0900 Subject: [PATCH 1/3] style only --- Network/HTTP2/Arch/Window.hs | 38 ++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/Network/HTTP2/Arch/Window.hs b/Network/HTTP2/Arch/Window.hs index 6dabf088..dcb5f0ad 100644 --- a/Network/HTTP2/Arch/Window.hs +++ b/Network/HTTP2/Arch/Window.hs @@ -15,6 +15,22 @@ import Network.HTTP2.Arch.Stream import Network.HTTP2.Arch.Types import Network.HTTP2.Frame +getStreamWindowSize :: Stream -> IO WindowSize +getStreamWindowSize Stream{streamWindow} = readTVarIO streamWindow + +getConnectionWindowSize :: Context -> IO WindowSize +getConnectionWindowSize Context{txConnectionWindow} = readTVarIO txConnectionWindow + +waitStreamWindowSize :: Stream -> IO () +waitStreamWindowSize Stream{streamWindow} = atomically $ do + w <- readTVar streamWindow + checkSTM (w > 0) + +waitConnectionWindowSize :: Context -> STM () +waitConnectionWindowSize Context{txConnectionWindow} = do + w <- readTVar txConnectionWindow + checkSTM (w > 0) + ---------------------------------------------------------------- -- Receiving window update @@ -32,14 +48,14 @@ increaseConnectionWindowSize Context{txConnectionWindow} n = atomically $ do writeTVar txConnectionWindow w1 return w1 ----------------------------------------------------------------- --- Sending window update - decreaseWindowSize :: Context -> Stream -> WindowSize -> IO () decreaseWindowSize Context{txConnectionWindow} Stream{streamWindow} siz = do atomically $ modifyTVar' txConnectionWindow (subtract siz) atomically $ modifyTVar' streamWindow (subtract siz) +---------------------------------------------------------------- +-- Sending window update + informWindowUpdate :: TQueue Control -> StreamId -> IORef Int -> Int -> IO () informWindowUpdate _ _ _ 0 = return () informWindowUpdate controlQ sid incref len = do @@ -57,22 +73,6 @@ informConnectionWindowUpdate :: Context -> Int -> IO () informConnectionWindowUpdate Context{..} = informWindowUpdate controlQ 0 rxConnectionInc -getStreamWindowSize :: Stream -> IO WindowSize -getStreamWindowSize Stream{streamWindow} = readTVarIO streamWindow - -getConnectionWindowSize :: Context -> IO WindowSize -getConnectionWindowSize Context{txConnectionWindow} = readTVarIO txConnectionWindow - -waitStreamWindowSize :: Stream -> IO () -waitStreamWindowSize Stream{streamWindow} = atomically $ do - w <- readTVar streamWindow - checkSTM (w > 0) - -waitConnectionWindowSize :: Context -> STM () -waitConnectionWindowSize Context{txConnectionWindow} = do - w <- readTVar txConnectionWindow - checkSTM (w > 0) - ---------------------------------------------------------------- -- max: 2,147,483,647 (2^31-1) is too large. From 3e615ccf24fb740a8c8edf4dc138716e952a8021 Mon Sep 17 00:00:00 2001 From: Kazu Yamamoto Date: Wed, 12 Apr 2023 11:57:54 +0900 Subject: [PATCH 2/3] inform debug message for stream error --- Imports.hs | 4 +++- Network/HTTP2/Arch/HPACK.hs | 9 +++++---- Network/HTTP2/Arch/Receiver.hs | 22 ++++++++++++---------- Network/HTTP2/Arch/Types.hs | 2 +- 4 files changed, 21 insertions(+), 16 deletions(-) diff --git a/Imports.hs b/Imports.hs index ddbf0217..7ec1ced1 100644 --- a/Imports.hs +++ b/Imports.hs @@ -8,10 +8,11 @@ module Imports ( , module Data.List , module Data.Foldable , module Data.Int + , module Data.Maybe , module Data.Monoid , module Data.Ord + , module Data.String , module Data.Word - , module Data.Maybe , module Numeric , GCBuffer , withForeignPtr @@ -29,6 +30,7 @@ import Data.Int import Data.List import Data.Maybe import Data.Monoid +import Data.String import Data.Ord import Data.Word import Foreign.ForeignPtr diff --git a/Network/HTTP2/Arch/HPACK.hs b/Network/HTTP2/Arch/HPACK.hs index 89d725cf..023cf31a 100644 --- a/Network/HTTP2/Arch/HPACK.hs +++ b/Network/HTTP2/Arch/HPACK.hs @@ -69,15 +69,16 @@ hpackDecodeHeader hdrblk sid ctx = do if isClient ctx || checkRequestHeader vt then return tbl else - E.throwIO $ StreamErrorIsSent ProtocolError sid + E.throwIO $ StreamErrorIsSent ProtocolError sid "illegal header" hpackDecodeTrailer :: HeaderBlockFragment -> StreamId -> Context -> IO HeaderTable hpackDecodeTrailer hdrblk sid Context{..} = decodeTokenHeader decodeDynamicTable hdrblk `E.catch` handl where handl IllegalHeaderName = - E.throwIO $ StreamErrorIsSent ProtocolError sid - handl _ = - E.throwIO $ StreamErrorIsSent CompressionError sid + E.throwIO $ StreamErrorIsSent ProtocolError sid "illegal trailer" + handl e = do + let msg = fromString $ show e + E.throwIO $ StreamErrorIsSent CompressionError sid msg {-# INLINE checkRequestHeader #-} checkRequestHeader :: ValueTable -> Bool diff --git a/Network/HTTP2/Arch/Receiver.hs b/Network/HTTP2/Arch/Receiver.hs index cd994263..d6695c03 100644 --- a/Network/HTTP2/Arch/Receiver.hs +++ b/Network/HTTP2/Arch/Receiver.hs @@ -73,10 +73,10 @@ frameReceiver ctx@Context{..} conf@Config{..} = loop 0 `E.catch` sendGoaway let frame = goawayFrame sid err $ Short.fromShort msg enqueueControl controlQ $ CFrames Nothing [frame] enqueueControl controlQ $ CFinish e - | Just e@(StreamErrorIsSent err sid) <- E.fromException se = do + | Just e@(StreamErrorIsSent err sid msg) <- E.fromException se = do let frame = resetFrame err sid enqueueControl controlQ $ CFrames Nothing [frame] - let frame' = goawayFrame sid err "closing a connection after sending a stream error" + let frame' = goawayFrame sid err $ Short.fromShort msg enqueueControl controlQ $ CFrames Nothing [frame'] enqueueControl controlQ $ CFinish e | Just e@(StreamErrorIsReceived err sid) <- E.fromException se = do @@ -174,7 +174,7 @@ controlOrStream ctx@Context{..} conf@Config{..} ftyp header@FrameHeader{streamId processState :: StreamState -> Context -> Stream -> StreamId -> IO Bool processState (Open (NoBody tbl@(_,reqvt))) ctx@Context{..} strm@Stream{streamInput} streamId = do let mcl = fst <$> (getHeaderValue tokenContentLength reqvt >>= C8.readInt) - when (just mcl (/= (0 :: Int))) $ E.throwIO $ StreamErrorIsSent ProtocolError streamId + when (just mcl (/= (0 :: Int))) $ E.throwIO $ StreamErrorIsSent ProtocolError streamId "no body but content-length is not zero" halfClosedRemote ctx strm tlr <- newIORef Nothing let inpObj = InpObj tbl (Just 0) (return "") tlr @@ -245,8 +245,8 @@ getStream' ctx@Context{..} ftyp streamId Nothing mMaxConc <- maxConcurrentStreams <$> readIORef mySettings case mMaxConc of Nothing -> return () - Just maxConc -> when (cnt >= maxConc) $ - E.throwIO $ StreamErrorIsSent RefusedStream streamId + Just maxConc -> when (cnt >= maxConc) $ + E.throwIO $ StreamErrorIsSent RefusedStream streamId "exceeds max concurrent" Just <$> openStream ctx streamId ftyp | otherwise = undefined -- never reach @@ -321,7 +321,7 @@ guardIt x = case x of {-# INLINE checkPriority #-} checkPriority :: Priority -> StreamId -> IO () checkPriority p me - | dep == me = E.throwIO $ StreamErrorIsSent ProtocolError me + | dep == me = E.throwIO $ StreamErrorIsSent ProtocolError me "priority depends on itself" | otherwise = return () where dep = streamDependency p @@ -400,7 +400,7 @@ stream FrameData if endOfStream then do case mcl of Nothing -> return () - Just cl -> when (cl /= len) $ E.throwIO $ StreamErrorIsSent ProtocolError streamId + Just cl -> when (cl /= len) $ E.throwIO $ StreamErrorIsSent ProtocolError streamId "actual body length is not the same as content-length" -- no trailers atomically $ writeTQueue q "" return HalfClosedRemote @@ -437,7 +437,9 @@ stream FrameContinuation FrameHeader{flags,streamId} frag ctx s@(Open (Continued stream FrameWindowUpdate header@FrameHeader{streamId} bs _ s strm = do WindowUpdateFrame n <- guardIt $ decodeWindowUpdateFrame header bs w <- increaseStreamWindowSize strm n - when (isWindowOverflow w) $ E.throwIO $ StreamErrorIsSent FlowControlError streamId + when (isWindowOverflow w) $ do + let msg = fromString ("window update for " ++ show streamId ++ " is overflow") + E.throwIO $ StreamErrorIsSent FlowControlError streamId msg return s stream FrameRSTStream header@FrameHeader{streamId} bs ctx _ strm = do @@ -458,8 +460,8 @@ stream FrameContinuation FrameHeader{streamId} _ _ _ _ = E.throwIO $ ConnectionE stream _ FrameHeader{streamId} _ _ (Open Continued{}) _ = E.throwIO $ ConnectionErrorIsSent ProtocolError streamId "an illegal frame follows header/continuation frames" -- Ignore frames to streams we have just reset, per section 5.1. stream _ _ _ _ st@(Closed (ResetByMe _)) _ = return st -stream FrameData FrameHeader{streamId} _ _ _ _ = E.throwIO $ StreamErrorIsSent StreamClosed streamId -stream _ FrameHeader{streamId} _ _ _ _ = E.throwIO $ StreamErrorIsSent ProtocolError streamId +stream FrameData FrameHeader{streamId} _ _ _ _ = E.throwIO $ StreamErrorIsSent StreamClosed streamId $ fromString ("illegal data frame for " ++ show streamId) +stream _ FrameHeader{streamId} _ _ _ _ = E.throwIO $ StreamErrorIsSent ProtocolError streamId $ fromString ("illegal frame for " ++ show streamId) ---------------------------------------------------------------- diff --git a/Network/HTTP2/Arch/Types.hs b/Network/HTTP2/Arch/Types.hs index 121dcfe3..727495d6 100644 --- a/Network/HTTP2/Arch/Types.hs +++ b/Network/HTTP2/Arch/Types.hs @@ -212,7 +212,7 @@ data HTTP2Error = | ConnectionErrorIsReceived ErrorCode StreamId ReasonPhrase | ConnectionErrorIsSent ErrorCode StreamId ReasonPhrase | StreamErrorIsReceived ErrorCode StreamId - | StreamErrorIsSent ErrorCode StreamId + | StreamErrorIsSent ErrorCode StreamId ReasonPhrase | BadThingHappen E.SomeException deriving (Show, Typeable) From 6eccd7f54923134a81498975daac74d61a469970 Mon Sep 17 00:00:00 2001 From: Kazu Yamamoto Date: Wed, 12 Apr 2023 16:19:46 +0900 Subject: [PATCH 3/3] refactoring window update window update 0 is sent only when stream's bodies are comsumed --- Network/HTTP2/Arch/Receiver.hs | 22 +++++------- Network/HTTP2/Arch/Window.hs | 65 +++++++++++++++++++--------------- 2 files changed, 44 insertions(+), 43 deletions(-) diff --git a/Network/HTTP2/Arch/Receiver.hs b/Network/HTTP2/Arch/Receiver.hs index d6695c03..d867776a 100644 --- a/Network/HTTP2/Arch/Receiver.hs +++ b/Network/HTTP2/Arch/Receiver.hs @@ -184,14 +184,14 @@ processState (Open (NoBody tbl@(_,reqvt))) ctx@Context{..} strm@Stream{streamInp else putMVar streamInput inpObj return False -processState (Open (HasBody tbl@(_,reqvt))) ctx@Context{..} strm@Stream{streamInput} streamId = do +processState (Open (HasBody tbl@(_,reqvt))) ctx@Context{..} strm@Stream{streamInput} _streamId = do let mcl = fst <$> (getHeaderValue tokenContentLength reqvt >>= C8.readInt) bodyLength <- newIORef 0 tlr <- newIORef Nothing q <- newTQueueIO setStreamState ctx strm $ Open (Body q mcl bodyLength tlr) incref <- newIORef 0 - bodySource <- mkSource q $ informWindowUpdate controlQ streamId incref + bodySource <- mkSource q $ informWindowUpdate ctx strm incref let inpObj = InpObj tbl mcl (readSource bodySource) tlr if isServer ctx then do let si = toServerInfo roleInfo @@ -300,10 +300,9 @@ control FrameGoAway header bs _ _ = do else E.throwIO $ ConnectionErrorIsReceived err sid $ Short.toShort msg -control FrameWindowUpdate header@FrameHeader{streamId} bs ctx _ = do +control FrameWindowUpdate header bs ctx _ = do WindowUpdateFrame n <- guardIt $ decodeWindowUpdateFrame header bs - w <- increaseConnectionWindowSize ctx n - when (isWindowOverflow w) $ E.throwIO $ ConnectionErrorIsSent FlowControlError streamId "control window should be less than 2^31" + increaseConnectionWindowSize ctx n control _ _ _ _ _ = -- must not reach here @@ -367,11 +366,10 @@ stream FrameHeaders header@FrameHeader{flags,streamId} bs ctx (Open (Body q _ _ -- ignore data-frame except for flow-control when we're done locally stream FrameData - FrameHeader{flags,payloadLength} + FrameHeader{flags} _bs - ctx s@(HalfClosedLocal _) + _ctx s@(HalfClosedLocal _) _ = do - informConnectionWindowUpdate ctx payloadLength let endOfStream = testEndStream flags if endOfStream then do return HalfClosedRemote @@ -383,7 +381,6 @@ stream FrameData bs ctx@Context{emptyFrameRate} s@(Open (Body q mcl bodyLength _)) _ = do - informConnectionWindowUpdate ctx payloadLength DataFrame body <- guardIt $ decodeDataFrame header bs len0 <- readIORef bodyLength let len = len0 + payloadLength @@ -434,12 +431,9 @@ stream FrameContinuation FrameHeader{flags,streamId} frag ctx s@(Open (Continued else return $ Open $ Continued rfrags' siz' n' endOfStream -stream FrameWindowUpdate header@FrameHeader{streamId} bs _ s strm = do +stream FrameWindowUpdate header bs _ s strm = do WindowUpdateFrame n <- guardIt $ decodeWindowUpdateFrame header bs - w <- increaseStreamWindowSize strm n - when (isWindowOverflow w) $ do - let msg = fromString ("window update for " ++ show streamId ++ " is overflow") - E.throwIO $ StreamErrorIsSent FlowControlError streamId msg + increaseStreamWindowSize strm n return s stream FrameRSTStream header@FrameHeader{streamId} bs ctx _ strm = do diff --git a/Network/HTTP2/Arch/Window.hs b/Network/HTTP2/Arch/Window.hs index dcb5f0ad..04689d0e 100644 --- a/Network/HTTP2/Arch/Window.hs +++ b/Network/HTTP2/Arch/Window.hs @@ -4,6 +4,7 @@ module Network.HTTP2.Arch.Window where import Data.IORef +import qualified UnliftIO.Exception as E import UnliftIO.STM import Imports @@ -34,19 +35,26 @@ waitConnectionWindowSize Context{txConnectionWindow} = do ---------------------------------------------------------------- -- Receiving window update -increaseStreamWindowSize :: Stream -> Int -> IO WindowSize -increaseStreamWindowSize Stream{streamWindow} n = atomically $ do - w0 <- readTVar streamWindow - let w1 = w0 + n - writeTVar streamWindow w1 - return w1 - -increaseConnectionWindowSize :: Context -> Int -> IO WindowSize -increaseConnectionWindowSize Context{txConnectionWindow} n = atomically $ do - w0 <- readTVar txConnectionWindow - let w1 = w0 + n - writeTVar txConnectionWindow w1 - return w1 +increaseWindowSize :: StreamId -> TVar WindowSize -> WindowSize -> IO () +increaseWindowSize sid tvar n = do + w <- atomically $ do + w0 <- readTVar tvar + let w1 = w0 + n + writeTVar tvar w1 + return w1 + when (isWindowOverflow w) $ do + let msg = fromString ("window update for stream " ++ show sid ++ " is overflow") + err = if isControl sid then ConnectionErrorIsSent + else StreamErrorIsSent + E.throwIO $ err FlowControlError sid msg + +increaseStreamWindowSize :: Stream -> WindowSize -> IO () +increaseStreamWindowSize Stream{streamNumber,streamWindow} n = + increaseWindowSize streamNumber streamWindow n + +increaseConnectionWindowSize :: Context -> Int -> IO () +increaseConnectionWindowSize Context{txConnectionWindow} n = + increaseWindowSize 0 txConnectionWindow n decreaseWindowSize :: Context -> Stream -> WindowSize -> IO () decreaseWindowSize Context{txConnectionWindow} Stream{streamWindow} siz = do @@ -56,22 +64,21 @@ decreaseWindowSize Context{txConnectionWindow} Stream{streamWindow} siz = do ---------------------------------------------------------------- -- Sending window update -informWindowUpdate :: TQueue Control -> StreamId -> IORef Int -> Int -> IO () -informWindowUpdate _ _ _ 0 = return () -informWindowUpdate controlQ sid incref len = do - -- incref is occupied by the receiver thread - w0 <- readIORef incref - let w1 = w0 + len - if w1 >= defaultWindowSize then do -- fixme - let frame = windowUpdateFrame sid w1 - enqueueControl controlQ $ CFrames Nothing [frame] - writeIORef incref 0 - else - writeIORef incref w1 - -informConnectionWindowUpdate :: Context -> Int -> IO () -informConnectionWindowUpdate Context{..} = - informWindowUpdate controlQ 0 rxConnectionInc +informWindowUpdate :: Context -> Stream -> IORef Int -> Int -> IO () +informWindowUpdate _ _ _ 0 = return () +informWindowUpdate Context{controlQ,rxConnectionInc} Stream{streamNumber} streamInc len = do + join $ atomicModifyIORef rxConnectionInc $ modify 0 + join $ atomicModifyIORef streamInc $ modify streamNumber + where + modify sid w0 + | w1 < thresh = (w1, return ()) + | otherwise = let frame = windowUpdateFrame sid w1 + cframe = CFrames Nothing [frame] + action = enqueueControl controlQ cframe + in (0, action) + where + thresh = defaultWindowSize -- fixme + w1 = w0 + len ----------------------------------------------------------------