
Add a conduit parser #11

Closed
wants to merge 17 commits

4 participants

@boothead

Hi Michael,

I've been having a play with conduits and I've implemented a conduitParser that's very similar to your sinkParser. I've done this mostly for my own amusement, and I'm pretty new to Haskell so I have no idea if it's even any good (there are also no tests yet).

Thanks for a great piece of work, I've found conduits a lot easier to get my head around than iteratees et al.

Cheers,
Ben

@boothead

OK, here's a start at the inner function - it doesn't seem to run any quicker, but I haven't had time to test it properly yet. Does this look about right?

@boothead

Actually it runs much better, I just didn't build it properly!

@boothead

I've added some tests. In the process of doing that I uncovered a bug (which is nice) fixed in 80b00d3. Is there a better way to accumulate the list in this case?

@ehird

I'd love to see this in attoparsec-conduit, as I'd like to structure network-protocol-based pipelines like parseInput =$= process =$= buildOutput. blaze-builder-conduit gives me buildOutput, but I'd currently have to roll my own if I want to create parseInput from an attoparsec parser.

Some comments on the current code:

  • ConduitInput isn't used, but it's in the module and required by conduitParser. Is this intentional? (If so, why isn't there an instance for Text?)
  • Might the {-# SCC #-} annotations interfere with profiling of user code?
  • Prepending "closing" to the error message is ugly; why not include a Bool field in ParseError instead?
@boothead
  • ConduitInput is a leftover from an earlier attempt, AttoparsecInput should suffice now.
  • The {-# SCC #-} was there for my profiling, I don't know what the implications are of leaving it in vs taking it out.
  • I don't really have an opinion one way or the other about the Bool. Michael?

Thanks for the input @ehird

@boothead

Sorry, I screwed up my local branch by rebasing on top of version 0.0.2 and had to delete it and start over. Apologies to anyone who has pulled this already!

@snoyberg
Owner

I'm going to close this request for now, as it looks like it's including a lot of unnecessary commits. Can you try sending it again?

@snoyberg snoyberg closed this
@boothead

On reflection I think that my use case might be a bit too specific to be worthwhile anyway. I'll have a play with it in the latest version and if it looks useful I'll submit another pull request sans screwed up rebase commits :-)

@snoyberg
Owner

Also, you may want to look at the recently added sequence function which, when paired with sinkParser, may do exactly what you're looking for.

@kaoskorobase

I have a similar problem to the one described here. sequence works, almost:

import           Data.Aeson (encode, json)
import qualified Data.Aeson as J
import           Control.Applicative
import           Control.Monad
import           Control.Monad.IO.Class (MonadIO, liftIO)
import           Control.Monad.Trans.Class (lift)
import qualified Data.Attoparsec.ByteString as A
import qualified Data.Attoparsec.Combinator as A
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Lazy.Char8 as BC
import qualified Data.ByteString as BS
import           Data.Conduit (($$), ($=), (=$))
import qualified Data.Conduit as C
import qualified Data.Conduit.Attoparsec as C
import qualified Data.Conduit.Binary as C
import qualified Data.Conduit.List as C
import           System.IO

fromJson = C.sequence (C.sinkParser json)
jsonSource = C.sourceHandle stdin $= fromJson
toJson = C.map (BS.concat . BL.toChunks . flip BC.snoc '\n' . encode)
jsonSink = toJson =$ C.sinkHandle stdout

main = C.runResourceT $ jsonSource $$ jsonSink

When piping JSON data to this little program everything is fine until EOF is
encountered:

ParseError {errorContexts = ["demandInput"], errorMessage = "not enough bytes"}

Adapting conduitParser from [1] leads to the correct behavior; note the use
of many1 in order to return all intermediate parse results:

conduitParser :: (C.MonadThrow m) =>
                        A.Parser b
                     -> C.Conduit BS.ByteString m b
conduitParser p0 = C.conduitState newParser push close
  where
    newParser = A.parse (A.many1 p0)
    push _ c
        | BS.null c = return $ C.StateFinished Nothing []
    push parser c = do
        case A.feed (parser c) BS.empty of
            A.Done leftover xs
                | BS.null leftover ->
                    return $ C.StateProducing newParser xs
                | otherwise ->
                    return $ C.StateProducing (newParser . BS.append leftover) xs
            A.Fail _ contexts msg -> C.monadThrow $ C.ParseError contexts msg
            A.Partial p -> return $ C.StateProducing p []
    close parser =
        case parser BS.empty of
            A.Done _leftover xs -> return xs
            A.Fail _ contexts msg -> C.monadThrow $ C.ParseError contexts msg
            A.Partial _ -> return [] -- A partial parse when closing is not an error

fromJson = conduitParser json

If there is no other way to make sequence work in this context, I can prepare a patch for attoparsec-conduit that adds conduitParser.

@snoyberg
Owner

The issue here is the newlines: they imply to sequence that there is more data, and therefore it tries to parse another JSON value. My recommendation would be to modify the code like so:

import qualified Data.Conduit.Binary as CB

fromJson = C.sequence $ do
    x <- C.sinkParser json
    CB.dropWhile (== 10)
    return x

I'm not trying to nix a conduitParser, just saying that this is solvable right now if you want to avoid the overhead of a separate function.

@kaoskorobase

Your version of fromJson works fine when piping a whole file to stdin, but when running the program interactively (the original use case is running it from an existing web application, passing requests to stdin and reading responses from stdout), there's a subtle difference in behavior from conduitParser: the previous input turns up on stdout only after more input is received (how much depends on the IO buffering mode).

@snoyberg
Owner

There are two major issues with the implementation of conduitParser that you're providing here:

  • By using many1, the parser will not return results piecewise. In other words, if you have a stream with a million JSON objects, it will consume all million JSON objects and keep them in memory, and only after it can't consume any more will it return the values to the sink.
  • If you have the stream foo{, it will consume those four bytes and then return []. It should (in my opinion at least) throw an exception.

I think you need to consider what your goal here is. You actually have a very specific requirement: you want to consume JSON values, but ignore any whitespace. I think what you really should do is have some kind of parser like:

jsonOrWhite :: Parser (Maybe Value)
jsonOrWhite = (Just <$> json) <|> (many1 space >> return Nothing)

And then sequence will give you a stream of Maybe Values. Combined with a Conduit that acts like catMaybes, you should have the result you're looking for, without consuming too much memory or ignoring invalid input.
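The jsonOrWhite pattern can be modelled without conduit or attoparsec at all. The sketch below uses ReadP from base as a stand-in for attoparsec and plain integers as a stand-in for JSON values (both are assumptions for illustration, not the real API): a "value or whitespace" parser is applied repeatedly, and catMaybes plays the role of the catMaybes-style Conduit.

```haskell
import Text.ParserCombinators.ReadP
import Control.Applicative ((<|>))
import Data.Char (isDigit, isSpace)
import Data.Maybe (catMaybes)

-- Stand-in for the real parser (an integer instead of a JSON value).
value :: ReadP Int
value = read <$> munch1 isDigit

-- "Parse a value or swallow whitespace", as suggested above.
valueOrWhite :: ReadP (Maybe Int)
valueOrWhite = (Just <$> value) <|> (munch1 isSpace *> pure Nothing)

-- Apply the parser repeatedly and drop the Nothings, mimicking
-- 'sequence' followed by a catMaybes-style Conduit.
runAll :: String -> [Int]
runAll = catMaybes . go
  where
    go "" = []
    go s  = case readP_to_S valueOrWhite s of
              ((mx, rest):_) -> mx : go rest
              []             -> error "parse failure"

main :: IO ()
main = print (runAll "12 34\n 56\n")  -- [12,34,56]
```

Because the whitespace branch produces Nothing rather than failing, trailing newlines at end of input no longer make the stream look like the start of another value.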

@kaoskorobase

Thanks for this solution, it's not only more elegant but also seems to be a little more efficient than the conduitParser combinator!

Just for the record, I couldn't reproduce the issues you mention above: When piping a largish (50MB) JSON file to the program, it seems to run in constant memory (around 4 MB). many1 is only used in order to obtain all complete parses from a single buffer passed to the conduit, where in the original pull request only the first one would be returned when closing the conduit.

I also cannot reproduce the behavior you mention regarding invalid input: Presented with

foo{"foo":12}

the program fails as expected with a nice error message:

ParseError {errorContexts = [], errorMessage = "Failed reading: satisfy"}
@snoyberg
Owner

You're correct about the streaming: I hadn't noticed that you're explicitly passing B.empty to the parser to try and flush it. This might work, but attoparsec can be very finicky about receiving empties.

I made a typo about the invalid input, I meant:

{"foo":
@kaoskorobase

According to the attoparsec documentation, passing BS.empty seems to be the correct way to flush a parser (see the documentation for Partial).

Regarding invalid/partial input, I still don't see your point: With the input above the parser fails with

ParseError {errorContexts = ["}"], errorMessage = "Failed reading: satisfyWith"}

In summary I would say that my requirements are not very specific: I want to process a sequence of tokens that are the results of a parser applied repeatedly to the input stream. In my naïve first approach I expected that sequence . sinkParser would do what I want (note that Data.Aeson.json skips whitespace at the beginning of objects), but it diverges when receiving endOfInput, that's why I adapted conduitParser from the original pull request.

My proposal would be to add conduitParser in order to facilitate the naïve approach or to document why the naïve approach isn't supposed to work in the first place and point to possible alternatives.

@snoyberg
Owner

I don't know why you're getting different results than I did, but when I ran the code, it definitely did not give an error message. But here's a more concrete example of why the flushing approach is bad: it completely changes the meaning of the code.

import qualified Data.Attoparsec.ByteString as A
import qualified Data.Attoparsec.ByteString.Char8 as A8
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as B8
import           Data.Conduit (($$), ($=))
import qualified Data.Conduit as C
import qualified Data.Conduit.Attoparsec as C
import qualified Data.Conduit.List as C
import Data.Char (isSpace)
import Data.Conduit.Attoparsec (sinkParser)

conduitParser :: (C.MonadThrow m) =>
                        A.Parser b
                     -> C.Conduit BS.ByteString m b
conduitParser p0 = C.conduitState newParser push close
  where
    newParser = A.parse (A.many1 p0)
    push _ c
        | BS.null c = return $ C.StateFinished Nothing []
    push parser c = do
        case A.feed (parser c) BS.empty of
            A.Done leftover xs
                | BS.null leftover ->
                    return $ C.StateProducing newParser xs
                | otherwise ->
                    return $ C.StateProducing (newParser . BS.append leftover) xs
            A.Fail _ contexts msg -> C.monadThrow $ C.ParseError contexts msg
            A.Partial p -> return $ C.StateProducing p []
    close parser =
        case parser BS.empty of
            A.Done _leftover xs -> return xs
            A.Fail _ contexts msg -> C.monadThrow $ C.ParseError contexts msg
            A.Partial _ -> return [] -- A partial parse when closing is not an error

word :: A.Parser BS.ByteString
word = A8.skipWhile isSpace >> A8.takeWhile1 (not . isSpace)

wordsConduit1 :: C.MonadThrow m => C.Conduit BS.ByteString m BS.ByteString
wordsConduit1 = conduitParser word

wordsConduit2 :: C.MonadThrow m => C.Conduit BS.ByteString m BS.ByteString
wordsConduit2 = C.sequence $ sinkParser word

main :: IO ()
main = do
    (src $= wordsConduit1 $$ C.consume) >>= print
    (src $= wordsConduit2 $$ C.consume) >>= print
  where
    src = C.sourceList $ map B8.pack ["foo", "bar", " baz"]

The first one prints the words "foo", "bar", and "baz", while the second prints "foobar" and "baz".

@kaoskorobase

I see, that's a problem ... it's incorrect to flush the parser in push, because then semantics depend on how the input is chunked. It's also not enough to flush the parser in close, because then, as you noted above, processing is not done in constant-space anymore.
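The chunk-dependence can be demonstrated without any parsing library. The following is a toy, base-only model of the two behaviours (an illustration, not the real conduit code): a chunked word splitter that either flushes the pending, possibly incomplete word at every chunk boundary, or carries it over as leftover until more input arrives.

```haskell
import Data.Char (isSpace)

-- Split a chunked stream into words. 'flushEach' = emit the trailing
-- (possibly incomplete) word at every chunk boundary, which is what
-- flushing the parser in 'push' amounts to.
streamWords :: Bool -> [String] -> [String]
streamWords flushEach = go ""
  where
    go pending []  = [pending | not (null pending)]   -- final flush at EOF
    go pending (c:cs)
      | flushEach  = ws ++ go "" cs                   -- chunk-dependent!
      | endsInWord = init ws ++ go (last ws) cs       -- keep trailing word pending
      | otherwise  = ws ++ go "" cs
      where
        s          = pending ++ c
        ws         = words s
        endsInWord = not (null s) && not (isSpace (last s)) && not (null ws)

main :: IO ()
main = do
  print (streamWords True  ["foo", "bar", " baz"])  -- ["foo","bar","baz"]
  print (streamWords False ["foo", "bar", " baz"])  -- ["foobar","baz"]
```

The same byte sequence split into different chunks yields different results in the flushing version, which is exactly the wordsConduit1 vs wordsConduit2 discrepancy above.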

Commits on Jan 30, 2012
  1. @boothead

    Add a conduit parser

    boothead authored boothead committed
    The conduitParser function produces a Conduit that continually
    applies the parser against the stream, recreating it when Done
    is returned.
  2. @boothead

    Remove spurious language pragma

    boothead authored boothead committed
  3. @boothead

    Branch to try and improve efficiency

    boothead authored boothead committed
  4. @boothead

    Added an inner function to parse the leftover

    boothead authored boothead committed
  5. @boothead

    Add the ConduitInput instance for ByteString back in

    boothead authored boothead committed
  6. @boothead

    Fix close to not error in the case of A.Fail with no leftover

    Ben Ford authored boothead committed
  7. @boothead

    Beginings of a test suite

    boothead authored boothead committed
  8. @boothead

    Add a quick check test for sinkParser

    Ben Ford authored boothead committed
  9. @boothead

    Add emacs files with hashes in to gitignore

    Ben Ford authored boothead committed
  10. @boothead

    Add a second quickcheck property for sinkParser

    boothead authored boothead committed
  11. @boothead

    Reverse the result of parsing a leftover bytestring

    boothead authored boothead committed
    I don't know if this is the most efficient way to do it, or if there's a
    way to accumulate the list in the correct order in the first place
  12. @boothead

    Add a quick check property for conduitParser and hlint

    boothead authored boothead committed
  13. @boothead

    Hlint suggestions

    boothead authored boothead committed
  14. @boothead
  15. @boothead
  16. @boothead
  17. @boothead
4 .gitignore
@@ -1,6 +1,10 @@
*.swp
+\#*
+.#*
dist/
conduit/tmp
conduit/tmp2
*.hi
*.o
+*/.compiled
+cabal-dev
46 attoparsec-conduit/Data/Conduit/Attoparsec.hs
@@ -5,13 +5,14 @@
-- Copyright: 2011 Michael Snoyman, 2010 John Millikin
-- License: MIT
--
--- Turn an Attoparsec parser into a 'C.Sink'.
+-- Turn an Attoparsec parser into a 'C.Sink' or 'C.Conduit'.
--
-- This code was taken from attoparsec-enumerator and adapted for conduits.
module Data.Conduit.Attoparsec
( ParseError (..)
, AttoparsecInput
, sinkParser
+ , conduitParser
) where
import Control.Exception (Exception)
@@ -49,6 +50,7 @@ instance AttoparsecInput B.ByteString where
isNull = B.null
notEmpty = filter (not . B.null)
+
instance AttoparsecInput T.Text where
parseA = Data.Attoparsec.Text.parse
feedA = Data.Attoparsec.Text.feed
@@ -74,8 +76,48 @@ sinkParser p0 = C.sinkState
in return (C.StateDone lo x)
A.Fail _ contexts msg -> lift $ C.resourceThrow $ ParseError contexts msg
A.Partial p -> return (C.StateProcessing p)
- close parser = do
+ close parser =
case feedA (parser empty) empty of
A.Done _leftover y -> return y
A.Fail _ contexts msg -> lift $ C.resourceThrow $ ParseError contexts msg
A.Partial _ -> lift $ C.resourceThrow DivergentParser
+
+
+-- | Convert an Attoparsec 'A.Parser' into a 'C.Conduit'. The parser will
+-- be streamed bytes until the source is exhausted. When done is returned a new
+-- parser is created and fed with anything leftover in the stream before resuming.
+--
+-- If parsing fails, a 'ParseError' will be thrown with 'C.resourceThrow'.
+conduitParser :: (AttoparsecInput a, C.ResourceThrow m) =>
+ A.Parser a b
+ -> C.Conduit a m b
+conduitParser p0 = C.conduitState
+ (parseA p0)
+ push
+ close
+ where
+ push parser c | isNull c = return $ C.StateProducing parser []
+ push parser c = {-# SCC "push" #-}
+ case doParse parser c [] of
+ Left pErr -> lift $ C.resourceThrow pErr
+ Right (cont, results) -> return $ C.StateProducing cont (reverse results)
+
+ -- doParse :: (A.Parser a b) -> a -> [b]
+ -- -> Either ParseError ((a -> A.IResult a b), [b])
+ doParse parser inp results = {-# SCC "parse" #-}
+ case parser inp of
+ A.Done leftover x
+ | isNull leftover ->
+ Right (parseA p0, x : results)
+ | otherwise ->
+ doParse (parseA p0) leftover (x:results)
+ A.Fail _ contexts msg -> Left $ ParseError contexts msg
+ A.Partial p -> return (p, results)
+
+ close parser =
+ case feedA (parser empty) empty of
+ A.Done _leftover y -> return [y]
+ A.Fail leftover _ _ | isNull leftover -> return []
+ A.Fail _ contexts msg -> lift $ C.resourceThrow $ ParseError contexts ("closing " ++ msg)
+
+ A.Partial _ -> lift $ C.resourceThrow DivergentParser
2  attoparsec-conduit/attoparsec-conduit.cabal
@@ -35,7 +35,9 @@ test-suite test
, bytestring
, blaze-builder
, transformers
+ , attoparsec-conduit
, text
+ , attoparsec >= 0.10
ghc-options: -Wall
source-repository head
91 attoparsec-conduit/test/main.hs
@@ -1,35 +1,88 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE CPP #-}
+module Main where
import Test.Hspec.Monadic
-{-
+
import Test.Hspec.HUnit ()
import Test.Hspec.QuickCheck (prop)
import Test.HUnit
+import Test.QuickCheck
+import qualified Data.Attoparsec as A
+import qualified Data.Attoparsec.Char8 as AC8
import qualified Data.Conduit as C
+import Data.Conduit
+import qualified Data.Conduit.Attoparsec as CA
import qualified Data.Conduit.List as CL
-import qualified Data.Conduit.Lazy as CLazy
-import qualified Data.Conduit.Text as CT
-import Data.Conduit.Blaze (builderToByteString)
-import Data.Conduit (runResourceT)
+import Control.Monad
import Control.Monad.ST (runST)
import Data.Monoid
import qualified Data.ByteString as S
-import qualified Data.IORef as I
-import Blaze.ByteString.Builder (fromByteString, toLazyByteString, insertLazyByteString)
-import qualified Data.ByteString.Lazy as L
+import qualified Data.ByteString.Char8 as C8
import Data.ByteString.Lazy.Char8 ()
-import Data.Maybe (catMaybes)
-import Control.Monad.Trans.Writer (Writer)
-import qualified Data.Text as T
-import qualified Data.Text.Lazy as TL
-import qualified Data.Text.Lazy.Encoding as TLE
-import Control.Monad.Trans.Resource (runExceptionT_, withIO, resourceForkIO)
-import Control.Concurrent (threadDelay, killThread)
-import Control.Monad.IO.Class (liftIO)
-import Control.Applicative (pure, (<$>), (<*>))
--}
+import Data.List (intersperse)
+import Control.Monad.Trans.Resource (runExceptionT_)
+import Control.Applicative -- (pure, (<$>), (<*>))
+
+instance Arbitrary S.ByteString where
+ arbitrary = do
+ len <- choose (0, 10)
+ chars <- replicateM len (choose ('a', 'z'))
+ return $ C8.pack chars
+
+testParseWord :: String -> A.Parser S.ByteString
+testParseWord s = AC8.string (C8.pack s) <* AC8.space
main :: IO ()
main = hspecX $ do
- return ()
+
+ let src = CL.sourceList $ C8.pack <$> lines "test one two\nthree four "
+ let takeWord = AC8.takeWhile (/=' ')
+
+
+
+ describe "parserSink" $ do
+
+ it "only runs parser once" $ do
+ res <- C.runResourceT $ src $$ CA.sinkParser $ testParseWord "test"
+ res @?= "test"
+
+ it "leaves the rest of input" $ do
+ (x, y) <- runResourceT $ do
+ bsrc <- C.bufferSource src
+ x <- bsrc $$ CA.sinkParser $ testParseWord "test"
+ y <- bsrc $$ CL.consume
+ return (x, y)
+ y @?= C8.lines "one two\nthree four "
+
+ prop "parse first word == head" $
+ \inp inp2 -> not (S.null inp) ==>
+ let res = runST $ runExceptionT_ $ runResourceT $
+ CL.sourceList [inp, inp2]
+ $$ CA.sinkParser $ AC8.takeWhile (/=' ')
+ in res == C8.takeWhile (/=' ') (inp `mappend` inp2)
+
+ prop "parse first word leaves exactly tail" $
+ \inp inp2 -> not (S.null inp) ==>
+ let res = runST $ runExceptionT_ $ runResourceT $ do
+ bsrc <- C.bufferSource $ CL.sourceList [inp, inp2]
+ _ <- bsrc $$ CA.sinkParser $ takeWord
+ C8.concat <$> (bsrc $$ CL.consume)
+ in res == C8.dropWhile (/=' ') (inp `mappend` inp2)
+
+ describe "parserConduit" $ do
+
+ it "runs parser continuously" $ do
+ res <- runResourceT $ src
+ $= CA.conduitParser (takeWord <* AC8.space)
+ $$ CL.consume
+ res @?= ["test", "one", "twothree", "four"]
+
+ prop "parse word and space == init words" $
+ \inp -> length inp > 3 ==>
+ let res = runST $ runExceptionT_ $ runResourceT $
+ CL.sourceList (intersperse " " inp)
+ $= CA.conduitParser (takeWord <* AC8.char ' ')
+ $$ CL.consume
+ in res == init inp
+