Skip to content

Commit

Permalink
[FEAT] added support for consuming objectstore feat from nbc (#159)
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed Aug 12, 2022
1 parent bb4d848 commit 86ebcb8
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 8 deletions.
18 changes: 17 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@
"nyc": "^15.1.0",
"shx": "^0.3.3",
"tslint": "^6.1.3",
"typescript": "^4.7.4"
"typescript": "^4.7.4",
"web-streams-polyfill": "^3.2.1"
},
"ava": {
"failFast": true,
Expand Down
7 changes: 5 additions & 2 deletions src/ws_transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const LANG = "nats.ws";
export type WsSocketFactory = (u: string, opts: ConnectionOptions) => Promise<{
socket: WebSocket;
encrypted: boolean;
}>
}>;
interface WsConnectionOptions extends ConnectionOptions {
wsFactory?: WsSocketFactory;
}
Expand Down Expand Up @@ -89,7 +89,10 @@ export class WsTransport implements Transport {
this.options = options;
const u = server.src;
if (options.wsFactory) {
const { socket, encrypted } = await options.wsFactory(server.src, options);
const { socket, encrypted } = await options.wsFactory(
server.src,
options,
);
this.socket = socket;
this.encrypted = encrypted;
} else {
Expand Down
89 changes: 86 additions & 3 deletions test/jetstream.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
*/
const test = require("ava");
const { delay } = require("../lib/nats-base-client/internal_mod");
const { connect, Empty, consumerOpts, AckPolicy, headers } = require(
"./index",
);
const { connect, Empty, consumerOpts, AckPolicy, headers, StringCodec } =
require(
"./index",
);
const { NatsServer, wsConfig } = require("./helpers/launcher");
const { jetstreamServerConf } = require("./helpers/jsutil");
const { DataBuffer } = require("../lib/nats-base-client/databuffer");

test("jetstream - jsm", async (t) => {
const ns = await NatsServer.start(jetstreamServerConf(wsConfig()));
Expand Down Expand Up @@ -288,3 +290,84 @@ test("jetstream - jetstream pullsub", async (t) => {
await nc.close();
await ns.stop();
});

test("jetstream - kv basics", async (t) => {
const ns = await NatsServer.start(jetstreamServerConf(wsConfig()));
const nc = await connect({ servers: `ws://127.0.0.1:${ns.websocket}` });
const js = nc.jetstream();

const kv = await js.views.kv("test");
const sc = StringCodec();
await kv.put("a", sc.encode("hello"));
const v = await kv.get("a");
t.truthy(v);
t.is(v.bucket, "test");
t.is(v.key, "a");
t.is(sc.decode(v.value), "hello");

await nc.close();
await ns.stop();
});

function readableStreamFrom(data) {
return new ReadableStream({
pull(controller) {
controller.enqueue(data);
controller.close();
},
});
}

async function fromReadableStream(
rs,
) {
const buf = new DataBuffer();
const reader = rs.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
return buf.drain();
}
if (value && value.length) {
buf.fill(value);
}
}
}

test("jetstream - os basics", async (t) => {
if (process.version.startsWith("v14.")) {
t.log(
`node ${process.version} cannot run objectstore as webcrypto is not available`,
);
t.pass();
return;
}

if (typeof globalThis.crypto === "undefined") {
const c = require("crypto");
global.crypto = c.webcrypto;
}

if (typeof globalThis.ReadableStream === "undefined") {
const streams = require("web-streams-polyfill/ponyfill");
global.ReadableStream = streams.ReadableStream;
}

const ns = await NatsServer.start(jetstreamServerConf(wsConfig()));
const nc = await connect({ servers: `ws://127.0.0.1:${ns.websocket}` });
const js = nc.jetstream();

const os = await js.views.os("test");
const sc = StringCodec();

await os.put({ name: "a" }, readableStreamFrom(sc.encode("hello")));
const v = await os.get("a");
t.truthy(v);
t.is(v.info.bucket, "test");
t.is(v.info.name, "a");
t.is(v.info.chunks, 1);
t.is(sc.decode(await fromReadableStream(v.data)), "hello");

await nc.close();
await ns.stop();
});
3 changes: 2 additions & 1 deletion tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
"outDir": "lib/",
"moduleResolution": "node",
"sourceMap": true,
"declaration": true
"declaration": true,
"allowJs": true
},
"include": [
"wst"
Expand Down

0 comments on commit 86ebcb8

Please sign in to comment.