feature: add onBodyChunkSent callback to dispatch #847
Conversation
Codecov Report

```
@@            Coverage Diff             @@
##              main     #847     +/-  ##
===========================================
- Coverage   100.00%   99.90%   -0.10%
===========================================
  Files           26       26
  Lines         2106     2116      +10
===========================================
+ Hits          2106     2114      +8
- Misses           0        2      +2
```

Continue to review full report at Codecov.
@szmarczak ptal, would this do?
docs/api/Dispatcher.md
Outdated
```diff
@@ -207,6 +207,7 @@ Returns: `void`
 * **onHeaders** `(statusCode: number, headers: Buffer[], resume: () => void) => boolean` - Invoked when statusCode and headers have been received. May be invoked multiple times due to 1xx informational headers. Not required for `upgrade` requests.
 * **onData** `(chunk: Buffer) => boolean` - Invoked when response payload data is received. Not required for `upgrade` requests.
 * **onComplete** `(trailers: Buffer[]) => void` - Invoked when response payload and trailers have been received and the request has completed. Not required for `upgrade` requests.
+* **onBodyChunkSent** `(chunkSize: number, totalSent: number) => void` - Invoked when a body chunk is sent to the server. Not required. For a stream body this will be invoked for every chunk. For other body types, it will be invoked once after the body is sent.
```
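As a sketch of how a handler might consume this callback under the `(chunkSize: number, totalSent: number)` signature documented above (the `makeProgressHandler` helper and `expectedTotal` parameter are illustrative, not part of the API; the dispatch call itself is only simulated here):

```javascript
// Hypothetical progress handler: tracks upload progress via onBodyChunkSent.
// `expectedTotal` is an assumption for illustration, not part of the API.
function makeProgressHandler (expectedTotal) {
  return {
    sent: 0,
    onBodyChunkSent (chunkSize, totalSent) {
      this.sent = totalSent
      console.log(`uploaded ${totalSent}/${expectedTotal} bytes (+${chunkSize})`)
    },
    // The usual dispatch handler callbacks would also be implemented:
    onConnect () {},
    onHeaders () { return true },
    onData () { return true },
    onComplete () {},
    onError (err) { throw err }
  }
}

// Simulated invocation (no real dispatch): a 10-byte buffer body is sent
// in a single chunk, so the callback fires once with both values equal.
const handler = makeProgressHandler(10)
handler.onBodyChunkSent(10, 10)
console.log(handler.sent) // 10
```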
> For other body types, it will be invoked once after the body is sent.

So this doesn't exactly fix the issue. It will be called only once when passing a Buffer...
Because we pass through the buffer to the inner layers without doing any splitting ourselves.
Ah, right, it's okay :)
lib/client.js
Outdated
@@ -1397,7 +1397,10 @@ function write (client, request) {

```js
socket.cork()
socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'ascii')
socket.write(body)
const endCallback = request.onBodyChunkSent
  ? request.onBodyChunkSent.bind(request, contentLength, contentLength)
```
Is `bind` really necessary in this case?
I could create an arrow function instead, if it makes a difference.

Also, I think that I still need to update d.ts files as well.
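For reference, the two forms under discussion are equivalent in effect; a minimal sketch with illustrative names (`label` and the return value exist only to make the equivalence observable):

```javascript
// Sketch comparing the two callback styles discussed above.
const request = {
  label: 'req',
  onBodyChunkSent (chunkSize, totalSent) {
    return `${this.label}: ${totalSent}/${chunkSize}`
  }
}
const contentLength = 42

// 1. bind: pre-applies `this` and both arguments up front.
const boundCallback = request.onBodyChunkSent.bind(request, contentLength, contentLength)

// 2. arrow function: closes over the same values instead.
const arrowCallback = () => request.onBodyChunkSent(contentLength, contentLength)

console.log(boundCallback()) // "req: 42/42"
console.log(arrowCallback()) // "req: 42/42"
```

Either way a new function object is allocated per request, so the difference is mostly stylistic.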
lgtm

(we should do a round of benchmarks before/after)

wdyt @ronag?
This doesn't actually solve #820 though... it's not really needed if you are passing a stream as the body, and if passing a buffer it has the same problem. You would have to manually split the buffer into smaller writes for this to solve the referenced issue. So I don't think this actually solves anything...

I would prefer something like: `onBodySent(chunk: Buffer)`
Yes, it doesn't solve #820. I think that this feature would probably be more useful once iterator (async or regular) bodies are supported, and once they're supported I assume that creating chunks using an iterator would be more performant than with a stream.
What does it solve?

Not sure I follow.
It solves getting a notification when a chunk was uploaded without adding a listener to the stream, and if/when async iterators are added this would be more useful (because you can't "simply" add a listener to the iterator).

In the original issue, the main complaint was that creating chunks using a stream would cause performance/memory issues. I assumed that yielding chunks of a buffer from an iterator instead of a stream would be more performant, as it doesn't have all of the stream machinery (I'm not saying that you can't do
I don't see adding a listener to a stream as a problem.
I don't think the difference is of significance. The only thing that makes sense here IMO is that if `onBodyChunkSent` is set then we split the buffer ourselves internally. But I don't think having 10MB+ buffers in memory is a good idea in general. @szmarczak what are we actually trying to solve here?
I don't think so. Even if we use a stream to feed data to `dispatch`, there is no way to know if a given chunk was written, only if it was fed to the socket. Not sure if it makes a difference.
No, but if we wait for
But yeah, it kind of goes back to: what are we actually solving here?
Sorry I haven't explained this more extensively. I thought that
Correct. But some people do this.
I think what @Linkgoron was trying to say that you don't need to initialize the usual stream stuff when using generators.
Yes, although we would have to remember the size of all the queued chunks. This PR makes getting upload progress much easier. We don't have to manually provide the callbacks for
(force-pushed from be85408 to ea739b2)
There is not much we can help with :(. If they want progress updates on that, the user would need to segment the buffer and write it as a stream. We could potentially do that ourselves, but it's a different problem than the one addressed in this PR.
@szmarczak It's quite simple to segment the buffer without using too much memory. I assume that something like the following would work?

```js
function * chunkify (buffer, chunkSize = 10000) {
  for (let pos = 0; pos < buffer.byteLength; pos += chunkSize) {
    yield buffer.subarray(pos, pos + chunkSize)
  }
}
```

Then, either use
I think that could work. Will check this on Sunday.
Looking at the CI benchmarks, they seem way too different between this branch and
I've checked the benchmarks on my dedicated server and there are no regressions. @szmarczak have you had a chance to look at this?
Sorry to be difficult, but I'm still unsure how this is significantly better than just attaching a
I've run benchmarks on my PC and got these:

this branch:

There is negligible difference.

**chunkify benchmarks**

```diff
diff --git a/benchmarks/server.js b/benchmarks/server.js
index 2690be7..3f04053 100644
--- a/benchmarks/server.js
+++ b/benchmarks/server.js
@@ -24,9 +24,9 @@ if (cluster.isPrimary) {
   }
 } else {
   const server = createServer((req, res) => {
-    setTimeout(function () {
+    req.resume().on('end', () => {
       res.end('hello world')
-    }, timeout)
+    });
   }).listen(port)
   server.keepAliveTimeout = 600e3
 }
```

```diff
diff --git a/lib/core/request.js b/lib/core/request.js
index 928f4a7..5d97e43 100644
--- a/lib/core/request.js
+++ b/lib/core/request.js
@@ -9,6 +9,12 @@ const assert = require('assert')

 const kHandler = Symbol('handler')

+function* chunkify (buffer, chunkSize = 2048) { // 2KB
+  for (let pos = 0; pos < buffer.byteLength; pos += chunkSize) {
+    yield buffer.subarray(pos, pos + chunkSize)
+  }
+}
+
 class Request {
   constructor ({
     path,
@@ -53,9 +59,9 @@ class Request {
     } else if (util.isReadable(body)) {
       this.body = body
     } else if (util.isBuffer(body)) {
-      this.body = body.length ? body : null
+      this.body = body.length ? chunkify(body) : null
     } else if (typeof body === 'string') {
-      this.body = body.length ? Buffer.from(body) : null
+      this.body = body.length ? chunkify(Buffer.from(body)) : null
     } else if (util.isIterable(body)) {
       this.body = body
     } else {
```

benchmark.js:

```js
undiciOptions.body = Buffer.alloc(1024 * 1024 * 10) // 10MB

cronometro(
  {
    'chunkify' () {
      return makeParallelRequests(resolve => {
        dispatcher.dispatch(undiciOptions, new SimpleRequest(resolve))
      })
    }
  },
  {
    iterations,
    errorThreshold,
    print: false
  },
  (err, results) => {
    if (err) {
      throw err
    }
    console.log(printResults(results))
    dispatcher.destroy()
  }
)
```

```
│ Tests       │ Samples │ Result         │ Tolerance │ Difference with slowest │
|─────────────|─────────|────────────────|───────────|─────────────────────────|
│ no chunkify │ 101     │ 101.67 req/sec │ ± 3.07 %  │ -                       │

│ Tests    │ Samples │ Result        │ Tolerance │ Difference with slowest │
|──────────|─────────|───────────────|───────────|─────────────────────────|
│ chunkify │ 10      │ 24.49 req/sec │ ± 0.12 %  │ -                       │
```

After switching from 2KB to 16KB chunks, it's still a 25% performance drop.
There's only a 5% performance drop when using a 10MB body and 64KB chunks:

1MB body and 64KB chunks:

100KB chunks:

100KB chunks and 1KB body:

no chunkify:
Given these numbers, there's almost no difference between 64KB and 100KB chunks. It's still a 5% performance drop. So I think it would be best to include
How about just adding an example in the examples folder?

Sounds good.
For iterator bodies you'd have to wrap them in a stream (or build a wrapper). Regarding the
Yes, I just don't see how/when that would matter?

I don't think this would matter. Can we get this merged? This way we don't have to wrap some bodies into streams.
(force-pushed from 7cdb7ff to e506c31)
I'm -0 with this after comments are resolved.
lib/client.js
Outdated
@@ -1490,8 +1493,11 @@ function writeStream ({ client, request, socket, contentLength, header, expectsPayload })

```js
bytesWritten += len
const endCallback = request.onBodyChunkSent
  ? () => request.onBodyChunkSent(chunk)
  : undefined
```
It's probably better and faster to just call `onBodyChunkSent` after/before the write. Using the write callback doesn't really do that much; it's no guarantee that the chunk was sent, just that it's written to the kernel's buffer.
lib/core/request.js
Outdated
```js
  } catch (err) {
    this.onError(err)
  }
}
```
No need to lazily do this. Just put an `onBodySent` method on the class.
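A sketch of what that might look like (the class is heavily abbreviated for illustration; only the error-forwarding shape mirrors the `try/catch` in the snippet above):

```javascript
// Abbreviated sketch: an onBodySent method on the Request class itself,
// instead of lazily building a wrapper callback at write time.
const kHandler = Symbol('handler')

class Request {
  constructor (handler) {
    this[kHandler] = handler
  }

  // Called by the client for every body chunk written; forwards to the
  // user handler and routes any thrown error through onError.
  onBodySent (chunk) {
    if (this[kHandler].onBodySent) {
      try {
        this[kHandler].onBodySent(chunk)
      } catch (err) {
        this.onError(err)
      }
    }
  }

  onError (err) {
    this[kHandler].onError(err)
  }
}

const sent = []
const req = new Request({
  onBodySent (chunk) { sent.push(chunk.length) },
  onError () {}
})
req.onBodySent(Buffer.from('hello'))
console.log(sent) // [ 5 ]
```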
docs/api/Dispatcher.md
Outdated
```diff
@@ -203,6 +203,7 @@ Returns: `void`
 * **onHeaders** `(statusCode: number, headers: Buffer[], resume: () => void) => boolean` - Invoked when statusCode and headers have been received. May be invoked multiple times due to 1xx informational headers. Not required for `upgrade` requests.
 * **onData** `(chunk: Buffer) => boolean` - Invoked when response payload data is received. Not required for `upgrade` requests.
 * **onComplete** `(trailers: Buffer[]) => void` - Invoked when response payload and trailers have been received and the request has completed. Not required for `upgrade` requests.
+* **onBodyChunkSent** `(chunk: string | Buffer | Uint8Array) => void` - Invoked when a body chunk is sent to the server. Not required. For a stream or iterable body this will be invoked for every chunk. For other body types, it will be invoked once after the body is sent.
```
Suggested change:

```diff
-* **onBodyChunkSent** `(chunk: string | Buffer | Uint8Array) => void` - Invoked when a body chunk is sent to the server. Not required. For a stream or iterable body this will be invoked for every chunk. For other body types, it will be invoked once after the body is sent.
+* **onBodySent** `(chunk: string | Buffer | Uint8Array) => void` - Invoked when a body chunk is sent to the server. Not required. For a stream or iterable body this will be invoked for every chunk. For other body types, it will be invoked once after the body is sent.
```
Should we rename/alias

Eventually

I'm okay with
(force-pushed from e506c31 to de80264)
I've pushed the naming changes and the behaviour changes. Note that now the callback is called after
Add a callback that's called when a body chunk is sent to the server to `dispatch`.

The callback is invoked only after `stream.write` calls the end callback (I wasn't sure if it should be called synchronously with the call to `write` or not).

For stream bodies, the callback will get invoked after every chunk is written, and for other types (buffer etc.) it will be invoked once after the whole body is sent.
refs: #820
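Under the final `onBodySent(chunk)` naming, upload progress tracking reduces to accumulating chunk sizes in the handler. A simulated sketch (the `makeUploadProgress` helper is illustrative; no real dispatch happens here, and the handler object would normally also implement the response callbacks):

```javascript
// Progress tracking with the chunk-based onBodySent signature.
// The returned object would be merged into a handler passed to
// dispatcher.dispatch(options, handler); here we only simulate
// the per-chunk notifications.
function makeUploadProgress (total) {
  let sent = 0
  return {
    onBodySent (chunk) {
      sent += chunk.byteLength
      console.log(`uploaded ${sent}/${total} bytes`)
    },
    get sent () { return sent }
  }
}

// Simulate a 30-byte body sent as three 10-byte chunks.
const progress = makeUploadProgress(30)
for (const chunk of [Buffer.alloc(10), Buffer.alloc(10), Buffer.alloc(10)]) {
  progress.onBodySent(chunk)
}
console.log(progress.sent) // 30
```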