Skip to content

Commit

Permalink
[deno] changed socket read buffers 128K
Browse files Browse the repository at this point in the history
[deno] removed buffer writer on connection, and instead changed to writeAll()
  • Loading branch information
aricart committed Jun 1, 2023
1 parent a0d00a5 commit 745306d
Showing 1 changed file with 5 additions and 17 deletions.
22 changes: 5 additions & 17 deletions src/deno_transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* limitations under the License.
*/

import { BufWriter } from "https://deno.land/std@0.177.0/io/mod.ts";
import { writeAll } from "https://deno.land/std@0.177.0/streams/write_all.ts";
import {
Deferred,
deferred,
Expand All @@ -39,15 +39,7 @@ import {
const VERSION = "1.15.0";
const LANG = "nats.deno";

// if trying to simply write to the connection for some reason
// messages are dropped - deno websocket implementation does this.
export async function write(
frame: Uint8Array,
writer: BufWriter,
): Promise<void> {
await writer.write(frame);
await writer.flush();
}
const ReadBufferSize = 1024 * 128;

export class DenoTransport implements Transport {
version: string = VERSION;
Expand All @@ -60,7 +52,6 @@ export class DenoTransport implements Transport {
private closedNotification: Deferred<void | Error> = deferred();
// @ts-ignore: Deno 1.9.0 broke compatibility by adding generics to this
private conn!: Conn<NetAddr>;
private writer!: BufWriter;

// the async writes to the socket do not guarantee
// the order of the writes - this leads to interleaving
Expand All @@ -71,7 +62,7 @@ export class DenoTransport implements Transport {
}> = [];

constructor() {
this.buf = new Uint8Array(1024 * 8);
this.buf = new Uint8Array(ReadBufferSize);
}

async connect(
Expand All @@ -88,8 +79,6 @@ export class DenoTransport implements Transport {
if (tlsRequired || desired) {
const tlsn = hp.tlsName ? hp.tlsName : hp.hostname;
await this.startTLS(tlsn);
} else {
this.writer = new BufWriter(this.conn);
}
} catch (err) {
this.conn?.close();
Expand Down Expand Up @@ -162,7 +151,6 @@ export class DenoTransport implements Transport {
// to identify this as the error, we force it
await this.conn.write(Empty);
this.encrypted = true;
this.writer = new BufWriter(this.conn);
}

async *[Symbol.asyncIterator](): AsyncIterableIterator<Uint8Array> {
Expand All @@ -172,7 +160,7 @@ export class DenoTransport implements Transport {

while (!this.done) {
try {
this.buf = new Uint8Array(64 * 1024);
this.buf = new Uint8Array(ReadBufferSize);
const c = await this.conn.read(this.buf);
if (c === null) {
break;
Expand Down Expand Up @@ -209,7 +197,7 @@ export class DenoTransport implements Transport {
if (!entry) return;
if (this.done) return;
const { frame, d } = entry;
write(frame, this.writer)
writeAll(this.conn, frame)
.then(() => {
if (this.options.debug) {
console.info(`< ${render(frame)}`);
Expand Down

0 comments on commit 745306d

Please sign in to comment.