Skip to content
Browse files

New conduit 1.0 changes

  • Loading branch information...
1 parent 187e086 commit eaf3aabe2389e90cc71392e77b2b81ce657e31a7 @snoyberg snoyberg committed Feb 13, 2013
View
12 wai-eventsource/src/Network/Wai/EventSource.hs
@@ -1,4 +1,5 @@
{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE RankNTypes #-}
{-|
A WAI adapter to the HTML5 Server-Sent Events API.
-}
@@ -25,20 +26,21 @@ import Network.Wai.EventSource.EventStream
eventSourceAppChan :: Chan ServerEvent -> Application
eventSourceAppChan chan _ = do
chan' <- liftIO $ dupChan chan
- return $ response chanToSource chan'
+ return $ response $ chanToSource chan'
-- | Make a new WAI EventSource application reading events from
-- the given source.
eventSourceAppSource :: Source (ResourceT IO) ServerEvent -> Application
-eventSourceAppSource src _ = return $ response sourceToSource src
+eventSourceAppSource src _ = return $ response $ sourceToSource src
-- | Make a new WAI EventSource application reading events from
-- the given IO action.
eventSourceAppIO :: IO ServerEvent -> Application
-eventSourceAppIO act _ = return $ response ioToSource act
+eventSourceAppIO act _ = return $ response $ ioToSource act
-response :: (a -> Source (ResourceT IO) (Flush Builder)) -> a -> Response
-response f a = ResponseSource status200 [("Content-Type", "text/event-stream")] $ f a
+response :: Source (ResourceT IO) (Flush Builder)
+ -> Response
+response = ResponseSource status200 [("Content-Type", "text/event-stream")]
chanToSource :: Chan ServerEvent -> Source (ResourceT IO) (Flush Builder)
chanToSource = ioToSource . readChan
View
18 wai-extra/Network/Wai/Middleware/Jsonp.hs
@@ -18,14 +18,14 @@ module Network.Wai.Middleware.Jsonp (jsonp) where
import Network.Wai
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as B8
-import Blaze.ByteString.Builder (copyByteString)
+import Blaze.ByteString.Builder (Builder, copyByteString)
import Blaze.ByteString.Builder.Char8 (fromChar)
import Data.Monoid (mappend)
import Control.Monad (join)
import Data.Maybe (fromMaybe)
import qualified Data.ByteString as S
import qualified Data.Conduit as C
-import qualified Data.Conduit.List as CL
+import Network.HTTP.Types (Status, ResponseHeaders)
-- | Wrap json responses in a jsonp callback.
--
@@ -78,11 +78,17 @@ jsonp app env = do
_ -> Nothing
fixHeaders = changeVal "Content-Type" "text/javascript"
+ addCallback :: Monad m
+ => ByteString
+ -> Status
+ -> ResponseHeaders
+ -> C.Source (C.ResourceT IO) (C.Flush Builder)
+ -> m Response
addCallback cb s hs b =
- return $ ResponseSource s hs $
- CL.sourceList [C.Chunk $ copyByteString cb `mappend` fromChar '(']
- `mappend` b
- `mappend` CL.sourceList [C.Chunk $ fromChar ')']
+ return $ ResponseSource s hs $ do
+ C.yield (C.Chunk $ copyByteString cb `mappend` fromChar '(')
+ b
+ C.yield (C.Chunk $ fromChar ')')
changeVal :: Eq a
=> a
View
20 wai-extra/Network/Wai/Parse.hs
@@ -3,6 +3,7 @@
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE PatternGuards #-}
{-# LANGUAGE TypeFamilies #-}
+{-# LANGUAGE RankNTypes #-}
-- | Some helpers for parsing data out of a raw WAI 'Request'.
module Network.Wai.Parse
@@ -52,7 +53,7 @@ import Control.Monad (when, unless)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Resource (allocate, release, register)
#if MIN_VERSION_conduit(1, 0, 0)
-import Data.Conduit.Internal (Pipe (NeedInput, HaveOutput), (>+>), withUpstream, Sink (..), injectLeftovers, streamFromPipe, ConduitM)
+import Data.Conduit.Internal (Pipe (NeedInput, HaveOutput), (>+>), withUpstream, Sink (..), injectLeftovers, ConduitM (..))
import Data.Void (Void)
#endif
@@ -190,7 +191,7 @@ conduitRequestBody backend (Multipart bound) =
parsePieces backend $ S8.pack "--" `S.append` bound
#if MIN_VERSION_conduit(1, 0, 0)
-takeLine :: MonadSink S.ByteString m (Maybe S.ByteString)
+takeLine :: Monad m => Sink S.ByteString m (Maybe S.ByteString)
#else
takeLine :: Monad m => Pipe S.ByteString S.ByteString o u m (Maybe S.ByteString)
#endif
@@ -209,7 +210,7 @@ takeLine =
return $ Just $ killCR x
#if MIN_VERSION_conduit(1, 0, 0)
-takeLines :: MonadSink S.ByteString (ResourceT IO) [S.ByteString]
+takeLines :: Sink S.ByteString (ResourceT IO) [S.ByteString]
#else
takeLines :: Pipe S.ByteString S.ByteString o u (ResourceT IO) [S.ByteString]
#endif
@@ -311,13 +312,13 @@ sinkTillBound' :: S.ByteString
#endif
sinkTillBound' bound name fi sink =
#if MIN_VERSION_conduit(1, 0, 0)
- streamFromPipe $ anyOutput $
+ ConduitM $ anyOutput $
#endif
conduitTillBound bound >+> withUpstream (fix $ sink name fi)
where
#if MIN_VERSION_conduit(1, 0, 0)
fix :: Sink S8.ByteString (ResourceT IO) y -> Pipe Void S8.ByteString Void Bool (ResourceT IO) y
- fix (Sink p) = ignoreTerm >+> injectLeftovers p
+ fix (ConduitM p) = ignoreTerm >+> injectLeftovers p
ignoreTerm = await' >>= maybe (return ()) (\x -> yield' x >> ignoreTerm)
await' = NeedInput (return . Just) (const $ return Nothing)
yield' = HaveOutput (return ()) (return ())
@@ -336,6 +337,9 @@ conduitTillBound :: Monad m
-> Pipe S.ByteString S.ByteString S.ByteString u m Bool
#endif
conduitTillBound bound =
+#if MIN_VERSION_conduit(1, 0, 0)
+ unConduitM $
+#endif
go id
where
go front = await >>= maybe (close front) (push front)
@@ -366,19 +370,19 @@ sinkTillBound :: S.ByteString
-> (x -> S.ByteString -> IO x)
-> x
#if MIN_VERSION_conduit(1, 0, 0)
- -> MonadSink S.ByteString (ResourceT IO) (Bool, x)
+ -> Sink S.ByteString (ResourceT IO) (Bool, x)
#else
-> Pipe S.ByteString S.ByteString o u (ResourceT IO) (Bool, x)
#endif
sinkTillBound bound iter seed0 =
#if MIN_VERSION_conduit(1, 0, 0)
- streamFromPipe $
+ ConduitM $
#endif
(conduitTillBound bound >+> (withUpstream $ ij $ CL.foldM iter' seed0))
where
iter' a b = liftIO $ iter a b
#if MIN_VERSION_conduit(1, 0, 0)
- ij p = ignoreTerm >+> injectLeftovers p
+ ij (ConduitM p) = ignoreTerm >+> injectLeftovers p
ignoreTerm = await' >>= maybe (return ()) (\x -> yield' x >> ignoreTerm)
await' = NeedInput (return . Just) (const $ return Nothing)
yield' = HaveOutput (return ()) (return ())
View
8 wai-handler-launch/Network/Wai/Handler/Launch.hs
@@ -48,12 +48,10 @@ ping var app req
let (s, hs, body) = responseSource res
let (isEnc, headers') = fixHeaders id hs
let headers'' = filter (\(x, _) -> x /= "content-length") headers'
- let fixEnc src =
- if isEnc then
- src $= decompressFlush (WindowBits 31)
- else src
return $ ResponseSource s headers''
- $ fixEnc (body $= builderToByteStringFlush)
+ $ (if isEnc
+ then body $= builderToByteStringFlush $= decompressFlush (WindowBits 31)
+ else body $= builderToByteStringFlush)
$= insideHead
$= CL.map (fmap fromByteString)
View
2 wai-websockets/Network/Wai/Handler/WebSockets.hs
@@ -1,4 +1,6 @@
{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE ImpredicativeTypes #-}
module Network.Wai.Handler.WebSockets
( intercept
, interceptWith
View
1 wai/Network/Wai.hs
@@ -1,6 +1,7 @@
{-# LANGUAGE Rank2Types #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE DeriveDataTypeable #-}
+{-# LANGUAGE ImpredicativeTypes #-}
{-|
This module defines a generic web application interface. It is a common
View
9 warp-tls/Network/Wai/Handler/WarpTLS.hs
@@ -1,5 +1,6 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE CPP #-}
module Network.Wai.Handler.WarpTLS
( TLSSettings (..)
, runTLS
@@ -54,7 +55,13 @@ runTLSSocket tset set sock app = do
(fromClient, firstBS) <- sourceSocket s C.$$+ CL.peek
let toClient = sinkSocket s
ifromClient <- I.newIORef fromClient
+#if MIN_VERSION_conduit(1, 0, 0)
+ let getNext :: C.Sink B.ByteString IO B.ByteString
+ -> IO B.ByteString
+ getNext sink = do
+#else
let getNext sink = do
+#endif
fromClient' <- I.readIORef ifromClient
(fromClient'', bs) <- fromClient' C.$$++ sink
I.writeIORef ifromClient fromClient''
@@ -67,7 +74,7 @@ runTLSSocket tset set sock app = do
{ TLS.backendFlush = return ()
, TLS.backendClose = return ()
, TLS.backendSend = \bs -> C.yield bs C.$$ toClient
- , TLS.backendRecv = getNext . takeMost
+ , TLS.backendRecv = \x -> getNext $ takeMost x
}
params
gen
View
28 warp/Network/Wai/Handler/Warp/Conduit.hs
@@ -1,3 +1,5 @@
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE CPP #-}
module Network.Wai.Handler.Warp.Conduit where
import Control.Applicative
@@ -16,6 +18,9 @@ import qualified Data.Conduit.List as CL
import qualified Data.IORef as I
import Data.Word (Word, Word8)
import Network.Wai.Handler.Warp.Types
+#if MIN_VERSION_conduit(1, 0, 0)
+import Data.Conduit.Internal (ConduitM)
+#endif
----------------------------------------------------------------
@@ -82,22 +87,31 @@ data ChunkState = NeedLen
bsCRLF :: L.ByteString
bsCRLF = pack "\r\n"
-chunkedSource :: MonadIO m
- => I.IORef (ResumableSource m ByteString, ChunkState)
- -> Source m ByteString
+chunkedSource :: I.IORef (ResumableSource (ResourceT IO) ByteString, ChunkState)
+ -> Source (ResourceT IO) ByteString
chunkedSource ipair = do
(src, mlen) <- liftIO $ I.readIORef ipair
go src mlen
where
+#if MIN_VERSION_conduit(1, 0, 0)
+ go' :: ResumableSource (ResourceT IO) ByteString
+ -> (Sink ByteString (ResourceT IO) (Word, ByteString) -> Sink ByteString (ResourceT IO) (Word, ByteString))
+ -> ConduitM k ByteString (ResourceT IO) ()
+#endif
go' src front = do
(src', (len, bs)) <- lift $ src $$++ front getLen
let src''
| S.null bs = src'
| otherwise = fmapResume (yield bs >>) src'
go src'' $ HaveLen len
- go src NeedLen = go' src id
- go src NeedLenNewline = go' src (CB.take 2 >>)
+#if MIN_VERSION_conduit(1, 0, 0)
+ go :: ResumableSource (ResourceT IO) ByteString
+ -> ChunkState
+ -> Source (ResourceT IO) ByteString
+#endif
+ go src NeedLen = go' src (\x -> x)
+ go src NeedLenNewline = go' src (\x -> CB.take 2 >> x)
go src (HaveLen 0) = do
-- Drop the final CRLF
(src', ()) <- lift $ src $$++ do
@@ -156,5 +170,9 @@ isHexDigit w = w >= 48 && w <= 57
----------------------------------------------------------------
+#if MIN_VERSION_conduit(1, 0, 0)
+fmapResume :: (ConduitM () o1 m () -> ConduitM () o2 m ()) -> ResumableSource m o1 -> ResumableSource m o2
+#else
fmapResume :: (Source m o1 -> Source m o2) -> ResumableSource m o1 -> ResumableSource m o2
+#endif
fmapResume f (ResumableSource src m) = ResumableSource (f src) m
View
12 warp/Network/Wai/Handler/Warp/Request.hs
@@ -1,5 +1,7 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE CPP #-}
module Network.Wai.Handler.Warp.Request where
@@ -24,6 +26,9 @@ import Network.Wai.Handler.Warp.Conduit
import Network.Wai.Handler.Warp.ReadInt
import Network.Wai.Handler.Warp.Types
import Prelude hiding (lines)
+#if MIN_VERSION_conduit(1, 0, 0)
+import qualified Data.Conduit.Internal as CI
+#endif
-- FIXME come up with good values here
maxTotalHeaderLength :: Int
@@ -96,7 +101,12 @@ parseRequest' conn port (firstLine:otherLines) remoteHost' src = do
, requestHeaders = heads
, isSecure = False
, remoteHost = remoteHost'
- , requestBody = rbody
+ , requestBody =
+#if MIN_VERSION_conduit(1, 0, 0)
+ CI.ConduitM $ CI.pipeL (return ()) $ CI.unConduitM rbody
+#else
+ rbody
+#endif
, vault = mempty
}, getSource)
View
9 warp/Network/Wai/Handler/Warp/Response.hs
@@ -15,9 +15,6 @@ import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as B (pack)
import qualified Data.CaseInsensitive as CI
import Data.Conduit
-#if MIN_VERSION_conduit(1, 0, 0)
-import Data.Conduit.Internal (mapOutput, SourceM (..))
-#endif
import Data.Conduit.Blaze (builderToByteString)
import qualified Data.Conduit.List as CL
import Data.Maybe (isJust)
@@ -111,15 +108,9 @@ sendResponse cleaner req conn (ResponseSource s hs bodyFlush)
where
th = threadHandle cleaner
body =
-#if MIN_VERSION_conduit(1, 0, 0)
- SourceM $
-#endif
mapOutput (\x -> case x of
Flush -> flush
Chunk builder -> builder)
-#if MIN_VERSION_conduit(1, 0, 0)
- $ unSourceM
-#endif
bodyFlush
cbody = if needsChunked then body $= chunk else body
-- FIXME perhaps alloca a buffer per thread and reuse that in all
View
20 warp/Network/Wai/Handler/Warp/Run.hs
@@ -1,5 +1,6 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE RankNTypes #-}
module Network.Wai.Handler.Warp.Run where
@@ -25,6 +26,9 @@ import Network.Wai.Handler.Warp.Settings
import qualified Network.Wai.Handler.Warp.Timeout as T
import Network.Wai.Handler.Warp.Types
import Prelude hiding (catch)
+#if MIN_VERSION_conduit(1, 0, 0)
+import qualified Data.Conduit.Internal as CI
+#endif
-- Sock.recv first tries to call recvfrom() optimistically.
-- If EAGAIN returns, it polls incoming data with epoll/kqueue.
@@ -185,6 +189,8 @@ serveConnection settings cleaner port app conn remoteHost' =
serveConnection' :: ResourceT IO ()
serveConnection' = serveConnection'' $ connSource conn th
+ serveConnection'' :: Source (ResourceT IO) ByteString
+ -> ResourceT IO ()
serveConnection'' fromClient = do
(env, getSource) <- parseRequest conn port remoteHost' fromClient
case settingsIntercept settings env of
@@ -201,11 +207,25 @@ serveConnection settings cleaner port app conn remoteHost' =
requestBody env $$ CL.sinkNull
ResumableSource fromClient' _ <- liftIO getSource
+#if MIN_VERSION_conduit(1, 0, 0)
+ when keepAlive $ serveConnection''
+ $ CI.ConduitM
+ $ CI.pipeL (return ())
+ $ CI.unConduitM fromClient'
+#else
when keepAlive $ serveConnection'' fromClient'
+#endif
Just intercept -> do
liftIO $ T.pause th
ResumableSource fromClient' _ <- liftIO getSource
+#if MIN_VERSION_conduit(1, 0, 0)
+ let fromClient'' = CI.ConduitM
+ $ CI.pipeL (return ())
+ $ CI.unConduitM fromClient'
+ intercept fromClient'' conn
+#else
intercept fromClient' conn
+#endif
connSource :: Connection -> T.Handle -> Source (ResourceT IO) ByteString
connSource Connection { connRecv = recv } th = src
View
1 warp/Network/Wai/Handler/Warp/Settings.hs
@@ -1,3 +1,4 @@
+{-# LANGUAGE ImpredicativeTypes #-}
module Network.Wai.Handler.Warp.Settings where
import Control.Exception
View
6 warp/test/ConduitSpec.hs
@@ -26,8 +26,8 @@ spec = describe "conduit" $ do
rsrc' <- lift $ ibsDone ibs
z <- rsrc' $$+- CL.consume
lift $ S.concat z `shouldBe` S.pack [41..50]
- it "chunkedSource" $ do
+ it "chunkedSource" $ runResourceT $ do
(rsrc, ()) <- yield "5\r\n12345\r\n3\r\n678\r\n0\r\nBLAH" $$+ return ()
- ref <- I.newIORef (rsrc, NeedLen)
+ ref <- lift $ I.newIORef (rsrc, NeedLen)
x <- chunkedSource ref $$ CL.consume
- S.concat x `shouldBe` "12345678"
+ lift $ S.concat x `shouldBe` "12345678"

0 comments on commit eaf3aab

Please sign in to comment.
Something went wrong with that request. Please try again.