diff --git a/src/deno_transport.ts b/src/deno_transport.ts index 769783fb..3cb0e4f8 100644 --- a/src/deno_transport.ts +++ b/src/deno_transport.ts @@ -13,7 +13,7 @@ * limitations under the License. */ -import { BufWriter } from "https://deno.land/std@0.177.0/io/mod.ts"; +import { writeAll } from "https://deno.land/std@0.177.0/streams/write_all.ts"; import { Deferred, deferred, @@ -39,15 +39,7 @@ import { const VERSION = "1.15.0"; const LANG = "nats.deno"; -// if trying to simply write to the connection for some reason -// messages are dropped - deno websocket implementation does this. -export async function write( - frame: Uint8Array, - writer: BufWriter, -): Promise { - await writer.write(frame); - await writer.flush(); -} +const ReadBufferSize = 1024 * 128; export class DenoTransport implements Transport { version: string = VERSION; @@ -60,7 +52,6 @@ export class DenoTransport implements Transport { private closedNotification: Deferred = deferred(); // @ts-ignore: Deno 1.9.0 broke compatibility by adding generics to this private conn!: Conn; - private writer!: BufWriter; // the async writes to the socket do not guarantee // the order of the writes - this leads to interleaving @@ -71,7 +62,7 @@ export class DenoTransport implements Transport { }> = []; constructor() { - this.buf = new Uint8Array(1024 * 8); + this.buf = new Uint8Array(ReadBufferSize); } async connect( @@ -88,8 +79,6 @@ export class DenoTransport implements Transport { if (tlsRequired || desired) { const tlsn = hp.tlsName ? hp.tlsName : hp.hostname; await this.startTLS(tlsn); - } else { - this.writer = new BufWriter(this.conn); } } catch (err) { this.conn?.close(); @@ -162,7 +151,6 @@ export class DenoTransport implements Transport { // to identify this as the error, we force it await this.conn.write(Empty); this.encrypted = true; - this.writer = new BufWriter(this.conn); } async *[Symbol.asyncIterator](): AsyncIterableIterator { @@ -172,7 +160,7 @@ export class DenoTransport implements Transport { while (!this.done) { try { - this.buf = new Uint8Array(64 * 1024); + this.buf = new Uint8Array(ReadBufferSize); const c = await this.conn.read(this.buf); if (c === null) { break; @@ -209,7 +197,7 @@ export class DenoTransport implements Transport { if (!entry) return; if (this.done) return; const { frame, d } = entry; - write(frame, this.writer) + writeAll(this.conn, frame) .then(() => { if (this.options.debug) { console.info(`< ${render(frame)}`);