diff --git a/examples/services/03_bigdata-client.ts b/examples/services/03_bigdata-client.ts new file mode 100644 index 00000000..dcba65e9 --- /dev/null +++ b/examples/services/03_bigdata-client.ts @@ -0,0 +1,67 @@ +/* + * Copyright 2023 The NATS Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { parse } from "https://deno.land/std@0.190.0/flags/mod.ts"; +import { connect, ConnectionOptions, RequestStrategy } from "../../src/mod.ts"; +import { humanizeBytes } from "./03_util.ts"; + +const argv = parse( + Deno.args, + { + alias: { + "s": ["server"], + "a": ["asset"], + "c": ["chunk"], + }, + default: { + s: "127.0.0.1:4222", + c: 0, + a: 1024 * 1024, + }, + string: ["server"], + }, +); + +const copts = { servers: argv.s } as ConnectionOptions; +const nc = await connect(copts); + +const max_chunk = argv.c; +const size = argv.a as number; +console.log(`requesting ${humanizeBytes(size)}`); + +const start = performance.now(); +const iter = await nc.requestMany( + "data", + JSON.stringify({ max_chunk, size }), + { + strategy: RequestStrategy.SentinelMsg, + maxWait: 10000, + }, +); + +let received = 0; +let count = 0; +for await (const m of iter) { + count++; + received += m.data.length; + if (m.data.length === 0) { + // sentinel + count--; + } +} + +console.log(performance.now() - start, "ms"); + +console.log(`received ${count} responses: ${humanizeBytes(received)} bytes`); +await nc.close(); diff --git a/examples/services/03_bigdata.ts b/examples/services/03_bigdata.ts new file mode 100644 index 00000000..c06ce776 --- /dev/null +++ b/examples/services/03_bigdata.ts @@ -0,0 +1,70 @@ +/* + * Copyright 2023 The NATS Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { connect } from "../../src/mod.ts"; +import { DataRequest, humanizeBytes } from "./03_util.ts"; + +const nc = await connect(); +const srv = await nc.services.add({ + name: "big-data", + version: "0.0.1", +}); + +srv.addEndpoint("data", (_err, msg) => { + queueMicrotask(() => { + if (msg.data.length === 0) { + msg.respondError(400, "missing request options"); + return; + } + const max = nc.info?.max_payload ?? 1024 * 1024; + + const r = msg.json(); + + const size = r.size || max; + let chunk = r.max_chunk || max; + chunk = chunk > size ? size : chunk; + + const full = Math.floor(size / chunk); + const partial = size % chunk; + + console.log( + "size of request", + humanizeBytes(size), + "chunk", + humanizeBytes(chunk), + ); + console.log( + "full buffers:", + full, + "partial:", + partial, + "bytes", + "empty:", + 1, + ); + + const payload = new Uint8Array(chunk); + for (let i = 0; i < full; i++) { + msg.respond(payload); + } + if (partial) { + msg.respond(payload.subarray(0, partial)); + } + // sentinel + msg.respond(); + }); +}); + +console.log(srv.info()); diff --git a/examples/services/03_util.ts b/examples/services/03_util.ts new file mode 100644 index 00000000..95a27305 --- /dev/null +++ b/examples/services/03_util.ts @@ -0,0 +1,36 @@ +/* + * Copyright 2023 The NATS Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +export function humanizeBytes(bytes: number, si = false): string { + const base = si ? 1000 : 1024; + const pre = si + ? ["k", "M", "G", "T", "P", "E"] + : ["K", "M", "G", "T", "P", "E"]; + const post = si ? "iB" : "B"; + + if (bytes < base) { + return `${bytes.toFixed(2)} ${post}`; + } + const exp = parseInt(Math.log(bytes) / Math.log(base) + ""); + const index = parseInt((exp - 1) + ""); + return `${(bytes / Math.pow(base, exp)).toFixed(2)} ${pre[index]}${post}`; +} + +export type DataRequest = { + // generate data that is this size + size: number; + // send using multiple messages - note server will reject if too big + max_chunk?: number; +}; diff --git a/nats-base-client/mod.ts b/nats-base-client/mod.ts index e51d00fe..fd6a7077 100644 --- a/nats-base-client/mod.ts +++ b/nats-base-client/mod.ts @@ -20,6 +20,7 @@ export { nkeys, Nuid, nuid, + RequestStrategy, ServiceError, ServiceErrorCodeHeader, ServiceErrorHeader,