single-producer and/or single-consumer variants #11

Open
jberryman opened this issue Aug 28, 2014 · 4 comments

@jberryman
Owner

It would be straightforward to write variants of the readChan and writeChan functions that omit some atomic operations or bookkeeping when they are only run from a single thread. At some point I certainly plan to do this for readChan to support an even more efficient MPSC queue.

A cop-out which may happen would be to simply export

unthreadsafeReadChan :: OutChan a -> IO a

Or alternatively, a safe API which uses lazy IO:

newMPSCChan :: IO (InChan a, [a])

I'm not sure if the latter is easily done, or whether it would have performance implications.
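
To make the lazy-IO shape concrete, here is a minimal sketch built on `Control.Concurrent.Chan` from base rather than on this library. The `InChan` wrapper and `writeInChan` below are hypothetical stand-ins used only for illustration; the sketch shows the API shape and says nothing about how a fast implementation would work or perform.

```haskell
import Control.Concurrent.Chan (Chan, newChan, writeChan, getChanContents)

-- Hypothetical stand-in for the library's InChan: it only wraps base's Chan.
newtype InChan a = InChan (Chan a)

-- Hypothetical write function for the stand-in type above.
writeInChan :: InChan a -> a -> IO ()
writeInChan (InChan c) = writeChan c

-- The consumer side is a lazy list. Forcing a cell performs the read, and
-- blocks if no element has been written yet.
newMPSCChan :: IO (InChan a, [a])
newMPSCChan = do
  c  <- newChan
  xs <- getChanContents c   -- lazy: nothing is read until xs is forced
  return (InChan c, xs)

-- Write five elements, then consume exactly those five from the lazy list.
demo :: IO [Int]
demo = do
  (i, xs) <- newMPSCChan
  mapM_ (writeInChan i) [1 .. 5]
  return (take 5 xs)
```

Because the list is only safe to consume from one place, the single-consumer restriction is enforced by convention here, not by the types.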

Please comment if you have an interest in this use case or any thoughts.

@jberryman
Owner Author

I've started on a new variant in which reads don't block (related to #3). This will also support a much faster path for single consumers (SC), as well as pre-interleaved consumers, via a Stream type interface (see below). I'm not sure yet if I will add something similar for single-producer (SP) and interleaved producer scenarios.

Here is a bit of the API I have so far. First we have a function similar to dupChan that returns a set of streams:

-- | Produce the specified number of interleaved "streams" from a chan.
-- Consuming a 'UI.Stream' is much faster than calling 'tryReadChan', and
-- might be useful when an MPSC queue is needed, or when multiple consumers
-- should be load-balanced in a round-robin fashion. 
--
-- Usage example:
--
-- > do mapM_ ('writeChan' i) [1..9]
-- >    [str1, str2, str3] <- 'streamChan' 3 o
-- >    forkIO $ printStream str1   -- prints: 1,4,7
-- >    forkIO $ printStream str2   -- prints: 2,5,8
-- >    forkIO $ printStream str3   -- prints: 3,6,9
-- >  where 
-- >    printStream str = do
-- >      h <- 'tryReadStream' str
-- >      case h of
-- >        'Cons' a str' -> print a >> printStream str'
-- >        -- We know that all values were already written, so a Pending tells 
-- >        -- us we can exit; in other cases we might call 'yield' and then 
-- >        -- retry that same @'tryReadStream' str@:
-- >        'Pending' -> return ()
streamChan :: Int -> OutChan a -> IO [Stream a]
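
The round-robin distribution described in the usage example can be modeled as a pure function over the write order. This is only a model of which elements each stream sees; `roundRobin` is a hypothetical name and is not how `streamChan` is implemented.

```haskell
-- Pure model of the interleaving: stream k (0-based) out of n receives
-- elements k, k+n, k+2n, ... of the original write order.
roundRobin :: Int -> [a] -> [[a]]
roundRobin n xs = [ everyNth (drop k xs) | k <- [0 .. n - 1] ]
  where
    -- Keep the head, skip the next n-1 elements, and repeat.
    everyNth []       = []
    everyNth (y : ys) = y : everyNth (drop (n - 1) ys)

-- roundRobin 3 [1..9] == [[1,4,7],[2,5,8],[3,6,9]]
```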

And here is what our stream type looks like (at the moment). Reading from a stream requires absolutely no coordination with other readers or writers:

-- | An infinite stream of elements. 'tryReadStream' can be called any number
-- of times from multiple threads, and returns a value which moves
-- monotonically from 'Pending' to 'Cons' if a head element becomes available.
newtype Stream a = Stream { tryReadStream :: IO (Cons a) }

data Cons a = Cons a (Stream a) -- ^ The head element along with the tail @Stream@.
            | Pending           -- ^ The next element is not yet in the queue; you can retry 'tryReadStream' until a @Cons@ is returned.
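
As a rough illustration of how a consumer might use this type, the sketch below repeats the two definitions and adds three hypothetical helpers that are not part of the proposed API: `fromList` (a fixture that stays `Pending` after its last element), `takeAvailable` (drain until `Pending`), and `readStreamYielding` (retry with `yield` until an element arrives).

```haskell
import Control.Concurrent (yield)

newtype Stream a = Stream { tryReadStream :: IO (Cons a) }

data Cons a = Cons a (Stream a)
            | Pending

-- Hypothetical fixture: a stream holding the given elements, after which
-- every read reports Pending.
fromList :: [a] -> Stream a
fromList []       = Stream (return Pending)
fromList (x : xs) = Stream (return (Cons x (fromList xs)))

-- Collect every element that is already available. The returned Stream is
-- positioned at the first Pending cell, so the caller can retry it later.
takeAvailable :: Stream a -> IO ([a], Stream a)
takeAvailable str = do
  h <- tryReadStream str
  case h of
    Cons a str' -> do
      (as, rest) <- takeAvailable str'
      return (a : as, rest)
    Pending -> return ([], str)

-- Blocking-style read built on the non-blocking one: yield and retry the
-- same Stream until its head becomes a Cons.
readStreamYielding :: Stream a -> IO (a, Stream a)
readStreamYielding str = do
  h <- tryReadStream str
  case h of
    Cons a str' -> return (a, str')
    Pending     -> yield >> readStreamYielding str
```

Retrying the same `Stream` value is what makes the monotonic `Pending` to `Cons` behavior useful: the consumer never loses its position.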

@jberryman
Owner Author

Per this post, Gabriel pointed out that my stream type is essentially ListT IO a from pipes, so it should be possible to make the streamChan function agnostic to the streaming library, and to provide the type I described as an instance for users who don't wish to use any streaming library. I'm still working to understand this approach completely.
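
One way to see the resemblance is the sketch below. It uses a hypothetical local `ListT` in the "effectful uncons" shape, which is an assumption made for illustration and is not the definition pipes uses. The correspondence is structural only: `Nothing` normally means end of list, whereas `Pending` means "not yet, retry".

```haskell
newtype Stream a = Stream { tryReadStream :: IO (Cons a) }

data Cons a = Cons a (Stream a)
            | Pending

-- Hypothetical local type for illustration; NOT the pipes definition.
newtype ListT m a = ListT { uncons :: m (Maybe (a, ListT m a)) }

-- Cons maps to Just, Pending maps to Nothing.
toListT :: Stream a -> ListT IO a
toListT (Stream io) = ListT $ do
  h <- io
  return $ case h of
    Cons a str' -> Just (a, toListT str')
    Pending     -> Nothing

-- The inverse direction, showing the two shapes carry the same information.
fromListT :: ListT IO a -> Stream a
fromListT (ListT io) = Stream $ do
  m <- io
  return $ case m of
    Just (a, rest) -> Cons a (fromListT rest)
    Nothing        -> Pending

-- Run the effects and collect elements up to the first Nothing.
toList :: Monad m => ListT m a -> m [a]
toList (ListT m) =
  m >>= maybe (return []) (\(a, rest) -> fmap (a :) (toList rest))
```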

@lukehoersten

I have a single-consumer case and a separate single-producer case. Basically, I'm doing fan-in reading and fan-out writing from a process. The readers and writers all do IO, so each would be a separate thread, but in both cases the singleton will be a single thread. Would unthreadsafeReadChan only require that reads happen from a single thread, or would the writers all have to be on the reading thread as well? (I can't imagine the latter is the case.)

Also, I'm adding new writers on new network connections, so building them all ahead of time in a list would not be ideal. Dup works better, except that it requires a special case: use the original chan first, and then dup from there on out.

@jberryman
Owner Author

Actually, I won't be including streaming-library-agnostic streaming in the next release; I'm waiting on Gabriella439/pipes#131.
