diff --git a/Imports.hs b/Imports.hs index ddbf0217..7ec1ced1 100644 --- a/Imports.hs +++ b/Imports.hs @@ -8,10 +8,11 @@ module Imports ( , module Data.List , module Data.Foldable , module Data.Int + , module Data.Maybe , module Data.Monoid , module Data.Ord + , module Data.String , module Data.Word - , module Data.Maybe , module Numeric , GCBuffer , withForeignPtr @@ -29,6 +30,7 @@ import Data.Int import Data.List import Data.Maybe import Data.Monoid +import Data.String import Data.Ord import Data.Word import Foreign.ForeignPtr diff --git a/Network/HTTP2/Arch/HPACK.hs b/Network/HTTP2/Arch/HPACK.hs index 89d725cf..023cf31a 100644 --- a/Network/HTTP2/Arch/HPACK.hs +++ b/Network/HTTP2/Arch/HPACK.hs @@ -69,15 +69,16 @@ hpackDecodeHeader hdrblk sid ctx = do if isClient ctx || checkRequestHeader vt then return tbl else - E.throwIO $ StreamErrorIsSent ProtocolError sid + E.throwIO $ StreamErrorIsSent ProtocolError sid "illegal header" hpackDecodeTrailer :: HeaderBlockFragment -> StreamId -> Context -> IO HeaderTable hpackDecodeTrailer hdrblk sid Context{..} = decodeTokenHeader decodeDynamicTable hdrblk `E.catch` handl where handl IllegalHeaderName = - E.throwIO $ StreamErrorIsSent ProtocolError sid - handl _ = - E.throwIO $ StreamErrorIsSent CompressionError sid + E.throwIO $ StreamErrorIsSent ProtocolError sid "illegal trailer" + handl e = do + let msg = fromString $ show e + E.throwIO $ StreamErrorIsSent CompressionError sid msg {-# INLINE checkRequestHeader #-} checkRequestHeader :: ValueTable -> Bool diff --git a/Network/HTTP2/Arch/Receiver.hs b/Network/HTTP2/Arch/Receiver.hs index cd994263..d867776a 100644 --- a/Network/HTTP2/Arch/Receiver.hs +++ b/Network/HTTP2/Arch/Receiver.hs @@ -73,10 +73,10 @@ frameReceiver ctx@Context{..} conf@Config{..} = loop 0 `E.catch` sendGoaway let frame = goawayFrame sid err $ Short.fromShort msg enqueueControl controlQ $ CFrames Nothing [frame] enqueueControl controlQ $ CFinish e - | Just e@(StreamErrorIsSent err sid) <- E.fromException se = do + | Just e@(StreamErrorIsSent err sid msg) <- E.fromException se = do let frame = resetFrame err sid enqueueControl controlQ $ CFrames Nothing [frame] - let frame' = goawayFrame sid err "closing a connection after sending a stream error" + let frame' = goawayFrame sid err $ Short.fromShort msg enqueueControl controlQ $ CFrames Nothing [frame'] enqueueControl controlQ $ CFinish e | Just e@(StreamErrorIsReceived err sid) <- E.fromException se = do @@ -174,7 +174,7 @@ controlOrStream ctx@Context{..} conf@Config{..} ftyp header@FrameHeader{streamId processState :: StreamState -> Context -> Stream -> StreamId -> IO Bool processState (Open (NoBody tbl@(_,reqvt))) ctx@Context{..} strm@Stream{streamInput} streamId = do let mcl = fst <$> (getHeaderValue tokenContentLength reqvt >>= C8.readInt) - when (just mcl (/= (0 :: Int))) $ E.throwIO $ StreamErrorIsSent ProtocolError streamId + when (just mcl (/= (0 :: Int))) $ E.throwIO $ StreamErrorIsSent ProtocolError streamId "no body but content-length is not zero" halfClosedRemote ctx strm tlr <- newIORef Nothing let inpObj = InpObj tbl (Just 0) (return "") tlr @@ -184,14 +184,14 @@ processState (Open (NoBody tbl@(_,reqvt))) ctx@Context{..} strm@Stream{streamInp else putMVar streamInput inpObj return False -processState (Open (HasBody tbl@(_,reqvt))) ctx@Context{..} strm@Stream{streamInput} streamId = do +processState (Open (HasBody tbl@(_,reqvt))) ctx@Context{..} strm@Stream{streamInput} _streamId = do let mcl = fst <$> (getHeaderValue tokenContentLength reqvt >>= C8.readInt) bodyLength <- newIORef 0 tlr <- newIORef Nothing q <- newTQueueIO setStreamState ctx strm $ Open (Body q mcl bodyLength tlr) incref <- newIORef 0 - bodySource <- mkSource q $ informWindowUpdate controlQ streamId incref + bodySource <- mkSource q $ informWindowUpdate ctx strm incref let inpObj = InpObj tbl mcl (readSource bodySource) tlr if isServer ctx then do let si = toServerInfo roleInfo @@ -245,8 +245,8 @@ getStream' ctx@Context{..} ftyp streamId Nothing mMaxConc <- maxConcurrentStreams <$> readIORef mySettings case mMaxConc of Nothing -> return () - Just maxConc -> when (cnt >= maxConc) $ - E.throwIO $ StreamErrorIsSent RefusedStream streamId + Just maxConc -> when (cnt >= maxConc) $ + E.throwIO $ StreamErrorIsSent RefusedStream streamId "exceeds max concurrent" Just <$> openStream ctx streamId ftyp | otherwise = undefined -- never reach @@ -300,10 +300,9 @@ control FrameGoAway header bs _ _ = do else E.throwIO $ ConnectionErrorIsReceived err sid $ Short.toShort msg -control FrameWindowUpdate header@FrameHeader{streamId} bs ctx _ = do +control FrameWindowUpdate header bs ctx _ = do WindowUpdateFrame n <- guardIt $ decodeWindowUpdateFrame header bs - w <- increaseConnectionWindowSize ctx n - when (isWindowOverflow w) $ E.throwIO $ ConnectionErrorIsSent FlowControlError streamId "control window should be less than 2^31" + increaseConnectionWindowSize ctx n control _ _ _ _ _ = -- must not reach here @@ -321,7 +320,7 @@ guardIt x = case x of {-# INLINE checkPriority #-} checkPriority :: Priority -> StreamId -> IO () checkPriority p me - | dep == me = E.throwIO $ StreamErrorIsSent ProtocolError me + | dep == me = E.throwIO $ StreamErrorIsSent ProtocolError me "priority depends on itself" | otherwise = return () where dep = streamDependency p @@ -367,11 +366,10 @@ stream FrameHeaders header@FrameHeader{flags,streamId} bs ctx (Open (Body q _ _ -- ignore data-frame except for flow-control when we're done locally stream FrameData - FrameHeader{flags,payloadLength} + FrameHeader{flags} _bs - ctx s@(HalfClosedLocal _) + _ctx s@(HalfClosedLocal _) _ = do - informConnectionWindowUpdate ctx payloadLength let endOfStream = testEndStream flags if endOfStream then do return HalfClosedRemote @@ -383,7 +381,6 @@ stream FrameData bs ctx@Context{emptyFrameRate} s@(Open (Body q mcl bodyLength _)) _ = do - informConnectionWindowUpdate ctx payloadLength DataFrame body <- guardIt $ decodeDataFrame header bs len0 <- readIORef bodyLength let len = len0 + payloadLength @@ -400,7 +397,7 @@ stream FrameData if endOfStream then do case mcl of Nothing -> return () - Just cl -> when (cl /= len) $ E.throwIO $ StreamErrorIsSent ProtocolError streamId + Just cl -> when (cl /= len) $ E.throwIO $ StreamErrorIsSent ProtocolError streamId "actual body length is not the same as content-length" -- no trailers atomically $ writeTQueue q "" return HalfClosedRemote @@ -434,10 +431,9 @@ stream FrameContinuation FrameHeader{flags,streamId} frag ctx s@(Open (Continued else return $ Open $ Continued rfrags' siz' n' endOfStream -stream FrameWindowUpdate header@FrameHeader{streamId} bs _ s strm = do +stream FrameWindowUpdate header bs _ s strm = do WindowUpdateFrame n <- guardIt $ decodeWindowUpdateFrame header bs - w <- increaseStreamWindowSize strm n - when (isWindowOverflow w) $ E.throwIO $ StreamErrorIsSent FlowControlError streamId + increaseStreamWindowSize strm n return s stream FrameRSTStream header@FrameHeader{streamId} bs ctx _ strm = do @@ -458,8 +454,8 @@ stream FrameContinuation FrameHeader{streamId} _ _ _ _ = E.throwIO $ ConnectionE stream _ FrameHeader{streamId} _ _ (Open Continued{}) _ = E.throwIO $ ConnectionErrorIsSent ProtocolError streamId "an illegal frame follows header/continuation frames" -- Ignore frames to streams we have just reset, per section 5.1. stream _ _ _ _ st@(Closed (ResetByMe _)) _ = return st -stream FrameData FrameHeader{streamId} _ _ _ _ = E.throwIO $ StreamErrorIsSent StreamClosed streamId -stream _ FrameHeader{streamId} _ _ _ _ = E.throwIO $ StreamErrorIsSent ProtocolError streamId +stream FrameData FrameHeader{streamId} _ _ _ _ = E.throwIO $ StreamErrorIsSent StreamClosed streamId $ fromString ("illegal data frame for " ++ show streamId) +stream _ FrameHeader{streamId} _ _ _ _ = E.throwIO $ StreamErrorIsSent ProtocolError streamId $ fromString ("illegal frame for " ++ show streamId) ---------------------------------------------------------------- diff --git a/Network/HTTP2/Arch/Types.hs b/Network/HTTP2/Arch/Types.hs index 121dcfe3..727495d6 100644 --- a/Network/HTTP2/Arch/Types.hs +++ b/Network/HTTP2/Arch/Types.hs @@ -212,7 +212,7 @@ data HTTP2Error = | ConnectionErrorIsReceived ErrorCode StreamId ReasonPhrase | ConnectionErrorIsSent ErrorCode StreamId ReasonPhrase | StreamErrorIsReceived ErrorCode StreamId - | StreamErrorIsSent ErrorCode StreamId + | StreamErrorIsSent ErrorCode StreamId ReasonPhrase | BadThingHappen E.SomeException deriving (Show, Typeable) diff --git a/Network/HTTP2/Arch/Window.hs b/Network/HTTP2/Arch/Window.hs index 6dabf088..04689d0e 100644 --- a/Network/HTTP2/Arch/Window.hs +++ b/Network/HTTP2/Arch/Window.hs @@ -4,6 +4,7 @@ module Network.HTTP2.Arch.Window where import Data.IORef +import qualified UnliftIO.Exception as E import UnliftIO.STM import Imports @@ -15,48 +16,6 @@ import Network.HTTP2.Arch.Stream import Network.HTTP2.Arch.Types import Network.HTTP2.Frame ----------------------------------------------------------------- --- Receiving window update - -increaseStreamWindowSize :: Stream -> Int -> IO WindowSize -increaseStreamWindowSize Stream{streamWindow} n = atomically $ do - w0 <- readTVar streamWindow - let w1 = w0 + n - writeTVar streamWindow w1 - return w1 - -increaseConnectionWindowSize :: Context -> Int -> IO WindowSize -increaseConnectionWindowSize Context{txConnectionWindow} n = atomically $ do - w0 <- readTVar txConnectionWindow - let w1 = w0 + n - writeTVar txConnectionWindow w1 - return w1 - ----------------------------------------------------------------- --- Sending window update - -decreaseWindowSize :: Context -> Stream -> WindowSize -> IO () -decreaseWindowSize Context{txConnectionWindow} Stream{streamWindow} siz = do - atomically $ modifyTVar' txConnectionWindow (subtract siz) - atomically $ modifyTVar' streamWindow (subtract siz) - -informWindowUpdate :: TQueue Control -> StreamId -> IORef Int -> Int -> IO () -informWindowUpdate _ _ _ 0 = return () -informWindowUpdate controlQ sid incref len = do - -- incref is occupied by the receiver thread - w0 <- readIORef incref - let w1 = w0 + len - if w1 >= defaultWindowSize then do -- fixme - let frame = windowUpdateFrame sid w1 - enqueueControl controlQ $ CFrames Nothing [frame] - writeIORef incref 0 - else - writeIORef incref w1 - -informConnectionWindowUpdate :: Context -> Int -> IO () -informConnectionWindowUpdate Context{..} = - informWindowUpdate controlQ 0 rxConnectionInc - getStreamWindowSize :: Stream -> IO WindowSize getStreamWindowSize Stream{streamWindow} = readTVarIO streamWindow @@ -73,6 +32,54 @@ waitConnectionWindowSize Context{txConnectionWindow} = do w <- readTVar txConnectionWindow checkSTM (w > 0) +---------------------------------------------------------------- +-- Receiving window update + +increaseWindowSize :: StreamId -> TVar WindowSize -> WindowSize -> IO () +increaseWindowSize sid tvar n = do + w <- atomically $ do + w0 <- readTVar tvar + let w1 = w0 + n + writeTVar tvar w1 + return w1 + when (isWindowOverflow w) $ do + let msg = fromString ("window update for stream " ++ show sid ++ " is overflow") + err = if isControl sid then ConnectionErrorIsSent + else StreamErrorIsSent + E.throwIO $ err FlowControlError sid msg + +increaseStreamWindowSize :: Stream -> WindowSize -> IO () +increaseStreamWindowSize Stream{streamNumber,streamWindow} n = + increaseWindowSize streamNumber streamWindow n + +increaseConnectionWindowSize :: Context -> Int -> IO () +increaseConnectionWindowSize Context{txConnectionWindow} n = + increaseWindowSize 0 txConnectionWindow n + +decreaseWindowSize :: Context -> Stream -> WindowSize -> IO () +decreaseWindowSize Context{txConnectionWindow} Stream{streamWindow} siz = do + atomically $ modifyTVar' txConnectionWindow (subtract siz) + atomically $ modifyTVar' streamWindow (subtract siz) + +---------------------------------------------------------------- +-- Sending window update + +informWindowUpdate :: Context -> Stream -> IORef Int -> Int -> IO () +informWindowUpdate _ _ _ 0 = return () +informWindowUpdate Context{controlQ,rxConnectionInc} Stream{streamNumber} streamInc len = do + join $ atomicModifyIORef rxConnectionInc $ modify 0 + join $ atomicModifyIORef streamInc $ modify streamNumber + where + modify sid w0 + | w1 < thresh = (w1, return ()) + | otherwise = let frame = windowUpdateFrame sid w1 + cframe = CFrames Nothing [frame] + action = enqueueControl controlQ cframe + in (0, action) + where + thresh = defaultWindowSize -- fixme + w1 = w0 + len + ---------------------------------------------------------------- -- max: 2,147,483,647 (2^31-1) is too large.