Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: kolmodin/spdy
base: f194d756cf
...
head fork: kolmodin/spdy
compare: 5c8285c216
Checking mergeability… Don't worry, you can still create the pull request.
  • 2 commits
  • 5 files changed
  • 0 commit comments
  • 1 contributor
Commits on Mar 11, 2012
@kolmodin Feed data frames (request body) into the Request.
The Request data type has a Source for the request body.
Create a Source with a channel linked to it, and feed the channel when we
get data frames.
5ce9d4f
@kolmodin Switch to using hashmap for storing stream states. 5c8285c
View
2  Network/SPDY.hs
@@ -184,7 +184,7 @@ onSynStreamFrame state sId pri nvh = do
putStrLn "Constructed frame:"
print ("syn_reply", sId, nvh')
return (SynReplyControlFrame 0 sId nvhReply) :: IO Frame
- enqueueFrame state $ return $ DataFrame sId 1 $ S.concat ("<html><h1>hello from spdy</h1><br/>" : S.concat ([ C8.pack (show b ++ "<br/>") | b <- nvh ]) : "</html>" : [])
+ enqueueFrame state $ return $ DataFrame 1 sId $ S.concat ("<html><h1>hello from spdy</h1><br/>" : S.concat ([ C8.pack (show b ++ "<br/>") | b <- nvh ]) : "</html>" : [])
where
utf8 (s,t) = (decodeUtf8 s, decodeUtf8 t)
View
2  Network/SPDY/Frame.hs
@@ -27,8 +27,8 @@ ourSPDYVersion = 2
data Frame
= DataFrame {
- dataFrameStreamID :: Word32,
dataFrameFlags :: Word8,
+ dataFrameStreamID :: Word32,
dataFramePayload :: B.ByteString }
| SynStreamControlFrame {
controlFrameFlags :: Word8,
View
100 Network/Wai/Handler/Hope.hs
@@ -1,4 +1,4 @@
-{-# LANGUAGE RecordWildCards, OverloadedStrings, DeriveDataTypeable #-}
+{-# LANGUAGE RecordWildCards, OverloadedStrings, DeriveDataTypeable, NamedFieldPuns #-}
module Network.Wai.Handler.Hope where
import Network.Wai
@@ -24,6 +24,8 @@ import qualified Data.ByteString.Char8 as C8 ( pack ) -- Also IsString instance
import qualified Data.ByteString as S
import qualified Data.ByteString.Lazy as L
+import Data.Bits
+
import Network.Socket hiding ( recv, Closed )
import Control.Applicative
@@ -31,6 +33,8 @@ import Control.Concurrent
import Control.Concurrent.STM.TVar
import Control.Monad.STM
+import Control.Monad ( when )
+
import Control.Exception ( Exception, throwIO, Handler(..), catches )
import Data.Typeable
@@ -56,6 +60,9 @@ import qualified Data.Certificate.X509 as X509
import qualified Data.Certificate.PEM as PEM
import qualified Data.Certificate.KeyRSA as KeyRSA
+import Data.HashMap.Strict ( HashMap )
+import qualified Data.HashMap.Strict as Map
+
import Prelude hiding ( catch )
readCertificate :: FilePath -> IO X509.X509
@@ -127,7 +134,7 @@ type FrameHandler = SessionState -> Frame -> IO SessionState
data SessionState = SessionState
{ sessionStateSendQueue :: TVar [IO Frame] -- TODO(kolmodin): use a priority queue
- , sessionStateStreamStates :: [StreamState]
+ , sessionStateStreamStates :: HashMap Word32 StreamState
, sessionStateNVHReceiveZContext :: Inflate
, sessionStateNVHSendZContext :: Deflate
, sessionStateNextValidSendID :: Word32
@@ -138,6 +145,7 @@ data StreamState = StreamState
{ streamStateID :: Word32
, streamStatePriority :: Word8
, streamStateReplyThread :: ThreadId
+ , streamStateBodyChan :: Maybe (Chan (Maybe S.ByteString))
}
data SPDYException
@@ -152,19 +160,34 @@ initSession = do
queue <- newTVarIO []
zInflate <- liftIO $ initInflateWithDictionary defaultWindowBits nvhDictionary
zDeflate <- liftIO $ initDeflateWithDictionary 6 nvhDictionary defaultWindowBits
- return $ SessionState queue [] zInflate zDeflate 1 2
+ return $ SessionState queue Map.empty zInflate zDeflate 1 2
frameHandler :: Application -> SockAddr -> FrameHandler
frameHandler app sockaddr state frame = do
print frame
case frame of
SynStreamControlFrame flags sId assId pri nvh -> do
- state' <- createStream app sockaddr state sId pri nvh
+ state' <- createStream app sockaddr state flags sId pri nvh
return state'
RstStreamControlFrame flags sId status -> do
putStrLn "RstStream... we're screwed."
-- TODO: remove all knowledge of this stream. empty send buffer.
return state
+ DataFrame flags sId payload -> do
+ let flag_fin = testBit flags 0
+ case getStreamState state sId of
+ Nothing -> do sendRstStream state sId 2 -- 2 == INVALID_STREAM
+ return state
+ Just s -> do
+ let bodyChan = streamStateBodyChan s
+ case bodyChan of
+ Nothing -> do sendRstStream state sId 2 -- which error code?
+ return state
+ Just chan -> do writeChan chan (Just payload)
+ let s' | flag_fin = s { streamStateBodyChan = Nothing }
+ | otherwise = s
+ state' = updateStreamState state s'
+ return state'
PingControlFrame pingId -> do
enqueueFrame state $ return (PingControlFrame pingId)
return state
@@ -175,14 +198,27 @@ frameHandler app sockaddr state frame = do
NoopControlFrame -> do
return state
+getStreamState :: SessionState -> Word32 -> Maybe StreamState
+getStreamState state sId = Map.lookup sId (sessionStateStreamStates state)
+
+updateStreamState :: SessionState -> StreamState -> SessionState
+updateStreamState state stream =
+ let streamStates = sessionStateStreamStates state
+ in state { sessionStateStreamStates = Map.adjust (const stream) (streamStateID stream) streamStates }
+
+insertStreamState :: SessionState -> StreamState -> SessionState
+insertStreamState state stream =
+ let streamStates = sessionStateStreamStates state
+ in state { sessionStateStreamStates = Map.insert (streamStateID stream) stream streamStates }
+
enqueueFrame :: SessionState -> IO Frame -> IO ()
enqueueFrame SessionState { sessionStateSendQueue = queue } frame =
atomically $ do
q <- readTVar queue
writeTVar queue (q ++ [frame])
-createStream :: Application -> SockAddr -> SessionState -> Word32 -> Word8 -> S.ByteString -> IO SessionState
-createStream app sockaddr state@(SessionState { sessionStateNVHReceiveZContext = zInflate }) sId pri nvhBytes = do
+createStream :: Application -> SockAddr -> SessionState -> Word8 -> Word32 -> Word8 -> S.ByteString -> IO SessionState
+createStream app sockaddr state@(SessionState { sessionStateNVHReceiveZContext = zInflate }) flags sId pri nvhBytes = do
putStrLn $ "Creating stream context, id = " ++ show sId
nvhChunks <- do a <- withInflateInput zInflate nvhBytes popper
b <- flushInflate zInflate
@@ -192,9 +228,9 @@ createStream app sockaddr state@(SessionState { sessionStateNVHReceiveZContext =
Fail _ _ msg -> throwIO (SPDYNVHException Nothing msg)
Partial _ -> throwIO (SPDYNVHException Nothing "Could not parse NVH block, returned Partial.")
print (sId, pri, nvh)
- tId <- onSynStreamFrame app sockaddr state sId pri nvh
- let streamState = StreamState sId pri tId
- return state { sessionStateStreamStates = streamState : sessionStateStreamStates state }
+ (tId, bodyChan) <- onSynStreamFrame app sockaddr state flags sId pri nvh
+ let streamState = StreamState sId pri tId bodyChan
+ return (insertStreamState state streamState)
where
feedAll r [] = r
feedAll r (x:xs) = r `feed` x `feedAll` xs
@@ -212,16 +248,20 @@ sendGoAway :: SessionState -> Word32 -> IO ()
sendGoAway state sId = do
enqueueFrame state $ return $ GoAwayFrame 0 sId
-sendRstStream :: SessionState -> Word8 -> Word32 -> Word32 -> IO ()
-sendRstStream state flags sId status = do
- enqueueFrame state $ return $ RstStreamControlFrame flags sId status
+sendRstStream :: SessionState -> Word32 -> Word32 -> IO ()
+sendRstStream state sId status = do
+ enqueueFrame state $ return $ RstStreamControlFrame 0 sId status
-onSynStreamFrame :: Application -> SockAddr -> SessionState -> Word32 -> Word8 -> NameValueHeaderBlock -> IO ThreadId
-onSynStreamFrame app sockaddr state sId pri nvh = do
- req <- case buildReq sockaddr nvh of -- catch errors, return protocol_error on stream
+onSynStreamFrame :: Application -> SockAddr -> SessionState -> Word8 -> Word32 -> Word8 -> NameValueHeaderBlock -> IO (ThreadId, Maybe (Chan (Maybe S.ByteString)))
+onSynStreamFrame app sockaddr state flags sId pri nvh = do
+ (bodySource,bodyChan) <- mkChanSource
+ req <- case buildReq sockaddr bodySource nvh of -- catch errors, return protocol_error on stream
Right req -> return req
Left err -> throwIO (SPDYNVHException (Just sId) err)
- forkIO $ runResourceT $ do
+ let flag_fin = testBit flags 0 -- other side said no more frames from their side
+ when flag_fin $ do
+ writeChan bodyChan Nothing
+ tId <- forkIO $ runResourceT $ do
resp <- app req
let (status, responseHeaders, source) = responseSource resp
headerStatus = ("status", showStatus status)
@@ -238,10 +278,11 @@ onSynStreamFrame app sockaddr state sId pri nvh = do
print ("syn_reply" :: String, sId, nvh')
return (SynReplyControlFrame 0 sId nvhReply) :: IO Frame
source $$ enqueueFrameSink
+ return (tId, if flag_fin then Nothing else Just bodyChan)
where
utf8 (s,t) = (decodeUtf8 s, decodeUtf8 t)
showStatus (Status statusCode statusMessage) = S.concat [C8.pack (show statusCode), " ", statusMessage]
- mkDataFrame = DataFrame sId 0
+ mkDataFrame = DataFrame 0 sId
enqueueFrameSink =
sinkState
()
@@ -250,10 +291,10 @@ onSynStreamFrame app sockaddr state sId pri nvh = do
(Chunk inpBuilder) -> liftIO $ enqueueFrame state $ return $ mkDataFrame (toByteString inpBuilder)
Flush -> return ()
return (StateProcessing ()))
- (\_ -> liftIO $ enqueueFrame state $ return $ DataFrame sId 1 "")
+ (\_ -> liftIO $ enqueueFrame state $ return $ DataFrame 1 sId "")
-buildReq :: SockAddr -> NameValueHeaderBlock -> Either String Request
-buildReq sockaddr nvh = do
+buildReq :: SockAddr -> Source IO S.ByteString -> NameValueHeaderBlock -> Either String Request
+buildReq sockaddr bodySource nvh = do
method <- case lookup (decodeUtf8 "method") nvh of
Just m -> return m
Nothing -> Left "no method in NVH block"
@@ -292,7 +333,7 @@ buildReq sockaddr nvh = do
, isSecure = True
, remoteHost = sockaddr
, queryString = H.parseQuery (encodeUtf8 query)
- , requestBody = sourceState () (\_ -> return StateClosed)
+ , requestBody = bodySource
, vault = V.empty
}
where
@@ -303,6 +344,21 @@ buildReq sockaddr nvh = do
A.endOfInput
return $ HttpVersion x y)
+mkChanSource :: ResourceIO m => IO (Source m S.ByteString, Chan (Maybe S.ByteString))
+mkChanSource = do
+ chan <- newChan
+ return (chan2source chan, chan)
+
+chan2source :: ResourceIO m => Chan (Maybe S.ByteString) -> Source m S.ByteString
+chan2source chan =
+ sourceIO
+ (return ())
+ (\_ -> return ())
+ (\_ -> do v <- liftIO $ readChan chan
+ case v of
+ Nothing -> return IOClosed
+ Just bs -> return (IOOpen bs))
+
sender :: TLSCtx a -> TVar [IO Frame] -> IO ()
sender tlsctx queue = go
where
@@ -334,7 +390,7 @@ sessionHandler handler tlsctx sockaddr = do
SPDYParseException str -> do putStrLn ("Caught this! " ++ show e)
sendGoAway initS 0
SPDYNVHException (Just sId) str -> do putStrLn ("Caught this! " ++ show e)
- sendRstStream initS 0 sId 1
+ sendRstStream initS sId 1
SPDYNVHException Nothing str -> do putStrLn ("Caught this! " ++ show e)
sendGoAway initS 0)
, Handler (\e ->
View
2  spdy.cabal
@@ -36,6 +36,6 @@ executable app
bytestring, stm, conduit, zlib-bindings, transformers, tls == 0.9.*, tls-extra,
crypto-api, certificate, wai, http-types, vault,
blaze-builder, case-insensitive,
- wai-app-static, warp, attoparsec
+ wai-app-static, warp, attoparsec, unordered-containers
ghc-options: -threaded -Wall
View
5 tests/QC.hs
@@ -92,7 +92,7 @@ arbitraryDataFrame = do
sId <- arbitraryWord31be
flags <- arbitrary
payload <- genPayload
- return (DataFrame sId flags payload)
+ return (DataFrame flags sId payload)
arbitrarySynStreamFrame :: Gen Frame
arbitrarySynStreamFrame = do
@@ -112,10 +112,9 @@ arbitrarySynReplyStreamFrame = do
arbitraryRstStreamFrame :: Gen Frame
arbitraryRstStreamFrame = do
- flags <- arbitrary
sId <- arbitraryWord31be
status <- arbitrary
- return (RstStreamControlFrame flags sId status)
+ return (RstStreamControlFrame 0 sId status)
arbitraryGoAwayFrame :: Gen Frame
arbitraryGoAwayFrame = do

No commit comments for this range

Something went wrong with that request. Please try again.