Flaws of Lwt_stream #250

Open
aantron opened this Issue May 23, 2016 · 31 comments

@aantron
Collaborator

aantron commented May 23, 2016

Please tell us why Lwt_stream doesn't work for you, ideally at the level of semantics (i.e. prefer "lazy streams aren't right for our use case because blahblah" to "Lwt_stream is missing a specific function that can easily be added").

Hopefully we can

  • change it in a minor or major release,
  • settle on some helper library or libraries aside from Lwt, and gradually start moving Lwt_stream towards deprecation,
  • keep it, if you think it's great,
  • or your alternative proposals.

cc @c-cube @hcarty @seliopou @Drup @diml

Please pull in anyone else who has interest/expertise.

Some existing anguish: #239, #151.

EDIT: Despite the issue title, thank you to @diml.

EDIT: Note also #155.

@c-cube

Collaborator

c-cube commented May 24, 2016

I don't have the solution(s), but issues with Lwt_stream that come to my mind are:

  • the semantics are not trivial (several ways of creating a stream, each with distinct semantics)
  • biggest issue is resource management. I am not sure how to handle this at all (doing it really cleanly might require linear types?); the thing is, you never know which part of the code might be using a stream (or the Lwt_stream.map f (Lwt_stream.filter g the_stream)), so when the consumer is done it doesn't really have a way of signaling the whole pipeline and having the underlying source closed (if the_stream is a file descriptor, say).
  • I have an issue somewhere with multiple producer/multiple consumers, I think, although it's been a while and I don't really remember precisely.

I think Lwt_stream should mostly stay as is; opam makes it easy to depend on 3rd party libraries that have more freedom to explore, at least for now. It's like Stream in the stdlib, it's almost impossible to improve in a retrocompatible way. I don't think it's time for deprecation yet, not until we have robust, well tested alternatives in 3rd party libs.

@aantron

Collaborator Author

aantron commented May 24, 2016

Thanks. Agreed.

3rd party libraries that have more freedom to explore

I would like to move this along. Of course, Lwt_stream will remain mostly as is for a while – we have to be conservative. But I am highly open to, for example, "blessing" an alternative library in the docs of Lwt_stream, if we have/develop one that addresses many of the issues people have. It's kind of sad to offer a module that:

  • appears to provide nice streams,
  • but, after trying it, users discover that it doesn't do what they want,
  • which then forces people to do research to find an alternative.

If Lwt_stream really is like this, then it's a waste of people's time, and we should look to address it – even if only with documentation for now.

@hcarty

Contributor

hcarty commented May 24, 2016

Lwt_stream is imperfect but I wouldn't go as far as saying it sucks. It does serve a purpose! Both push- and pull-driven streams are useful to have. Having a finite > 1 set of consumers is possible but takes some manual juggling to make it work - certainly not ideal for users. If Lwt_stream can be improved without breaking the Lwt-using world then it would be good to do so.

An example for a potential Lwt_stream.map_n to show one way to have a capped concurrent map over a stream:

open Lwt.Infix

let map_n ?(max_threads = 1) f stream =
  begin
    if max_threads <= 0 then
      Lwt.fail_invalid_arg "map_n: max_threads must be > 0"
    else
      Lwt.return_unit
  end >>= fun () ->
  let rec loop running available =
    begin
      if available > 0 then (
        Lwt.return (running, available)
      )
      else (
        Lwt.nchoose_split running >>= fun (complete, running) ->
        Lwt.return (running, available + List.length complete)
      )
    end >>= fun (running, available) ->
    Lwt_stream.get stream >>= function
    | None ->
      Lwt.join running
    | Some elt ->
      loop (f elt :: running) (pred available)
  in
  loop [] max_threads

@c-cube

Collaborator

c-cube commented May 24, 2016

I agree with @hcarty, it doesn't suck, but it has shortcomings.

A small suggestion on the map_n implementation: I tend to use Lwt_pool and Lwt_list.map_p for this kind of thing, so I wonder if they would fit streams as well.

@hcarty

Contributor

hcarty commented May 24, 2016

@c-cube That was my first attempt, but map_p concurrently pulls stream elements and spawns new Lwt threads as long as the stream has more to take. That was a problem for very large or infinite streams. Lwt_pool blocks within the mapping threads but it doesn't prevent new threads from spawning.

@pqwy

Contributor

pqwy commented May 27, 2016

Please tell why Lwt_stream doesn't work for you, ideally at the level of semantics [...]

Consider a finite stream that yields k successive elements during a program execution. Consider n concurrent readers on this stream. I expect precisely one of these statements to hold, if the program runs long enough:

  • The number of dequeuing events is n * k. This means that every reader gets to see every element, which is the semantics expected of lazy lists. In terms of monad transformers, this is ListT Lwt a.
  • The number of dequeuing events is k. Each element is seen only by a single reader, e.g. each dequeuing event carries a unique element. With respect to uniqueness, this is what's sometimes described as a "channel", and matches what people often expect of a "stream".

Presently, neither of these is the case. The number of dequeuing events, x, satisfies k <= x <= n * k. Better yet, it is non-deterministic, and impossible to know statically.

Operationally, if there is an element available in the stream, the following Lwt_stream.next returns it, and removes it from the stream. If there is no element, however, a reader blocks on the stream. Several readers can end up blocking on the same stream in this time window, and when the next element becomes available, it is pushed to all of them.

In operational terms, this makes a certain sense. Semantically, it makes no sense whatsoever, and makes streams somewhere between impossible to reason about and plain useless in situations with multiple readers.
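
To make the counting concrete, here is a toy model in plain OCaml (no Lwt). It is only a sketch of the behavior described above, not Lwt_stream's implementation: the `policy` type, the `model` record, and the reader schedule in `scenario` are all assumptions made for illustration. `Broadcast` hands an element that arrives while readers are blocked to every blocked reader; `Queue_readers` hands it to exactly one.

```ocaml
(* Toy model of concurrent readers on a push stream. Hypothetical names
   throughout; this is not how Lwt_stream is implemented. *)

type policy =
  | Broadcast      (* an element arriving while readers wait goes to all of them *)
  | Queue_readers  (* an element goes to exactly one waiting reader *)

type 'a model = {
  policy : policy;
  buffered : 'a Queue.t;             (* elements pushed while nobody waits *)
  mutable waiting : int list;        (* ids of blocked readers, oldest first *)
  mutable events : (int * 'a) list;  (* dequeuing events: (reader, element) *)
}

let create policy =
  { policy; buffered = Queue.create (); waiting = []; events = [] }

(* A reader asks for the next element: take it if buffered, else block. *)
let read m reader =
  match Queue.take_opt m.buffered with
  | Some x -> m.events <- (reader, x) :: m.events
  | None -> m.waiting <- m.waiting @ [reader]

(* The producer pushes one element. *)
let push m x =
  match m.policy, m.waiting with
  | _, [] -> Queue.add x m.buffered
  | Broadcast, readers ->
    List.iter (fun r -> m.events <- (r, x) :: m.events) readers;
    m.waiting <- []
  | Queue_readers, r :: rest ->
    m.events <- (r, x) :: m.events;
    m.waiting <- rest

let dequeuing_events m = List.length m.events

(* n = 3 readers, k = 4 elements, one arbitrary interleaving. *)
let scenario policy =
  let m = create policy in
  List.iter (read m) [1; 2; 3];   (* all three readers block *)
  push m 'a';
  push m 'b';
  read m 1;
  push m 'c';
  push m 'd';
  List.iter (read m) [2; 3];
  dequeuing_events m
```

With this particular schedule, `scenario Broadcast` counts 6 dequeuing events (strictly between k = 4 and n * k = 12), while `scenario Queue_readers` counts exactly k = 4. A different interleaving would give a different count under `Broadcast`, which is the non-determinism complained about above.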

This, purely semantic, hitch is my greatest gripe about streams, and it's the one I complained about in #151. The fix proposed really only addressed the semantics, but it can both lead to thread starvation and cause needless work. I have a draft of the updated stream implementation, which takes care of explicitly queuing the readers. It guarantees behavior 2 without these downsides. In addition, it makes the implementation shorter. If there is any interest, I can flesh it out, benchmark it against the current version, and post for review. Alternatively, I was playing with the idea of publishing this implementation as a separate package.

A further issue is that push-based streams are conflated with pull-based streams, and it can be argued that push-streams should be more like channels, while pull-streams should be more like lists, but I can live with the two being lumped into one as this didn't bite me in practice.

Edit:

Forgot to add that the test suite passes when the concurrent-readers behavior is changed, and I could not provoke other programs to misbehave with that code. So changing this is totally feasible.

@c-cube

Collaborator

c-cube commented May 27, 2016

That's an interesting analysis. I like this semantics of "dequeuing events".

Note that lwt-pipe, which is just a prototype so far, implements the semantics "exactly k dequeuing events". No event is lost just because there is no consumer, and the pipe acts as a channel (it is really a channel).

For the n * k semantics, I don't see any good alternative to the (lazy) list, because there is no reliable way of knowing how many consumers are waiting for each item. A list would be a mere immutable value, with all the associated benefits of functional programming.

type 'a llist = 'a llist_cell Lwt.t lazy_t
and 'a llist_cell =
  | Nil
  | Cons of 'a * 'a llist

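
As a rough illustration of why an immutable lazy list gives the n * k behavior, here is a stdlib-only analogue of the type above. It is a simplification: the `Lwt.t` layer is dropped, so cells are forced synchronously rather than awaited, and `range`, `to_list`, and the `produced` counter are hypothetical helpers, not part of any library.

```ocaml
(* Stdlib-only analogue of the llist type: no Lwt.t layer, so forcing a cell
   is synchronous. All helper names here are made up for illustration. *)
type 'a llist = 'a llist_cell Lazy.t
and 'a llist_cell =
  | Nil
  | Cons of 'a * 'a llist

(* Counts how many times the producer actually computes a cell. *)
let produced = ref 0

(* Lazily produce the integers lo .. hi - 1. *)
let rec range lo hi : int llist =
  lazy (
    if lo >= hi then Nil
    else begin
      incr produced;
      Cons (lo, range (lo + 1) hi)
    end)

(* A consumer: walk the list and collect every element. *)
let rec to_list (l : 'a llist) =
  match Lazy.force l with
  | Nil -> []
  | Cons (x, rest) -> x :: to_list rest

(* Two independent consumers share the same immutable value. *)
let shared = range 0 4
let consumer_a = to_list shared
let consumer_b = to_list shared
```

Both consumers see all four elements, yet each cell is computed only once because `Lazy.force` memoizes its result. No bookkeeping about how many consumers exist is needed; the flip side is that cells stay in memory for as long as any consumer still holds a reference to them.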
@hcarty

Contributor

hcarty commented May 27, 2016

@pqwy A desire to have your second case with k events is what prompted me to write the map_n implementation above.

@aantron

Collaborator Author

aantron commented May 28, 2016

I'd like to agree with @c-cube and @hcarty above that Lwt_stream doesn't "suck." It has flaws. But we do want to hear from people that think the flaws are really serious, who might not comment if they don't think we are willing to address them – even if we have to, hypothetically, replace Lwt_stream. In all cases, thank you to @diml for his work :) And thanks for the comments so far.

aantron changed the title from "Lwt_stream sucks (?)" to "Flaws of Lwt_stream" on May 28, 2016

@hcarty

Contributor

hcarty commented Jun 15, 2016

@aantron Are you interested in a PR for the Lwt_stream.map_n implementation above, or something close to it?

@aantron

Collaborator Author

aantron commented Jun 16, 2016

@hcarty If you could wait, and we discuss it later, that would be best. I am still pretty far off in the backlog from being able to give this topic the consideration it deserves. It wouldn't be fair to ask you to do work when I can't respond to it properly. But PRs are always appreciated in principle, so thank you for offering :)

@rneswold

Contributor

rneswold commented Jun 24, 2016

I was experimenting with alternate implementations for streams and, after some discussions with @aantron (#257), created this:

Lwt_stream.mli: https://gist.github.com/rneswold/5ace9696ec92a41825f6f08607ff5dec
Lwt_stream.ml: https://gist.github.com/rneswold/1f7feccf7383fe578c6d796b62ba654c

This uses the most basic primitive of LWT as its data structure, so I don't think there could be an implementation that uses fewer resources.

Each element of the stream is a thread obtained from Lwt.wait. The push function uses the wakener to provide a value and extend the "chain". An Lwt_stream.t (an opaque type) is a reference to a thread. Multiple threads, with their own cloned stream, will be blocking on the same, underlying thread so when data is added, all threads get unblocked.

When the push function goes out of scope, the final node in the stream becomes fail End_of_stream.

@pqwy: This implementation promises n * k dequeued events, if each thread owns a clone of the stream. If Lwt_stream.next is wrapped with Lwt_mutex.with_lock, threads can share a stream and you get n dequeues.

More complicated requirements (e.g. bounded streams) could be built from this. I find, however, the simplicity of this example appealing. As is, it appears to be a very efficient way to broadcast data to multiple threads.

@c-cube

Collaborator

c-cube commented Jun 25, 2016

@rneswold just a nitpick, to me, handling non-memory resources using the GC is quite bad. If you wait until a value is finalized to close the stream (and release the underlying resources such as file descriptors) it will probably be too late, and the program will fail with "too many file descriptors are open". Same, requiring to exhaust the stream before it closes is really inefficient (what if I read the first lines of a 100MB file, then do not need the line iterator? reading the whole file before closing it is quite inefficient). I'd argue that a n dequeue structure (a bounded mutable queue) is more efficient in most cases, and you can build a lazy immutable stream on top if you need n * k behavior, at the cost of more memory.

@rneswold

Contributor

rneswold commented Jun 26, 2016

@c-cube In general, I agree with you. Many types of resources should be reclaimed as soon as possible. In the case of streams, however, we can make an exception:

  • A stream container is an in-memory container, so its contents' resources are already tied to the garbage collector.
  • The current stream implementation holds its state in a record, its data in a chain (i.e. list) of nodes, and getting the next value allocates a thread to block or read the next value. In my example, the state is a reference to the very thread that will block. Put another way, if 100 threads were reading a stream, under the current implementation there are n unread items (allocations) in the stream and there will be 100 * n threads required to read all the data. Under my example, there are 2 * n allocations per stream item (the thread and the waker). There are no additional allocations required to block the threads reading the data.

I've been reading about ways to write software so that many runtime errors can be caught at compile time [1] (by designing the API in such a way that it's impossible to get in the runtime error state). My stream implementation follows this in that you can only add items to the stream. When the push function goes out of scope, the stream is closed. There's no need to check at run-time whether you're pushing values on a closed stream because it's impossible.

This stream model is very useful and efficient in broadcasting data to LWT threads.

I think your concern is that, if we read a file and convert it to a stream of lines, the file handle is tied to the lifetime of the stream. This is a valid concern and I'm glad you brought it up. In fact, I encourage criticisms. But rather than write-off this implementation, I'd like to see if there's a way to support your use-cases with minimal change to the core design.


[1] https://www.cl.cam.ac.uk/teaching/1415/L28/gadts.pdf

@rneswold

Contributor

rneswold commented Jun 27, 2016

Put another way, if 100 threads were reading a stream, under the current implementation...

Wow. My reasoning and math sounded good until I started measuring. The current Lwt_stream consistently beats my implementation from 15% to 30% (using 4.02.3)! My benchmark module is here:

bench.ml : https://gist.github.com/rneswold/b47e3ce141ae12e10695972de2fba68f

I set n_stream to 1,000,000 entries and then ran 1 to 1,000 threads reading it. I then set n_stream to 1,000 and ran 1,000 to 100,000 threads reading it. I'm going to play around with this some more (for instance, is it the writer or readers that isn't scaling well?)

@c-cube

Collaborator

c-cube commented Jun 28, 2016

@rneswold I'd be interested in the addition of my lwt-pipe to your benchmark :)

@michipili

michipili commented Sep 13, 2016

I am working a lot with NodeJS and js_of_ocaml and the state of tail-recursion optimisation in js_of_ocaml forces me to write my own stream iterators. It would be very useful to have streams as a separate library (I mean, a separate linker unit) to ease the use of a drop in replacement for the functions it defines.

@rneswold

Contributor

rneswold commented Jan 31, 2017

Here's an updated functional stream implementation I've been playing with:

https://gist.github.com/rneswold/0d80560a80314ce3f1aa57a64ee406dd

@aantron pointed out to me the equivalency between IVar and Lwt.t so I based the stream contents on a chain of resolved Lwt.t values terminated with a Fail. clone simply becomes the identity function. The combinators, map, filter, combine, and append compose nicely and, no matter how many threads read from them, each node of their content is calculated once (an earlier version of this module calculated each node n times for n consumers.)

Since the implementation is based on Lwt.t values, any improvements made to the LWT scheduler will automatically improve the performance of this module.

It still doesn't address @c-cube 's concerns of lazy consumption. I tried writing Lwt_fstream.of_list to be consumer-driven, but couldn't do it without complicating the core. I'm still trying to find a solution for this use-case.

I provide this example to further discussion of this issue. At the very least, I think I'll package this up as a separate OPAM package once I finish the remaining functions (to make it closely compatible with Lwt_stream's API).

@mrvn

mrvn commented Apr 14, 2017

@c-cube

just a nitpick, to me, handling non-memory resources using the GC is quite bad. If you wait until a value is finalized to close the stream (and release the underlying resources such as file descriptors) it will probably be too late, and the program will fail with "too many file descriptors are open".

I disagree. The Gc runs depending on how many resources you allocate. If you use a custom block with a finalizer you can fine-tune how often the Gc should run. For anything holding file descriptors I would say that should be at least every 100 file descriptors. That way you can have 900 files open and reachable and still never run out of file descriptors. As soon as you get to 1000 open file descriptors the Gc would run and get you back to the 900 still reachable ones, so you never hit the 1024 limit.

If I see a problem here it's more that standard file descriptors are not custom blocks but simply ints. So they aren't Gc'ed at all. They are not counted as a resource other than the 4/8 bytes of memory they take up in some record or tuple. And then the Gc happily creates 1 million of them before the minor heap gets processed. Obviously then you run out of file descriptors. The above only works if you wrap them into a custom block at a higher level.

@c-cube

Collaborator

c-cube commented Apr 14, 2017

@mrvn I didn't know you could do such things in custom blocks. But as things stand now, the GC will not protect you against this very annoying error of "too many file descriptors", and it is not clear (to me) how Lwt can change that.

@mrvn

mrvn commented Apr 14, 2017

@c-cube You would have to wrap every data structure containing a file descriptor in Lwt with a custom block. The Unix.file_descr should have been a custom block in the first place and close itself when it becomes unreachable (unless already closed explicitly). That is the only way I know of to tell the Gc about the cost of some external resource, i.e. to make it run more often.

@c-cube

Collaborator

c-cube commented Apr 14, 2017

That is why I prefer RAII-like approaches, especially with_foo functions.
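
For readers unfamiliar with the pattern, a `with_foo` function ties a resource's lifetime to a lexical scope instead of to the GC. Below is a minimal stdlib-only sketch; `with_file_in` and `first_line` are hypothetical names chosen for illustration (an Lwt version would follow the same shape with an Lwt-aware cleanup combinator).

```ocaml
(* Hypothetical helper: open [path], pass the channel to [f], and close the
   channel when [f] returns or raises. *)
let with_file_in path f =
  let ic = open_in path in
  Fun.protect ~finally:(fun () -> close_in_noerr ic) (fun () -> f ic)

(* Read only the first line. The descriptor is released as soon as the
   callback finishes, without reading the rest of the file and without
   waiting for a finalizer to run. *)
let first_line path =
  with_file_in path (fun ic ->
      try Some (input_line ic) with End_of_file -> None)
```

This addresses the "read the first lines of a 100MB file" case raised earlier in the thread: the consumer stops early and the file is still closed deterministically. The cost is that the resource cannot escape the callback, which is exactly the restriction that makes it awkward to combine with a stream that outlives its creator.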

@kydos

kydos commented Jun 13, 2018

@aantron should Lwt_stream be used for production code at all? In the documentation page (http://ocsigen.org/lwt/dev/api/Lwt_stream) you suggest to look at @c-cube's lwt-pipe, but that is also marked as incomplete -- while the functionalities I tried seem to work fine.

I like the CML approach to concurrency and it would be nice to have something that we could rely upon with Lwt. The other aspect of channels that I like is that they enforce a share-nothing discipline, and thus should make it easier to have code that will run properly on ocaml multicore. What is your recommendation?

Thanks very much in advance!

A+

@aantron

Collaborator Author

aantron commented Jun 13, 2018

@kydos yep, Lwt_stream can/should be used. It remains stable and maintained. If lwt-pipe suits your needs, though, we suggest you use that.

The idea of that message is to avoid people using Lwt_stream so much by default, that we don't get enough feedback to develop lwt-pipe or other alternatives. Maybe the message in the docs should be adjusted.

@kydos

kydos commented Jun 13, 2018

Thanks for the prompt response @aantron. I agree that the note on the documentation should be updated as it is perhaps a bit too discouraging :-)

Keep up with the great work on Lwt!

A+

@kydos

kydos commented Jun 14, 2018

@aantron, to give you some more context, we are using OCaml and Lwt to build a secure and high performance broker for the zenoh protocol. If you are familiar with NDN (Named Data Networking), zenoh addresses the problem of NDN while taking into account the new requirements posed by IoT.

Anyway, I'd like to keep the "impure" code well isolated and use channels to compose the different processing stages. OCaml requires some discipline to ensure that no mutable state is ever transmitted across a channel, but let's say we enforce that currently by design and careful coding.

As you can imagine we need the channels (or streams) connecting the various stages to be quite performant. Thus I did some quick benchmarking and my findings give the following rankings in terms of raw throughput as well as stability of the throughput:

  1. Lwt_stream
  2. Lwt_pipe
  3. Event + Threads (a distant #3)

Thus, going back to the reflection on your statement from yesterday, it looks like Lwt_stream is the way to go when using Lwt. That said, we want to use Lwt not only because we like it but also because of MirageOS.

If you are interested I can contribute the test code.

Ciao,
@kydos

P.S. Have you done much experimenting with Lwt on ocaml-multicore? I saw it is one of the open issues. Once you have something working, if you want us to do some benchmarking on throughput we would be happy to do so.

@c-cube

Collaborator

c-cube commented Jun 14, 2018

That's interesting. I'd be curious to know by how much lwt_stream is faster than lwt-pipe?
(Note that the goal of lwt-pipe, when I had time to work on it, was to provide a clean push-based abstraction which would play nice with resources when readers are faster than writers. The problem is that it's still very immature).

@aantron

Collaborator Author

aantron commented Jun 14, 2018

@kydos I've done only some rudimentary experiments with Lwt on multicore. I'll be sure to get back when there is something concrete.

@kydos

kydos commented Jun 15, 2018

@c-cube, on the in-bound processing path the writer is the transport plug-in, which may be reading data from TCP/IP, UDP/IP, raw ETH, etc., and producing protocol frames. Next is our protocol engine. Thus in this case the reader may be slower than the writer, especially on high throughput networks or with large numbers of entries on our routing tables.

In any case the test I did was synthetic. The ultimate truth would be provided by running throughput tests on our broker and seeing what the difference really is. Perhaps I'll have somebody in our team look at this, but probably not for another month. I'll let you know.

aantron added a commit that referenced this issue Jun 25, 2018

@c-cube

Collaborator

c-cube commented Feb 12, 2019

@kydos I updated lwt-pipe recently and changed some internals (along with a user contributing read_with_timeout). Would you be interested in taking another look? :)

@kydos

kydos commented Feb 13, 2019

@c-cube I'll run the test and will let you know what I get.

A+
