req_perform_stream(round = c("byte", "line")) #437

Merged
merged 18 commits into from
Feb 14, 2024

Conversation

romainfrancois
Contributor

The backstory for this proposal is mlverse/chattr#63. The failure in chattr occurs because the callback in req_perform_stream() occasionally receives bytes that are truncated in the middle of a multi-byte character, e.g. only 2 bytes of a 3-byte UTF-8 character.

This is because req_perform_stream() consumes the curl connection with a readBin() call and a fixed number of bytes.
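
To see the failure mode in isolation, here is a minimal base R sketch (not code from this PR). It cuts a raw vector at a fixed byte count that happens to land inside the 3-byte UTF-8 encoding of the euro sign. The sample string and the cut position are made up for illustration.

```r
# "\u20ac" (the euro sign) is encoded as three bytes in UTF-8: e2 82 ac
bytes <- charToRaw("a\u20ac")
length(bytes)

# A fixed-size read that stops after 3 bytes keeps only 2 of the
# euro sign's 3 bytes, so the chunk is no longer valid UTF-8
truncated <- bytes[1:3]
validUTF8(rawToChar(truncated))

# The full vector decodes fine
validUTF8(rawToChar(bytes))
```

Any callback that calls rawToChar() on such a chunk and then parses the text can fail in the same way, which matches the symptom described for chattr.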

This PR proposes adding a req_perform_stream_lines() sister function that uses readLines() instead. This would be useful when the stream is known to return text, e.g. JSON that can then be processed line by line.

I can polish the PR if this gets buy-in. I'll open a PR to mlverse/chattr to demonstrate a potential use of req_perform_stream_lines().

However, what I really believe is that req_perform_stream() should instead evolve so that "get the next chunk" is abstracted, e.g. something like:

|> req_perform_stream(callback = show_bytes, chunk = stream_kb(64))
|> req_perform_stream(callback = show_bytes, chunk = stream_lines(1L))

romainfrancois added a commit to romainfrancois/chattr that referenced this pull request Feb 8, 2024
Member

@hadley hadley left a comment

Thanks for working on this! It seems like a solid approach and I like your proposal for req_perform_stream() syntax. I think the only thing that's missing is some way to test this, which I assume will need something in webfakes that involves sending some characters, then sleeping, then sending a newline?

@romainfrancois
Contributor Author

Thanks for the review. I'll follow up.

I think n= is not needed, and we can either:

  • have it always return line by line, i.e. n = 1L so that the callback has to handle one line. I think this is my preferred option with my initial use case.
  • have it return all complete lines that are available in the buffer

No strong preference. Either option eliminates the n= argument, so both variants can have the same signature.

@hadley
Member

hadley commented Feb 10, 2024

Yeah, good point. I think having it read all of the lines is fine, since most operations are going to be vectorised anyway.

Maybe you could have something like round = c("byte", "char", "line")? Then it would always return the same type of thing, but you'd know that you were getting an integer number of things once you split them up.

@romainfrancois romainfrancois changed the title proposal for req_perform_stream_lines() proposal req_perform_stream(round = "lines") Feb 11, 2024
@romainfrancois
Contributor Author

Made another pass. Instead of adding the req_perform_stream_lines() function, this now adapts req_perform_stream() with the extra argument round = c("bytes", "lines").

With this version, req_perform_stream() still hands the callback a raw vector as before, except that the raw vector may now be truncated at the last occurrence of the newline character. The leftover bytes are kept for later until the stream is exhausted.
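
The truncate-and-keep-leftovers step can be sketched in base R as follows. This is a hedged illustration of the idea rather than the code in the PR; `split_at_last_newline()` is a hypothetical helper name.

```r
# Hypothetical helper (not part of httr2): split a raw buffer at its
# last newline into the part to emit now and the part to keep for later
split_at_last_newline <- function(buf) {
  pos <- which(buf == charToRaw("\n"))
  if (length(pos) == 0) {
    # No complete line yet: emit nothing, keep everything
    return(list(complete = raw(), leftover = buf))
  }
  cut <- max(pos)
  list(
    complete = buf[seq_len(cut)],
    leftover = buf[seq_len(length(buf) - cut) + cut]
  )
}

parts <- split_at_last_newline(charToRaw("line 1\nline 2\npartial"))
rawToChar(parts$complete)
rawToChar(parts$leftover)
```

The leftover bytes would be prepended to the next chunk read from the connection, so a line is only ever passed on once it is complete.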

I don't really know how to round at characters, and this would perhaps mean some sort of utf8 parsing which does not seem very useful.

I suppose instead of round = c("bytes", "lines"), this could be a logical argument.

library(httr2)

show_lines <- function(x) {
  writeLines(cli::rule())
  cat(rawToChar(x))
  TRUE
}

resp <- request(example_url()) |>
  req_url_path("/stream/10") |>
  req_perform_stream(show_lines, buffer_kb = 0.5, round = "lines")
#> ────────────────────────────────────────────────────────────────────────────────
#> {"url":"http://127.0.0.1:52200/stream/10","args":{},"headers":{"Host":"127.0.0.1:52200","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":0}
#> {"url":"http://127.0.0.1:52200/stream/10","args":{},"headers":{"Host":"127.0.0.1:52200","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":1}
#> ────────────────────────────────────────────────────────────────────────────────
#> {"url":"http://127.0.0.1:52200/stream/10","args":{},"headers":{"Host":"127.0.0.1:52200","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":2}
#> {"url":"http://127.0.0.1:52200/stream/10","args":{},"headers":{"Host":"127.0.0.1:52200","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":3}
#> ────────────────────────────────────────────────────────────────────────────────
#> {"url":"http://127.0.0.1:52200/stream/10","args":{},"headers":{"Host":"127.0.0.1:52200","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":4}
#> {"url":"http://127.0.0.1:52200/stream/10","args":{},"headers":{"Host":"127.0.0.1:52200","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":5}
#> ────────────────────────────────────────────────────────────────────────────────
#> {"url":"http://127.0.0.1:52200/stream/10","args":{},"headers":{"Host":"127.0.0.1:52200","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":6}
#> {"url":"http://127.0.0.1:52200/stream/10","args":{},"headers":{"Host":"127.0.0.1:52200","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":7}
#> {"url":"http://127.0.0.1:52200/stream/10","args":{},"headers":{"Host":"127.0.0.1:52200","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":8}
#> ────────────────────────────────────────────────────────────────────────────────
#> {"url":"http://127.0.0.1:52200/stream/10","args":{},"headers":{"Host":"127.0.0.1:52200","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":9}

Created on 2024-02-11 with reprex v2.1.0

@romainfrancois
Contributor Author

This works for the motivating example with the updated (and simpler) PR in mlverse/chattr: mlverse/chattr#65

@hadley
Member

hadley commented Feb 11, 2024

If we did characters, I think it would have to mean UTF-8 characters, so it would be round = "utf8" and use the logic from https://en.wikipedia.org/wiki/UTF-8#Encoding to determine if we're part way through a UTF-8 character. But I don't think it's worth it. (Or at least not until someone explicitly asks for it).
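
For reference, the check described here could look roughly like the sketch below. It is not part of httr2 and was not implemented in this PR. `last_complete_utf8()` is a hypothetical name, and the sketch assumes the bytes are otherwise well-formed UTF-8.

```r
# Hypothetical helper: index of the last byte that ends a complete
# UTF-8 character, assuming the input is otherwise well-formed
last_complete_utf8 <- function(bytes) {
  x <- as.integer(bytes)
  n <- length(x)
  if (n == 0) return(0L)

  # Continuation bytes look like 10xxxxxx; walk back to the last lead byte
  is_cont <- bitwAnd(x, 0xC0L) == 0x80L
  lead <- n
  while (lead > 1 && is_cont[lead]) lead <- lead - 1L

  # The lead byte's high bits give the expected character length
  b <- x[lead]
  need <- if (b < 0x80L) {
    1L  # 0xxxxxxx
  } else if (b >= 0xF0L) {
    4L  # 11110xxx
  } else if (b >= 0xE0L) {
    3L  # 1110xxxx
  } else if (b >= 0xC0L) {
    2L  # 110xxxxx
  } else {
    1L  # stray continuation byte; treated as a single unit here
  }

  have <- n - lead + 1L
  if (have >= need) n else lead - 1L
}

euro <- charToRaw("a\u20ac")    # bytes: 61 e2 82 ac
last_complete_utf8(euro)        # whole buffer is complete
last_complete_utf8(euro[1:3])   # euro sign is cut short, so stop after "a"
```

Line-based rounding avoids all of this for newline-delimited text, because the newline byte 0x0a never appears inside a multi-byte UTF-8 sequence.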

I like keeping it as an enum because we might add extra rounding features in the future, and you could maybe even imagine supplying a callback function if we really want to give the user a lot of extra control.

@romainfrancois
Contributor Author

I updated it to use round = c("byte", "line") as you hinted before in #437 (comment)

It also makes it easier to document IMO.

@romainfrancois
Contributor Author

romainfrancois commented Feb 12, 2024

Some followup 💅. I could do round = "utf8" but I agree it's probably not worth it.

library(httr2)

show_lines <- function(x) {
  cat(rawToChar(x))
  TRUE
}

req <- request(example_url()) |> req_url_path("/stream/3")
resp <- req_perform_stream(req, show_lines, buffer_kb = 0.5, round = "line")
#> {"url":"http://127.0.0.1:64801/stream/3","args":{},"headers":{"Host":"127.0.0.1:64801","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":0}
#> {"url":"http://127.0.0.1:64801/stream/3","args":{},"headers":{"Host":"127.0.0.1:64801","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":1}
#> {"url":"http://127.0.0.1:64801/stream/3","args":{},"headers":{"Host":"127.0.0.1:64801","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":2}
resp <- req_perform_stream(req, show_lines, buffer_kb = 0.5, round = "byte")
#> {"url":"http://127.0.0.1:64801/stream/3","args":{},"headers":{"Host":"127.0.0.1:64801","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":0}
#> {"url":"http://127.0.0.1:64801/stream/3","args":{},"headers":{"Host":"127.0.0.1:64801","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":1}
#> {"url":"http://127.0.0.1:64801/stream/3","args":{},"headers":{"Host":"127.0.0.1:64801","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":2}
resp <- req_perform_stream(req, show_lines, buffer_kb = 0.5, round = \(bytes) length(bytes) / 2)
#> {"url":"http://127.0.0.1:64801/stream/3","args":{},"headers":{"Host":"127.0.0.1:64801","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":0}
#> {"url":"http://127.0.0.1:64801/stream/3","args":{},"headers":{"Host":"127.0.0.1:64801","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":1}
#> {"url":"http://127.0.0.1:64801/stream/3","args":{},"headers":{"Host":"127.0.0.1:64801","User-Agent":"httr2/1.0.0.9000 r-curl/5.2.0 libcurl/7.84.0","Accept":"*/*","Accept-Encoding":"deflate, gzip"},"origin":"127.0.0.1","id":2}
resp <- req_perform_stream(req, show_lines, buffer_kb = 0.5, round = 2L)
#> Error in `req_perform_stream()`:
#> ! `round` must be "byte", "line" or a function
resp <- req_perform_stream(req, show_lines, buffer_kb = 0.5, round = "banana")
#> Error in `req_perform_stream()`:
#> ! `round` must be one of "byte" or "line", not "banana".

Created on 2024-02-12 with reprex v2.1.0

@romainfrancois romainfrancois changed the title proposal req_perform_stream(round = "lines") req_perform_stream(round = c("byte", "line")) Feb 12, 2024
#' @param buffer_kb Buffer size, in kilobytes.
#' @param round How should the raw vector sent to `callback` be rounded?
#' Choose `"byte"`, `"line"`, or supply your own function that takes a
#' raw vector of `bytes` and returns the locations of possible cut points

Member

I think returning a vector of cut points makes the implementation a bit simpler; see the new implementation of round_lines() below. What do you think?

Contributor Author

Thanks for improving the doc. With this reframe, returning the vector of cut points does make more sense, although, the current implementation only cares about the last one.
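
As an illustration of that contract (a function that takes a raw vector `bytes` and returns the locations of possible cut points), here is a hedged sketch of a custom rounding function. `round_blank_line()` is a hypothetical example, not something shipped with httr2, and it assumes a cut point is the index of the last byte to include in the chunk passed to the callback.

```r
# Hypothetical rounding function: allow cuts only after a blank line,
# i.e. after two consecutive newline bytes
round_blank_line <- function(bytes) {
  n <- length(bytes)
  if (n < 2) return(integer())
  nl <- bytes == charToRaw("\n")
  # Position i + 1 is a cut point when bytes i and i + 1 are both newlines
  which(nl[-n] & nl[-1]) + 1L
}

round_blank_line(charToRaw("a\n\nb\n\nc"))
round_blank_line(charToRaw("no blank line here\n"))
```

Based on the examples earlier in this thread, such a function would be passed as `round = round_blank_line` to req_perform_stream().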

buffer_kb = 0.1,
round = function(bytes) integer()
)
expect_equal(length(out), 0)

Member

Is this correct, or should it return 1024? i.e. should we always call callback on the final bytes in the buffer?

Contributor Author

I think 0L makes sense here. Should the response be different in that case, though? Is this a 408?

Member

Well, the HTTP request succeeded and returned all its data, but it never used a cut point we expected. I'm now leaning more towards 0 being wrong, because it's surprising for data to silently vanish if there's not a trailing newline (which must be fairly common since readLines() handles it).
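
The behaviour argued for here (always flush whatever is left in the buffer once the stream is complete) can be sketched with a simulated stream. This is an illustration under stated assumptions, not the loop in R/req-perform-stream.R: `stream_sketch()` is a hypothetical function, a rawConnection() stands in for the curl connection, and the callback's return value is ignored.

```r
# Simulated streaming loop. `round` returns candidate cut points and
# only the last one is used to decide how much to hand to `callback`.
stream_sketch <- function(bytes, callback, buffer_size, round) {
  con <- rawConnection(bytes)
  on.exit(close(con))
  buf <- raw()

  repeat {
    chunk <- readBin(con, raw(), buffer_size)
    if (length(chunk) == 0) {
      # Stream exhausted: flush the leftovers even without a cut point
      if (length(buf) > 0) callback(buf)
      break
    }

    buf <- c(buf, chunk)
    cuts <- round(buf)
    if (length(cuts) > 0) {
      cut <- max(cuts)
      callback(buf[seq_len(cut)])
      buf <- buf[-seq_len(cut)]
    }
  }
  invisible(NULL)
}

seen <- character()
stream_sketch(
  charToRaw("a\nb\nc"),  # note: no trailing newline
  callback = function(x) seen <<- c(seen, rawToChar(x)),
  buffer_size = 2,
  round = function(bytes) which(bytes == charToRaw("\n"))
)
seen
```

In this sketch, a `round` function that never returns a cut point, such as `function(bytes) integer()`, results in a single callback call with the whole body at the end, rather than the data vanishing.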

Contributor Author

😱 yeah, tweaked the incomplete logic a bit to accommodate these cases.

Contributor Author

@romainfrancois romainfrancois left a comment

Thanks for the extra 💅✨

# there are leftover bytes, but the stream is complete
# break the loop so that the callback() is given the
# whole buffer
if (!incomplete) {

Member

Would it work to move this higher? i.e. make it the other side of the if (incomplete) statement above.

while(continue && isIncomplete(stream) && Sys.time() < stop_time) {
buf <- readBin(stream, raw(), buffer_kb * 1024)
buf <- c(buf, readBin(stream, raw(), buffer_kb * 1024))

Member

This update looks great!

Contributor Author

I like how it rolls back to the original while(continue && isIncomplete(stream) && Sys.time() < stop_time) loop

@hadley hadley merged commit 8d16c01 into r-lib:main Feb 14, 2024
12 checks passed
@hadley
Member

hadley commented Feb 14, 2024

Thanks for working on this!

@romainfrancois romainfrancois deleted the req_perform_stream_lines branch February 15, 2024 05:20
romainfrancois added a commit to romainfrancois/chattr that referenced this pull request Feb 15, 2024
romainfrancois added a commit to tadascience/mistral.ai that referenced this pull request Mar 9, 2024