
Initial implementation of streaming request #107

Merged (12 commits) on Dec 15, 2020

Conversation

@hubertlepicki (Contributor) commented Nov 28, 2020

This adds support for streaming the request body with Elixir streams and Enumerable.reduce, as @josevalim suggested.

I have tested this with fairly large files and it doesn't seem to have any negative effect on memory, with or without wrapping the body streaming in a Task.
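A minimal sketch of the usage this enables, passing {:stream, enumerable} as the request body (the pool name MyFinch, the URL, and the file path are placeholders, and a started Finch pool named MyFinch is assumed):

    # Sketch: send a large file as a streaming request body using the
    # {:stream, enumerable} tuple from this PR. The file is read in
    # 64 KiB chunks, so the whole body never sits in memory at once.
    stream = File.stream!("large_upload.bin", [], 65_536)

    request = Finch.build(:post, "https://example.com/upload", [], {:stream, stream})
    {:ok, response} = Finch.request(request, MyFinch)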

@sneako suggested we don't use a Task to wrap streaming of the request body. @josevalim suggested we'd better wrap the whole operation in a process if we want to control the timeout on streaming the request body. Here is the conversation for reference: https://elixirforum.com/t/memory-leak-binaries-that-only-recon-bin-leak-1-helps-with/35804/10

I am, however, coming to the conclusion that maybe we should not handle this directly in Finch, and instead add a timeout parameter to Mint.HTTP.stream_request_body/3. That way we are consistent on both the sending and receiving sides: Mint accepts a timeout option when receiving the payload or establishing a connection, but not when sending the payload to the server (or making a request in general), or streaming.

I think what Finch and other higher-level clients should do is handle splitting / calculating the overall and per-chunk timeouts for the data we send, while Mint itself gains a timeout option for sending data to the server. So we'd add the option here, maybe also here, and then to the appropriate transport (in our case http1), so places like here and several other places.

So I suggest we don't try to solve this in Finch now; instead, we build the building blocks for solving it easily in Mint, and then come back and improve this library with timeout options for sending the request payload.
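To make the proposal concrete, a rough sketch (the timeout option is hypothetical and does not exist in Mint; stream_request_body/3 currently takes no options, and conn / request_ref come from an established Mint connection):

    # Today (actual Mint API): nothing bounds how long a send may block.
    {:ok, conn} = Mint.HTTP.stream_request_body(conn, request_ref, chunk)

    # The proposal, roughly (hypothetical, not implemented in Mint):
    # {:ok, conn} =
    #   Mint.HTTP.stream_request_body(conn, request_ref, chunk, timeout: 5_000)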

There is a problem here, however: :gen_tcp does not support a timeout on send either, so this would have to be contained entirely within Mint, I think.

There is also the option to play with send_timeout, which I think defaults to something like 3 hours. From the :gen_tcp docs:

    send(Socket, Packet) -> ok | {error, Reason}
    Types:
      Socket = socket()
      Packet = iodata()
      Reason = inet:posix()

    Sends a packet on a socket.

> There is no send call with a timeout option; use the send_timeout socket option if timeouts are desired. See the examples section.
> The fact that the send call does not accept a timeout option is because timeouts on send are handled through the socket option send_timeout. The behavior of a send operation with no receiver is to a very high degree defined by the underlying TCP stack, as well as the network infrastructure. If one wants to write code that handles a hanging receiver that might eventually cause the sender to hang on a send call, one writes code like the following.
> Consider a process that receives data from a client process that is to be forwarded to a server on the network. The process has connected to the server via TCP/IP and does not get any acknowledgement for each message it sends, but has to rely on the send timeout option to detect that the other end is unresponsive. We could use the send_timeout option when connecting:

    ...
    {ok, Sock} = gen_tcp:connect(HostAddress, Port,
                                 [{active, false},
                                  {send_timeout, 5000},
                                  {packet, 2}]),
    loop(Sock), % See below
    ...
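For completeness, the same socket option can be set from the Elixir side through Mint's :transport_opts, which are forwarded to :gen_tcp / :ssl (the host, port, and timeout values here are illustrative):

    # Sketch: bound blocking sends at the socket level. These options
    # are passed straight through to :gen_tcp.
    {:ok, conn} =
      Mint.HTTP.connect(:http, "example.com", 80,
        transport_opts: [send_timeout: 5_000, send_timeout_close: true]
      )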

If, however, we want to do it entirely in Finch, then I don't see a problem with using a Task / spawned process that streams the data. According to my tests it works just fine.

What do you folks think?

@hubertlepicki force-pushed the feature/http_request_streaming branch 5 times, most recently from 748643d to fb226cf on November 29, 2020 18:11
This provides an initial implementation of streaming requests. Tests are missing and there may be issues with this code; it is a work in progress.
@hubertlepicki changed the title from "[WIP] Initial implementation of streaming request" to "Initial implementation of streaming request" on Nov 29, 2020
@sneako (Owner) commented Nov 29, 2020

Great write-up! I don't think Mint would want to change to support these timeouts either, but I'm interested to hear what change you would suggest for that. To me, the send_timeout connect option seems like the correct way to handle this.

@hubertlepicki (Contributor, Author) commented

Then we're done here, minus HTTP/2 support, which I think we still need to do.

@sneako any other feedback / suggestions, or is this good to go?

@hubertlepicki (Contributor, Author) commented

FYI, I added some more tests and started working on HTTP/2 support.

@josevalim (Contributor) commented

The whole goal of Mint is that we don't spawn processes, so if our solution is to spawn processes, then I would prefer it be kept outside of Finch/Mint, because it goes against the model we are trying to work with. As far as I understand, you can emulate the process model even if we keep things stateless in Mint, because you can make the stream itself stateful by using things like Stream.resource.

So, in my mind, I would keep the model in Finch as simple as possible, so all we do is Enumerable.reduce, not even using suspend, and leave all complexity to the stream itself.
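A sketch of what that looks like in practice: the stream owns all the state (here, a file handle), and Finch only reduces over it. The path and chunk size are illustrative, and error handling is elided:

    # Sketch: a stateful body stream built with Stream.resource/3.
    # Finch stays stateless; the stream opens, reads, and closes the file.
    body_stream =
      Stream.resource(
        fn -> File.open!("large_upload.bin", [:read, :binary]) end,
        fn file ->
          case IO.binread(file, 65_536) do
            :eof -> {:halt, file}
            chunk when is_binary(chunk) -> {[chunk], file}
          end
        end,
        fn file -> File.close(file) end
      )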

@hubertlepicki (Contributor, Author) commented

@josevalim I understand that; we could move the whole logic of fetching the next batch of bytes to send into the stream, and we may want to do that.

I need to think about how this will work with HTTP/2, however. I don't think we care about the window size / number of bytes returned by the stream in HTTP/1, but HTTP/2 has a different way of controlling the flow, and if we send too many bytes we end up with window size errors.

Correct me if I am wrong, but that leaves us needing to transform the stream / enumerable so that it respects window sizes, and moving that into the stream itself may actually complicate things.

@josevalim (Contributor) commented

@hubertlepicki HTTP/2 is a different ball game. The streaming approach with back-pressure is the best way to do it, even with non-streaming payloads (see #88). HTTP/2 is also process-based, so I think you want to stream on the client and send the pieces to the server. I think you want to tackle those separately, just because of how different they are.

@hubertlepicki (Contributor, Author) commented Nov 30, 2020

Yes. Okay. I can do that.

Ref. HTTP/2: I actually don't know what the policy is in this project on adding a feature that works on HTTP/1 but not HTTP/2. The public API is the same for both protocols. @keathley @sneako are we okay with having streaming requests work on HTTP/1 but not on HTTP/2 for a while, or do we want both merged / done at the same time?

We could also accept {:stream, enumerable} from the start and either error or warn when HTTP/2 is used, or force HTTP/1... I really don't know what's in line with project policy here.

@sneako (Owner) commented Nov 30, 2020

I'm OK with just adding this for HTTP/1 initially and documenting that it isn't supported for H2 yet, since H2 really needs those window size updates before it can be as robust as our current HTTP/1 support, but let's see what @keathley thinks too.

Two review comments on lib/finch/http1/conn.ex (outdated, resolved)
As suggested by @josevalim, streaming requests now leave generating the chunks of data to the stream itself, reducing the amount of logic we need to add to this module.
@hubertlepicki (Contributor, Author) commented

> So, in my mind, I would keep the model in Finch as simple as possible, so all we do is Enumerable.reduce, not even using suspend, and leave all complexity to the stream itself.

Is this what you had in mind, @josevalim? keathley@ea42f8f#diff-48431cc1d91063480b5006d7585c96ea39433e319aca2b5e3a6c597fdbd7e10fR146
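In rough shape (not the exact code in that diff), the idea is to reduce over the enumerable, pushing each chunk into Mint, and finish with :eof:

    # Rough sketch: push each chunk to Mint, halting on the first
    # error, then send :eof to finish the request body.
    defp stream_request_body(conn, ref, body_stream) do
      result =
        Enum.reduce_while(body_stream, {:ok, conn}, fn chunk, {:ok, conn} ->
          case Mint.HTTP.stream_request_body(conn, ref, chunk) do
            {:ok, conn} -> {:cont, {:ok, conn}}
            {:error, conn, reason} -> {:halt, {:error, conn, reason}}
          end
        end)

      with {:ok, conn} <- result do
        Mint.HTTP.stream_request_body(conn, ref, :eof)
      end
    end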

Three review comments on lib/finch/http1/conn.ex (resolved)
@ijunaid8989 commented
Any update on this, guys? Can it be merged?

@sneako (Owner) commented Dec 13, 2020

> Any update on this, guys? Can it be merged?

There are still some outstanding comments to be addressed...

@hubertlepicki (Contributor, Author) commented Dec 13, 2020 via email

Yes, I am aware. Will look into it later today.

> On Sun, 13 Dec 2020, 14:29, nico piderman <notifications@github.com> wrote:
>
> > Any update on this, guys? Can it be merged?
>
> There are still some outstanding comments to be addressed...

@sneako (Owner) commented Dec 13, 2020

No rush, that was just meant as a reply to @ijunaid8989 😁

@hubertlepicki (Contributor, Author) commented Dec 13, 2020 via email

Renames the function track_response_telemetry to handle_response, and uses Enum.reduce_while instead of Enumerable.reduce directly.

Both things were requested during code review.
@hubertlepicki (Contributor, Author) commented

@sneako whenever you have some time: I did two of the three requested things. I think the third, i.e. Telemetry.stop(:request), should in fact stay the way it is now. Let me know otherwise.

@sneako (Owner) left a review comment

Nice, I think this is getting very close to ready now. I would like to see just a bit of documentation about passing the :stream tuple as the request body, and also a note that only HTTP/1 supports this at the moment.

Review comment on lib/finch/http1/conn.ex
@hubertlepicki (Contributor, Author) commented

@sneako perfect, I'll do that.

I also plan to have a go at adding some more overall documentation and examples to this project after that, unless you want to stop me for some reason. I think at the moment it's pretty sparse overall.

@sneako (Owner) commented Dec 14, 2020

A follow-up PR improving the existing documentation would definitely be appreciated; I agree it is a bit sparse at the moment. Thanks!

@hubertlepicki (Contributor, Author) commented

@ijunaid8989 commented
Hi, sorry for commenting here, but it's stream-related.

Right now, in HTTPoison, we are doing this:

    url = "http://#{host}:#{port}/PSIA/Custom/SelfExt/ContentMgmt/download"
    opts = [stream_to: self()]

    HTTPoison.post(
      url,
      xml,
      [
        "Content-Type": "application/x-www-form-urlencoded",
        Authorization: "Basic #{Base.encode64("#{username}:#{password}")}",
        SOAPAction: "http://www.w3.org/2003/05/soap-envelope"
      ],
      opts
    )
    |> collect_response(self(), <<>>)

    def collect_response(id, par, data) do
      receive do
        %HTTPoison.AsyncStatus{code: 200, id: id} ->
          Logger.debug("Collect response status")
          collect_response(id, par, data)

        %HTTPoison.AsyncHeaders{headers: _headers, id: id} ->
          Logger.debug("Collect response headers")
          collect_response(id, par, data)

        %HTTPoison.AsyncChunk{chunk: chunk, id: id} ->
          save_temporary(chunk)
          # <> chunk
          collect_response(id, par, data)

        %HTTPoison.AsyncEnd{id: _id} ->
          Logger.debug("Stream complete")

        _ ->
          Logger.debug("Unknown message in response")
          collect_response(id, par, data)
      after
        5000 ->
          Logger.debug("No response after 5 seconds.")
      end
    end

Is it possible to do this kind of streaming in Finch, especially with this PR?

@hubertlepicki (Contributor, Author) commented Dec 14, 2020

@ijunaid8989 yes, and you don't need this PR for that. This PR is for sending out data in streaming fashion; it seems you want to read data in streaming fashion. You can also do both (on this PR/branch, so far).

For streaming responses, you need to replace your request function call with a call to the stream function: https://hexdocs.pm/finch/Finch.html#stream/5
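A sketch of the streaming-response equivalent of the receive loop above, using Finch.stream/5. It reuses url, xml, and save_temporary/1 from your snippet; MyFinch is a placeholder for a started Finch pool:

    # Sketch: each body chunk arrives in the callback instead of as an
    # HTTPoison.AsyncChunk message.
    request =
      Finch.build(:post, url, [{"content-type", "application/x-www-form-urlencoded"}], xml)

    {:ok, _acc} =
      Finch.stream(request, MyFinch, nil, fn
        {:status, _status}, acc -> acc
        {:headers, _headers}, acc -> acc
        {:data, chunk}, acc ->
          save_temporary(chunk)
          acc
      end)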

@ijunaid8989 commented
@hubertlepicki Okay, great, thank you. So after this merge, we can send a POST multipart request and nothing will change, except that where the body was JSON it will also accept data like this:

    <<137, 80, 78, 71, 13, 10, 26, 10, 0, 0, 0, 13, 73, 72, 68, 82, 0, 0, 0, 200, 0,
      0, 0, 43, 8, 3, 0, 0, 0, 10, 86, 55, 128, 0, 0, 0, 3, 115, 66, 73, 84, 8, 8,
      8, 219, 225, 79, 224, 0, 0, ...>>

This is the result of File.read!("unnamed.png").

@sneako (Owner) commented Dec 14, 2020

> Okay, great, thank you. So after this merge, we can send a POST multipart request and nothing will change, except that where the body was JSON it will also accept data like this: [...]

@ijunaid8989 did you try simply building and sending a request with this body? I think the ElixirForum or Slack are probably more appropriate places to continue your discussion, so as not to derail the discussion here.

Two review comments on lib/finch.ex (outdated, resolved)
@hubertlepicki (Contributor, Author) commented

> @ijunaid8989 did you try simply building and sending a request with this body? [...]

No, I have not tried. I also don't think this is in any way related to the PR, nor is this a good place to discuss it further. I also think this may not be a stream in the first place, but a binary.

Co-authored-by: nico piderman <nico.piderman@gmail.com>
Review comment on lib/finch.ex (outdated, resolved)
Co-authored-by: nico piderman <nico.piderman@gmail.com>
Review comment on lib/finch.ex (outdated, resolved)
Co-authored-by: nico piderman <nico.piderman@gmail.com>
@sneako requested a review from keathley on December 14, 2020 19:01
sneako and others added commits on December 15, 2020:

Remove unnecessary anonymous function call.
Extract stream_request_body/3 function to let us replace a pipe-to-case with a `with` clause.
mix format.
Final tweaks to request streaming before merge.
@sneako merged commit 5cd03fe into sneako:main on Dec 15, 2020
@sneako (Owner) commented Dec 15, 2020

@hubertlepicki thanks for working through this with me, really appreciate the contribution!

@hubertlepicki (Contributor, Author) commented Dec 15, 2020 via email

@sneako (Owner) commented Dec 15, 2020

Ah sorry, I missed your comment. Appreciate the offer, but I have already released version 0.6.0.

Let me know how it goes!

@hubertlepicki (Contributor, Author) commented Dec 15, 2020 via email

@hubertlepicki (Contributor, Author) commented

@sneako I am about to test this thing on staging, so far so good. Will keep you posted.

One thing I noticed is that you forgot to push a v0.6.0 tag to GitHub, so it doesn't show up on the releases list.

There are a couple of semi-related things I'd like to discuss; I'll open separate tickets instead of continuing here.

@sneako (Owner) commented Dec 16, 2020

> One thing I noticed is that you forgot to push a v0.6.0 tag to GitHub, so it doesn't show up on the releases list.

Yeah, we have been kind of slacking on the GitHub tags... You'll notice the last tag is actually several versions behind; the releases have been neglected for a few versions. I'll update the tag now and try to keep them updated going forward.

@hubertlepicki (Contributor, Author) commented

👍 Great, thank you. That makes it easier for people to figure out what they are actually using.

Ref. the other things: I actually opened an issue on Tesla. I would be interested in hearing from you (and / or @keathley) what your view is on the idea: elixir-tesla/tesla#437

@sneako (Owner) commented Dec 17, 2020

Extracting that module to a separate lib makes sense to me. I don't think we would want to add similar functionality directly to Finch, so your proposal could be a good way around that 👍
