Skip to content

Commit

Permalink
Merge pull request #595 from nats-io/fix-594
Browse files Browse the repository at this point in the history
[FEAT] stream `compression` option
  • Loading branch information
aricart committed Sep 19, 2023
2 parents d023380 + 74de87a commit 7b050d0
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 1 deletion.
16 changes: 16 additions & 0 deletions jetstream/jsapi_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,11 @@ export interface StreamUpdateConfig {
* This feature only supported on 2.10.x and better.
*/
subject_transform?: SubjectTransformConfig;
/**
* Sets the compression level of the stream. This feature is only supported in
* servers 2.10.x and better.
*/
compression?: StoreCompression;
}

export interface Republish {
Expand Down Expand Up @@ -410,6 +415,17 @@ export enum ReplayPolicy {
Original = "original",
}

export enum StoreCompression {
/**
* No compression
*/
None = "none",
/**
* S2 compression
*/
S2 = "s2",
}

/**
* Options for StreamAPI info requests
*/
Expand Down
6 changes: 6 additions & 0 deletions jetstream/jsmstream_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,12 @@ export class StreamAPIImpl extends BaseApiClient implements StreamAPI {
throw new Error(`stream 'subject_transform' requires server ${min}`);
}
}
if (cfg.compression) {
const { min, ok } = nci.features.get(Feature.JS_STREAM_COMPRESSION);
if (!ok) {
throw new Error(`stream 'compression' requires server ${min}`);
}
}

function validateStreamSource(
context: string,
Expand Down
47 changes: 46 additions & 1 deletion jetstream/tests/jsm_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ import { JetStreamManagerImpl } from "../jsm.ts";
import { Feature } from "../../nats-base-client/semver.ts";
import { convertStreamSourceDomain } from "../jsmstream_api.ts";
import { ConsumerAPIImpl } from "../jsmconsumer_api.ts";
import { ConsumerApiAction } from "../jsapi_types.ts";
import { ConsumerApiAction, StoreCompression } from "../jsapi_types.ts";

const StreamNameRequired = "stream name required";
const ConsumerNameRequired = "durable name required";
Expand Down Expand Up @@ -2544,3 +2544,48 @@ Deno.test("jsm - source transforms rejected on old servers", async () => {

await cleanup(ns, nc);
});

Deno.test("jsm - stream compression not supported", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const nci = nc as NatsConnectionImpl;
nci.features.update("2.9.0");
nci.info!.version = "2.9.0";

const jsm = await nc.jetstreamManager();

await assertRejects(
async () => {
await jsm.streams.add({
name: "n",
subjects: ["foo"],
storage: StorageType.File,
compression: StoreCompression.S2,
});
},
Error,
"stream 'compression' requires server 2.10.0",
);

await cleanup(ns, nc);
});

Deno.test("jsm - stream compression", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
if (await notCompatible(ns, nc, "2.10.0")) {
return;
}

const jsm = await nc.jetstreamManager();
let si = await jsm.streams.add({
name: "n",
subjects: ["foo"],
storage: StorageType.File,
compression: StoreCompression.S2,
});
assertEquals(si.config.compression, StoreCompression.S2);

si = await jsm.streams.update("n", { compression: StoreCompression.None });
assertEquals(si.config.compression, StoreCompression.None);

await cleanup(ns, nc);
});
2 changes: 2 additions & 0 deletions nats-base-client/semver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ export enum Feature {
JS_STREAM_FIRST_SEQ = "js_stream_first_seq",
JS_STREAM_SUBJECT_TRANSFORM = "js_stream_subject_transform",
JS_STREAM_SOURCE_SUBJECT_TRANSFORM = "js_stream_source_subject_transform",
JS_STREAM_COMPRESSION = "js_stream_compression",
}

type FeatureVersion = {
Expand Down Expand Up @@ -105,6 +106,7 @@ export class Features {
this.set(Feature.JS_STREAM_FIRST_SEQ, "2.10.0");
this.set(Feature.JS_STREAM_SUBJECT_TRANSFORM, "2.10.0");
this.set(Feature.JS_STREAM_SOURCE_SUBJECT_TRANSFORM, "2.10.0");
this.set(Feature.JS_STREAM_COMPRESSION, "2.10.0");

this.disabled.forEach((f) => {
this.features.delete(f);
Expand Down

0 comments on commit 7b050d0

Please sign in to comment.