Skip to content

Commit

Permalink
Same inputs provided in passthroughSink #304
Browse files Browse the repository at this point in the history
Ensure downstream and inner sink receive same inputs, see lengthy
comment added to code for explanation
  • Loading branch information
snoyberg committed Apr 3, 2017
1 parent b13f86c commit b1f2a50
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 13 deletions.
6 changes: 6 additions & 0 deletions conduit/ChangeLog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 1.2.9.1

* Ensure downstream and inner sink receive same inputs in
`passthroughSink`
[#304](https://github.com/snoyberg/conduit/issues/304)

## 1.2.9

* `chunksOf` [#296](https://github.com/snoyberg/conduit/pull/296)
Expand Down
55 changes: 43 additions & 12 deletions conduit/Data/Conduit/Internal/Conduit.hs
Original file line number Diff line number Diff line change
Expand Up @@ -703,23 +703,54 @@ passthroughSink :: Monad m
-> (r -> m ()) -- ^ finalizer
-> Conduit i m i
passthroughSink (ConduitM sink0) final = ConduitM $ \rest -> let
go _ (Done r) = do
-- A bit of explanation is in order, this function is
-- non-obvious. The purpose of go is to keep track of the sink
-- we're passing values to, and then yield values downstream. The
-- third argument to go is the current state of that sink. That's
-- relatively straightforward.
--
-- The second value is the leftover buffer. These are values that
-- the sink itself has called leftover on, and must be provided
-- back to the sink the next time it awaits. _However_, these
-- values should _not_ be reyielded downstream: we have already
-- yielded them downstream ourself, and it is the responsibility
-- of the functions wrapping around passthroughSink to handle the
-- leftovers from downstream.
--
-- The trickiest bit is the first argument, which is a solution to
-- bug https://github.com/snoyberg/conduit/issues/304. The issue
-- is that, once we get a value, we need to provide it to both the
-- inner sink _and_ yield it downstream. The obvious thing to do
-- is yield first and then recursively call go. Unfortunately,
-- this doesn't work in all cases: if the downstream component
-- never calls await again, our yield call will never return, and
-- our sink will not get the last value. This results is confusing
-- behavior where the sink and downstream component receive a
-- different number of values.
--
-- Solution: keep a buffer of the next value to yield downstream,
-- and only yield it downstream in one of two cases: our sink is
-- asking for another value, or our sink is done. This way, we
-- ensure that, in all cases, we pass exactly the same number of
-- values to the inner sink as to downstream.

go mbuf _ (Done r) = do
maybe (return ()) CI.yield mbuf
lift $ final r
unConduitM (awaitForever yield) rest
go is (Leftover sink i) = go (i:is) sink
go _ (HaveOutput _ _ o) = absurd o
go is (PipeM mx) = do
go mbuf is (Leftover sink i) = go mbuf (i:is) sink
go _ _ (HaveOutput _ _ o) = absurd o
go mbuf is (PipeM mx) = do
x <- lift mx
go is x
go (i:is) (NeedInput next _) = go is (next i)
go [] (NeedInput next done) = do
go mbuf is x
go mbuf (i:is) (NeedInput next _) = go mbuf is (next i)
go mbuf [] (NeedInput next done) = do
maybe (return ()) CI.yield mbuf
mx <- CI.await
case mx of
Nothing -> go [] (done ())
Just x -> do
CI.yield x
go [] (next x)
in go [] (sink0 Done)
Nothing -> go Nothing [] (done ())
Just x -> go (Just x) [] (next x)
in go Nothing [] (sink0 Done)

-- | Convert a @Source@ into a list. The basic functionality can be explained as:
--
Expand Down
2 changes: 1 addition & 1 deletion conduit/conduit.cabal
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Name: conduit
Version: 1.2.9
Version: 1.2.9.1
Synopsis: Streaming data processing library.
description:
`conduit` is a solution to the streaming data problem, allowing for production,
Expand Down
9 changes: 9 additions & 0 deletions conduit/test/main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,15 @@ main = hspec $ do
x <- I.readIORef ref
x `shouldBe` (-1)

it "handles the last input correctly #304" $ do
ref <- I.newIORef (-1 :: Int)
let sink = CL.mapM_ (I.writeIORef ref)
conduit = C.passthroughSink sink (const (return ()))
res <- mapM_ C.yield [1..] C.$$ conduit C.=$ CL.take 5
res `shouldBe` [1..5]
x <- I.readIORef ref
x `shouldBe` 5

describe "mtl instances" $ do
it "ErrorT" $ do
let src = flip catchError (const $ C.yield 4) $ do
Expand Down

0 comments on commit b1f2a50

Please sign in to comment.