Skip to content

Commit

Permalink
preparing for release (#126)
Browse files Browse the repository at this point in the history
[added] beta notice to use of the jetstream apis
[bumped] version prior to release of NATS
[changed] js `pullBatch` to `fetch`
  • Loading branch information
aricart committed Mar 15, 2021
1 parent 64c6da2 commit 25f69de
Show file tree
Hide file tree
Showing 13 changed files with 84 additions and 48 deletions.
23 changes: 19 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,19 @@ A Deno client for the [NATS messaging system](https://nats.io).

# Installation

** :warning: NATS.deno is a release candidate** you can get the current
development version by:
You can get the latest release version like this:

```bash
import * as nats from "https://raw.githubusercontent.com/nats-io/nats.deno/main/src/mod.ts"
```typescript
import * as nats from "https://deno.land/x/nats/src/mod.ts";
```

To specify a specific released version, simply replace nats with
nats@_versionTag_.

You can get the current development version by:

```typescript
import * as nats from "https://raw.githubusercontent.com/nats-io/nats.deno/main/src/mod.ts";
```

To see a list of items that are under development see [TODO](TODO.md).
Expand Down Expand Up @@ -822,3 +830,10 @@ The relationship between these is:
0 and `reconnectJitterTLS` and add it to `reconnectTimeWait`.
- If the client didn't specify TLS options, the client will generate a number
between 0 and `reconnectJitter` and add it to `reconnectTimeWait`.

## JetStream

[Support for JetStream is built-in](jetstream.md). However, the JetStream API
extensions are still in beta. Feel free to use them. The client will emit a
console message when either `nc.jetstream()` or `nc.jetstreamManager()` apis are
used to remind you they are in beta.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ await jsm.consumers.add(stream, {

const js = nc.jetstream();

// similar to pull, but this time we can ask for many messages
// if `expire` is set, the request will wait for the specified
// number of messages until then. The `no_wait` returns an empty
// result if no messages are available.
const batch = js.pullBatch(stream, "me", { batch: 25, no_wait: true });
// similar to `pull()`, `fetch()` requests one or more messages.
// if `expire` is set, the returned iterator will wait for the specified
// number of messages or expire at the specified time. The `no_wait`d option
// returns an empty result if no messages are available.
const batch = js.fetch(stream, "me", { batch: 25, expires: 1000 });
await (async () => {
for await (const m of batch) {
console.log(m.seq, sc.decode(m.data));
Expand Down
2 changes: 1 addition & 1 deletion examples/jetstream/07_pullsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ await js.publish(subj);
await js.publish(subj);
await js.publish(subj);

// this is similar to pullBatch, but the consumer is created
// this is similar to fetch, but the consumer is created
// behind the scenes. To pull messages, you call `pull()` on
// the PullSubscription.
const psub = await js.pullSubscribe(subj, {
Expand Down
8 changes: 4 additions & 4 deletions jetstream.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ The new generation of Javascript clients:
- [nats.ws](https://github.com/nats-io/nats.ws)
- [nats.deno](https://github.com/nats-io/nats.deno)

all support JetStream, however the functionality is still on _beta_, and the
APIs are subject to change. Please report any issues you find.
all support JetStream, however the functionality is a _preview_, and the APIs
are subject to change. Please report any issues you find.

## JetStreamManager

Expand Down Expand Up @@ -133,15 +133,15 @@ let msg = await js.pull(stream, durableName);
msg.ack();
```

#### Requesting a batch of messages
#### Fetching batch of messages

You can also request more than one message at time. The request is a _long_
poll. So it remains open until the desired number of messages is received, or
the expiration time triggers.

```typescript
// To get multiple messages in one request you can:
let msgs = await js.pullBatch(stream, durable, { batch: 10, expires: 5000 });
let msgs = await js.fetch(stream, durable, { batch: 10, expires: 5000 });
// the request returns an iterator that will get at most 10 seconds or wait
// for 5000ms for messages to arrive.

Expand Down
4 changes: 2 additions & 2 deletions nats-base-client/internal_mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ export {
StorageType,
} from "./types.ts";

export { consumerOpts } from "./consumeropts.ts";
export type { ConsumerOptsBuilder } from "./consumeropts.ts";
export { consumerOpts } from "./jsconsumeropts.ts";
export type { ConsumerOptsBuilder } from "./jsconsumeropts.ts";
export { toJsMsg } from "./jsmsg.ts";

export { DebugEvents, Empty, Events } from "./types.ts";
Expand Down
6 changes: 3 additions & 3 deletions nats-base-client/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ import { QueuedIterator, QueuedIteratorImpl } from "./queued_iterator.ts";
import { Timeout, timeout } from "./util.ts";
import { createInbox } from "./protocol.ts";
import { headers } from "./headers.ts";
import type { ConsumerOptsBuilder } from "./consumeropts.ts";
import { consumerOpts, isConsumerOptsBuilder } from "./consumeropts.ts";
import type { ConsumerOptsBuilder } from "./jsconsumeropts.ts";
import { consumerOpts, isConsumerOptsBuilder } from "./jsconsumeropts.ts";

export interface JetStreamSubscriptionInfoable {
info: JetStreamSubscriptionInfo | null;
Expand Down Expand Up @@ -148,7 +148,7 @@ export class JetStreamClientImpl extends BaseApiClient
* @param durable
* @param opts
*/
pullBatch(
fetch(
stream: string,
durable: string,
opts: Partial<PullOptions> = {},
Expand Down
File renamed without changes.
21 changes: 21 additions & 0 deletions nats-base-client/nats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ export class NatsConnectionImpl implements NatsConnection {
async jetstreamManager(
opts: JetStreamOptions = {},
): Promise<JetStreamManager> {
jetstreamPreview(this);
const adm = new JetStreamManagerImpl(this, opts);
try {
await adm.getAccountInfo();
Expand All @@ -274,6 +275,26 @@ export class NatsConnectionImpl implements NatsConnection {
jetstream(
opts: JetStreamOptions = {},
): JetStreamClient {
jetstreamPreview(this);
return new JetStreamClientImpl(this, opts);
}
}

const jetstreamPreview = (() => {
let once = false;
return (nci: NatsConnectionImpl) => {
if (!once) {
once = true;
const { lang } = nci?.protocol?.transport;
if (lang) {
console.log(
`\u001B[33m >> jetstream functionality in ${lang} is preview functionality \u001B[0m`,
);
} else {
console.log(
`\u001B[33m >> jetstream functionality is preview functionality \u001B[0m`,
);
}
}
};
})();
2 changes: 1 addition & 1 deletion nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ export interface JetStreamClient {
options?: Partial<JetStreamPublishOptions>,
): Promise<PubAck>;
pull(stream: string, durable: string): Promise<JsMsg>;
pullBatch(
fetch(
stream: string,
durable: string,
opts?: Partial<PullOptions>,
Expand Down
2 changes: 1 addition & 1 deletion src/deno_transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import {
} from "../nats-base-client/internal_mod.ts";
import type { TlsOptions } from "../nats-base-client/types.ts";

const VERSION = "1.0.0-rc4";
const VERSION = "1.0.0";
const LANG = "nats.deno";

// if trying to simply write to the connection for some reason
Expand Down
2 changes: 1 addition & 1 deletion tests/consumeropts_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {
consumerOpts,
ConsumerOptsBuilderImpl,
isConsumerOptsBuilder,
} from "../nats-base-client/consumeropts.ts";
} from "../nats-base-client/jsconsumeropts.ts";
import {
AckPolicy,
ConsumerOpts,
Expand Down
Loading

0 comments on commit 25f69de

Please sign in to comment.