From 071609f8b1796b611a5bbf708ed85a6c4fd49466 Mon Sep 17 00:00:00 2001 From: tsctx <91457664+tsctx@users.noreply.github.com> Date: Sun, 3 Mar 2024 19:09:54 +0900 Subject: [PATCH] feat: add sending data benchmark (#2905) * feat: add post benchmark * fixup * apply suggestions from code review * apply suggestions from code review --- .github/workflows/bench.yml | 43 ++++ benchmarks/benchmark.js | 19 +- benchmarks/package.json | 5 +- benchmarks/post-benchmark.js | 422 +++++++++++++++++++++++++++++++++++ 4 files changed, 478 insertions(+), 11 deletions(-) create mode 100644 benchmarks/post-benchmark.js diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml index 5afcbddd807..112a98b6e72 100644 --- a/.github/workflows/bench.yml +++ b/.github/workflows/bench.yml @@ -49,3 +49,46 @@ jobs: - name: Run Benchmark run: npm run bench working-directory: ./benchmarks + + benchmark_post_current: + name: benchmark (sending data) current + runs-on: ubuntu-latest + steps: + - name: Checkout Code + uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1 + with: + persist-credentials: false + ref: ${{ github.base_ref }} + - name: Setup Node + uses: actions/setup-node@60edb5dd545a775178f52524783378180af0d1f8 # v4.0.2 + with: + node-version: lts/* + - name: Install Modules for undici + run: npm i --ignore-scripts --omit=dev + - name: Install Modules for Benchmarks + run: npm i + working-directory: ./benchmarks + - name: Run Benchmark + run: npm run bench-post + working-directory: ./benchmarks + + benchmark_post_branch: + name: benchmark (sending data) branch + runs-on: ubuntu-latest + steps: + - name: Checkout Code + uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1 + with: + persist-credentials: false + - name: Setup Node + uses: actions/setup-node@60edb5dd545a775178f52524783378180af0d1f8 # v4.0.2 + with: + node-version: lts/* + - name: Install Modules for undici + run: npm i --ignore-scripts --omit=dev + - name: Install Modules for Benchmarks + run: npm i + working-directory: ./benchmarks + - name: Run Benchmark + run: npm run bench-post + working-directory: ./benchmarks diff --git a/benchmarks/benchmark.js b/benchmarks/benchmark.js index 8775611138d..c48c00926a1 100644 --- a/benchmarks/benchmark.js +++ b/benchmarks/benchmark.js @@ -34,22 +34,16 @@ if (process.env.PORT) { dest.socketPath = path.join(os.tmpdir(), 'undici.sock') } +/** @type {http.RequestOptions} */ const httpBaseOptions = { protocol: 'http:', hostname: 'localhost', method: 'GET', path: '/', - query: { - frappucino: 'muffin', - goat: 'scone', - pond: 'moose', - foo: ['bar', 'baz', 'bal'], - bool: true, - numberKey: 256 - }, ...dest } +/** @type {http.RequestOptions} */ const httpNoKeepAliveOptions = { ...httpBaseOptions, agent: new http.Agent({ @@ -58,6 +52,7 @@ const httpNoKeepAliveOptions = { }) } +/** @type {http.RequestOptions} */ const httpKeepAliveOptions = { ...httpBaseOptions, agent: new http.Agent({ @@ -142,7 +137,11 @@ class SimpleRequest { } function makeParallelRequests (cb) { - return Promise.all(Array.from(Array(parallelRequests)).map(() => new Promise(cb))) + const promises = new Array(parallelRequests) + for (let i = 0; i < parallelRequests; ++i) { + promises[i] = new Promise(cb) + } + return Promise.all(promises) } function printResults (results) { @@ -303,7 +302,7 @@ if (process.env.PORT) { experiments.got = () => { return makeParallelRequests(resolve => { - got.get(dest.url, null, { http: gotAgent }).then(res => { + got.get(dest.url, { agent: { http: gotAgent } }).then(res => { res.pipe(new Writable({ write (chunk, encoding, callback) { callback() diff --git a/benchmarks/package.json b/benchmarks/package.json index 5781ae37478..a834377135f 100644 --- a/benchmarks/package.json +++ b/benchmarks/package.json @@ -2,9 +2,12 @@ "name": "benchmarks", "scripts": { "bench": "PORT=3042 concurrently -k -s first npm:bench:server npm:bench:run", + "bench-post": "PORT=3042 concurrently -k -s first npm:bench:server npm:bench-post:run", "bench:server": "node ./server.js", "prebench:run": "node ./wait.js", - "bench:run": "SAMPLES=100 CONNECTIONS=50 node ./benchmark.js" + "bench:run": "SAMPLES=100 CONNECTIONS=50 node ./benchmark.js", + "prebench-post:run": "node ./wait.js", + "bench-post:run": "SAMPLES=100 CONNECTIONS=50 node ./post-benchmark.js" }, "dependencies": { "axios": "^1.6.7", diff --git a/benchmarks/post-benchmark.js b/benchmarks/post-benchmark.js new file mode 100644 index 00000000000..829fc389476 --- /dev/null +++ b/benchmarks/post-benchmark.js @@ -0,0 +1,422 @@ +'use strict' + +const http = require('node:http') +const os = require('node:os') +const path = require('node:path') +const { Writable } = require('node:stream') +const { isMainThread } = require('node:worker_threads') + +const { Pool, Client, fetch, Agent, setGlobalDispatcher } = require('..') + +let nodeFetch +const axios = require('axios') +let superagent +let got + +const util = require('node:util') +const _request = require('request') +const request = util.promisify(_request) + +const iterations = (parseInt(process.env.SAMPLES, 10) || 10) + 1 +const errorThreshold = parseInt(process.env.ERROR_THRESHOLD, 10) || 3 +const connections = parseInt(process.env.CONNECTIONS, 10) || 50 +const pipelining = parseInt(process.env.PIPELINING, 10) || 10 +const parallelRequests = parseInt(process.env.PARALLEL, 10) || 100 +const headersTimeout = parseInt(process.env.HEADERS_TIMEOUT, 10) || 0 +const bodyTimeout = parseInt(process.env.BODY_TIMEOUT, 10) || 0 +const dest = {} + +const data = '_'.repeat(128 * 1024) +const dataLength = `${Buffer.byteLength(data)}` + +if (process.env.PORT) { + dest.port = process.env.PORT + dest.url = `http://localhost:${process.env.PORT}` +} else { + dest.url = 'http://localhost' + dest.socketPath = path.join(os.tmpdir(), 'undici.sock') +} + +const headers = { + 'Content-Type': 'text/plain; charset=UTF-8', + 'Content-Length': dataLength +} + +/** @type {http.RequestOptions} */ +const httpBaseOptions = { + protocol: 'http:', + hostname: 'localhost', + method: 'POST', + path: '/', + headers, + ...dest +} + +/** @type {http.RequestOptions} */ +const httpNoKeepAliveOptions = { + ...httpBaseOptions, + agent: new http.Agent({ + keepAlive: false, + maxSockets: connections + }) +} + +/** @type {http.RequestOptions} */ +const httpKeepAliveOptions = { + ...httpBaseOptions, + agent: new http.Agent({ + keepAlive: true, + maxSockets: connections + }) +} + +const axiosAgent = new http.Agent({ + keepAlive: true, + maxSockets: connections +}) + +const fetchAgent = new http.Agent({ + keepAlive: true, + maxSockets: connections +}) + +const gotAgent = new http.Agent({ + keepAlive: true, + maxSockets: connections +}) + +const requestAgent = new http.Agent({ + keepAlive: true, + maxSockets: connections +}) + +const superagentAgent = new http.Agent({ + keepAlive: true, + maxSockets: connections +}) + +/** @type {import("..").Dispatcher.DispatchOptions} */ +const undiciOptions = { + path: '/', + method: 'POST', + headersTimeout, + bodyTimeout, + body: data, + headers +} + +const Class = connections > 1 ? Pool : Client +const dispatcher = new Class(httpBaseOptions.url, { + pipelining, + connections, + ...dest +}) + +setGlobalDispatcher(new Agent({ + pipelining, + connections, + connect: { + rejectUnauthorized: false + } +})) + +class SimpleRequest { + constructor (resolve) { + this.dst = new Writable({ + write (chunk, encoding, callback) { + callback() + } + }).on('finish', resolve) + } + + onConnect (abort) { } + + onHeaders (statusCode, headers, resume) { + this.dst.on('drain', resume) + } + + onData (chunk) { + return this.dst.write(chunk) + } + + onComplete () { + this.dst.end() + } + + onError (err) { + throw err + } +} + +function makeParallelRequests (cb) { + const promises = new Array(parallelRequests) + for (let i = 0; i < parallelRequests; ++i) { + promises[i] = new Promise(cb) + } + return Promise.all(promises) +} + +function printResults (results) { + // Sort results by least performant first, then compare relative performances and also printing padding + let last + + const rows = Object.entries(results) + // If any failed, put on the top of the list, otherwise order by mean, ascending + .sort((a, b) => (!a[1].success ? -1 : b[1].mean - a[1].mean)) + .map(([name, result]) => { + if (!result.success) { + return { + Tests: name, + Samples: result.size, + Result: 'Errored', + Tolerance: 'N/A', + 'Difference with Slowest': 'N/A' + } + } + + // Calculate throughput and relative performance + const { size, mean, standardError } = result + const relative = last !== 0 ? (last / mean - 1) * 100 : 0 + + // Save the slowest for relative comparison + if (typeof last === 'undefined') { + last = mean + } + + return { + Tests: name, + Samples: size, + Result: `${((parallelRequests * 1e9) / mean).toFixed(2)} req/sec`, + Tolerance: `± ${((standardError / mean) * 100).toFixed(2)} %`, + 'Difference with slowest': relative > 0 ? `+ ${relative.toFixed(2)} %` : '-' + } + }) + + return console.table(rows) +} + +const experiments = { + 'http - no keepalive' () { + return makeParallelRequests(resolve => { + const request = http.request(httpNoKeepAliveOptions, res => { + res + .pipe( + new Writable({ + write (chunk, encoding, callback) { + callback() + } + }) + ) + .on('finish', resolve) + }) + request.end(data) + }) + }, + 'http - keepalive' () { + return makeParallelRequests(resolve => { + const request = http.request(httpKeepAliveOptions, res => { + res + .pipe( + new Writable({ + write (chunk, encoding, callback) { + callback() + } + }) + ) + .on('finish', resolve) + }) + request.end(data) + }) + }, + 'undici - pipeline' () { + return makeParallelRequests(resolve => { + dispatcher + .pipeline(undiciOptions, data => { + return data.body + }) + .end() + .pipe( + new Writable({ + write (chunk, encoding, callback) { + callback() + } + }) + ) + .on('finish', resolve) + }) + }, + 'undici - request' () { + return makeParallelRequests(resolve => { + dispatcher.request(undiciOptions).then(({ body }) => { + body + .pipe( + new Writable({ + write (chunk, encoding, callback) { + callback() + } + }) + ) + .on('finish', resolve) + }) + }) + }, + 'undici - stream' () { + return makeParallelRequests(resolve => { + return dispatcher + .stream(undiciOptions, () => { + return new Writable({ + write (chunk, encoding, callback) { + callback() + } + }) + }) + .then(resolve) + }) + }, + 'undici - dispatch' () { + return makeParallelRequests(resolve => { + dispatcher.dispatch(undiciOptions, new SimpleRequest(resolve)) + }) + } +} + +if (process.env.PORT) { + /** @type {RequestInit} */ + const fetchOptions = { + method: 'POST', + body: data, + headers + } + // fetch does not support the socket + experiments['undici - fetch'] = () => { + return makeParallelRequests(resolve => { + fetch(dest.url, fetchOptions).then(res => { + res.body.pipeTo(new WritableStream({ write () { }, close () { resolve() } })) + }).catch(console.log) + }) + } + + const nodeFetchOptions = { + ...fetchOptions, + agent: fetchAgent + } + experiments['node-fetch'] = () => { + return makeParallelRequests(resolve => { + nodeFetch(dest.url, nodeFetchOptions).then(res => { + res.body.pipe(new Writable({ + write (chunk, encoding, callback) { + callback() + } + })).on('finish', resolve) + }).catch(console.log) + }) + } + + const axiosOptions = { + url: dest.url, + method: 'POST', + headers, + responseType: 'stream', + httpAgent: axiosAgent, + data + } + experiments.axios = () => { + return makeParallelRequests(resolve => { + axios.request(axiosOptions).then(res => { + res.data.pipe(new Writable({ + write (chunk, encoding, callback) { + callback() + } + })).on('finish', resolve) + }).catch(console.log) + }) + } + + const gotOptions = { + method: 'POST', + headers, + agent: { + http: gotAgent + }, + body: data + } + experiments.got = () => { + return makeParallelRequests(resolve => { + got(dest.url, gotOptions).then(res => { + res.pipe(new Writable({ + write (chunk, encoding, callback) { + callback() + } + })).on('finish', resolve) + }).catch(console.log) + }) + } + + const requestOptions = { + url: dest.url, + method: 'POST', + headers, + agent: requestAgent, + data + } + experiments.request = () => { + return makeParallelRequests(resolve => { + request(requestOptions).then(res => { + res.pipe(new Writable({ + write (chunk, encoding, callback) { + callback() + } + })).on('finish', resolve) + }).catch(console.log) + }) + } + + experiments.superagent = () => { + return makeParallelRequests(resolve => { + superagent + .post(dest.url) + .send(data) + .set('Content-Type', 'text/plain; charset=UTF-8') + .set('Content-Length', dataLength) + .pipe(new Writable({ + write (chunk, encoding, callback) { + callback() + } + })).on('finish', resolve) + }) + } +} + +async function main () { + const { cronometro } = await import('cronometro') + const _nodeFetch = await import('node-fetch') + nodeFetch = _nodeFetch.default + const _got = await import('got') + got = _got.default + const _superagent = await import('superagent') + // https://github.com/ladjs/superagent/issues/1540#issue-561464561 + superagent = _superagent.agent().use((req) => req.agent(superagentAgent)) + + cronometro( + experiments, + { + iterations, + errorThreshold, + print: false + }, + (err, results) => { + if (err) { + throw err + } + + printResults(results) + dispatcher.destroy() + } + ) +} + +if (isMainThread) { + main() +} else { + module.exports = main +}