Skip to content

Commit

Permalink
prototype
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed Jun 5, 2023
1 parent a0d00a5 commit c9221b0
Showing 1 changed file with 25 additions and 37 deletions.
62 changes: 25 additions & 37 deletions src/deno_transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
* limitations under the License.
*/

import { BufWriter } from "https://deno.land/std@0.177.0/io/mod.ts";
import {
Deferred,
deferred,
TlsOptions,
} from "../nats-base-client/internal_mod.ts";
import Conn = Deno.Conn;
import { writeAll } from "https://deno.land/std@0.190.0/streams/write_all.ts";
import {
checkOptions,
checkUnsupportedOption,
Expand All @@ -39,15 +39,7 @@ import {
const VERSION = "1.15.0";
const LANG = "nats.deno";

// if trying to simply write to the connection for some reason
// messages are dropped - deno websocket implementation does this.
export async function write(
frame: Uint8Array,
writer: BufWriter,
): Promise<void> {
await writer.write(frame);
await writer.flush();
}
const ReadBufferSize = 1024 * 128;

export class DenoTransport implements Transport {
version: string = VERSION;
Expand All @@ -60,18 +52,13 @@ export class DenoTransport implements Transport {
private closedNotification: Deferred<void | Error> = deferred();
// @ts-ignore: Deno 1.9.0 broke compatibility by adding generics to this
private conn!: Conn<NetAddr>;
private writer!: BufWriter;

// the async writes to the socket do not guarantee
// the order of the writes - this leads to interleaving
// which results in protocol errors on the server
private sendQueue: Array<{
frame: Uint8Array;
d: Deferred<void>;
}> = [];

private out: DataBuffer;
private outProm: Deferred<void>;
constructor() {
this.buf = new Uint8Array(1024 * 8);
this.buf = new Uint8Array(ReadBufferSize);
this.out = new DataBuffer();
this.outProm = deferred();
}

async connect(
Expand All @@ -88,8 +75,6 @@ export class DenoTransport implements Transport {
if (tlsRequired || desired) {
const tlsn = hp.tlsName ? hp.tlsName : hp.hostname;
await this.startTLS(tlsn);
} else {
this.writer = new BufWriter(this.conn);
}
} catch (err) {
this.conn?.close();
Expand Down Expand Up @@ -162,7 +147,6 @@ export class DenoTransport implements Transport {
// to identify this as the error, we force it
await this.conn.write(Empty);
this.encrypted = true;
this.writer = new BufWriter(this.conn);
}

async *[Symbol.asyncIterator](): AsyncIterableIterator<Uint8Array> {
Expand All @@ -172,7 +156,7 @@ export class DenoTransport implements Transport {

while (!this.done) {
try {
this.buf = new Uint8Array(64 * 1024);
this.buf = new Uint8Array(ReadBufferSize);
const c = await this.conn.read(this.buf);
if (c === null) {
break;
Expand All @@ -196,34 +180,38 @@ export class DenoTransport implements Transport {
if (this.done) {
return Promise.resolve();
}
const d = deferred<void>();
this.sendQueue.push({ frame, d });
if (this.sendQueue.length === 1) {
const start = this.out.length();
this.out.fill(frame);
const d = this.outProm;
if (start === 0) {
this.dequeue();
}
return d;
}

private dequeue(): void {
const [entry] = this.sendQueue;
if (!entry) return;
if (this.done) return;
const { frame, d } = entry;
write(frame, this.writer)
if (this.out.length() === 0) {
return;
}
const data = this.out.drain();
this.out.reset();
const p = this.outProm;
this.outProm = deferred();

writeAll(this.conn, data)
.then(() => {
if (this.options.debug) {
console.info(`< ${render(frame)}`);
console.info(`< ${render(data)}`);
}
d.resolve();
p.resolve();
})
.catch((err) => {
if (this.options.debug) {
console.error(`!!! ${render(frame)}: ${err}`);
console.error(`!!! ${render(data)}: ${err}`);
}
d.reject(err);
p.reject(err);
})
.finally(() => {
this.sendQueue.shift();
this.dequeue();
});
}
Expand Down

0 comments on commit c9221b0

Please sign in to comment.