Skip to content

Commit

Permalink
[FIX] [OS] [BREAKING] previous versions of javascript objectstore enc…
Browse files Browse the repository at this point in the history
…oded entry names containing periods to underbards prior to generating a metasubject storing the meta information. This is incorrectly and makes such entries not accessible on other clients. This fix changes to generate proper meta subjects for such entries. - Note that bin/fix-os.ts contains a migration tool to migrate those meta entries. Please run the migration tool after backing up your objectstore.

[FIX] [OS] [CHANGE] objectstore now always set allow_direct to true on new stores

[FIX] [OS] the `ttl` option was not properly propagated to the underlying stream when specified
  • Loading branch information
aricart committed Jul 3, 2023
1 parent f019f3b commit 3b1d057
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 85 deletions.
152 changes: 100 additions & 52 deletions bin/fix-os.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* limitations under the License.
*/
import { parse } from "https://deno.land/std@0.190.0/flags/mod.ts";
import { ObjectStoreImpl } from "https://raw.githubusercontent.com/nats-io/nats.deno/main/nats-base-client/objectstore.ts";
import { ObjectStoreImpl, ServerObjectInfo } from "../jetstream/objectstore.ts";
import {
connect,
ConnectionOptions,
Expand All @@ -33,7 +33,7 @@ const argv = parse(
c: 1,
i: 0,
},
boolean: ["dryrun"],
boolean: ["check"],
string: ["server", "creds", "bucket"],
},
);
Expand All @@ -56,9 +56,7 @@ if (argv.h || argv.help) {
}

if (argv.creds) {
const f = await Deno.open(argv.creds, { read: true });
const data = await Deno.readFile(f);
Deno.close(f.rid);
const data = await Deno.readFile(argv.creds);
copts.authenticator = credsAuthenticator(data);
}

Expand All @@ -85,64 +83,114 @@ if (!found) {
Deno.exit(1);
}
const os = await js.views.os(argv.bucket) as ObjectStoreImpl;
await fixHashes(os);
await metaFix(os);

// `$${osPrefix}${os.name}.M.>`
const osInfo = await os.status({ subjects_filter: "$O.*.M.*" });
const entries = Object.getOwnPropertyNames(
osInfo.streamInfo.state.subjects || {},
);
let fixes = 0;
async function fixHashes(os: ObjectStoreImpl): Promise<void> {
let fixes = 0;
// `$${osPrefix}${os.name}.M.>`
const osInfo = await os.status({ subjects_filter: "$O.*.M.*" });
const entries = Object.getOwnPropertyNames(
osInfo.streamInfo.state.subjects || {},
);

for (let i = 0; i < entries.length; i++) {
const chunks = entries[i].split(".");
const key = chunks[3];
if (key.endsWith("=")) {
// this is already padded
continue;
}
const pad = key.length % 4;
if (pad === 0) {
continue;
}
// this entry is incorrect fix it
fixes++;
if (argv.check) {
continue;
}
const padding = pad > 0 ? "=".repeat(pad) : "";
chunks[3] += padding;
const fixedKey = chunks.join(".");
for (let i = 0; i < entries.length; i++) {
const chunks = entries[i].split(".");
const key = chunks[3];
if (key.endsWith("=")) {
// this is already padded
continue;
}
const pad = key.length % 4;
if (pad === 0) {
continue;
}
// this entry is incorrect fix it
fixes++;
if (argv.check) {
continue;
}
const padding = pad > 0 ? "=".repeat(pad) : "";
chunks[3] += padding;
const fixedKey = chunks.join(".");

let m;
try {
m = await jsm.streams.getMessage(os.stream, {
last_by_subj: entries[i],
});
} catch (err) {
console.error(`[ERR] failed to update ${entries[i]}: ${err.message}`);
continue;
}
if (m) {
let m;
try {
await js.publish(fixedKey, m.data);
m = await jsm.streams.getMessage(os.stream, {
last_by_subj: entries[i],
});
} catch (err) {
console.error(`[ERR] failed to update ${entries[i]}: ${err.message}`);
continue;
}
if (m) {
try {
await js.publish(fixedKey, m.data);
} catch (err) {
console.error(`[ERR] failed to update ${entries[i]}: ${err.message}`);
continue;
}
try {
const seq = m.seq;
await jsm.streams.deleteMessage(os.stream, seq);
} catch (err) {
console.error(
`[WARN] failed to delete bad entry ${
entries[i]
}: ${err.message} - new entry was added`,
);
}
}
}

const verb = argv.check ? "are" : "were";
console.log(`${fixes} hash fixes ${verb} required on bucket ${argv.bucket}`);
}

// metaFix addresses an issue where keys that contained `.` were serialized
// using a subject meta that replaced the `.` with `_`.
async function metaFix(os: ObjectStoreImpl): Promise<void> {
let fixes = 0;
const osInfo = await os.status({ subjects_filter: "$O.*.M.*" });
const subjects = Object.getOwnPropertyNames(
osInfo.streamInfo.state.subjects || {},
);
for (let i = 0; i < subjects.length; i++) {
const metaSubj = subjects[i];
try {
const seq = m.seq;
await jsm.streams.deleteMessage(os.stream, seq);
const m = await os.jsm.streams.getMessage(os.stream, {
last_by_subj: metaSubj,
});
const soi = m.json<ServerObjectInfo>();
const calcMeta = os._metaSubject(soi.name);
if (calcMeta !== metaSubj) {
fixes++;
if (argv.check) {
continue;
}
try {
await js.publish(calcMeta, m.data);
} catch (err) {
console.error(`[ERR] failed to update ${metaSubj}: ${err.message}`);
continue;
}
try {
const seq = m.seq;
await jsm.streams.deleteMessage(os.stream, seq);
} catch (err) {
console.error(
`[WARN] failed to delete bad entry ${metaSubj}: ${err.message} - new entry was added`,
);
}
}
} catch (err) {
console.error(
`[WARN] failed to delete bad entry ${
entries[i]
}: ${err.message} - new entry was added`,
);
console.error(`[ERR] failed to update ${metaSubj}: ${err.message}`);
}
}
const verb = argv.check ? "are" : "were";
console.log(
`${fixes} meta fixes ${verb} required on bucket ${argv.bucket}`,
);
}

const verb = argv.check ? "are" : "were";
console.log(`${fixes} fixes ${verb} required on bucket ${argv.bucket}`);

await nc.close();
await nc.drain();
36 changes: 14 additions & 22 deletions jetstream/objectstore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* limitations under the License.
*/

import { validateBucket, validateKey } from "./kv.ts";
import { validateBucket } from "./kv.ts";
import { Base64UrlPaddedCodec } from "../nats-base-client/base64.ts";
import { JSONCodec } from "../nats-base-client/codec.ts";
import { nuid } from "../nats-base-client/nuid.ts";
Expand Down Expand Up @@ -103,14 +103,14 @@ export class ObjectStoreStatusImpl implements ObjectStoreStatus {
}
}

type ServerObjectStoreMeta = {
export type ServerObjectStoreMeta = {
name: string;
description?: string;
headers?: Record<string, string[]>;
options?: ObjectStoreMetaOptions;
};

type ServerObjectInfo = {
export type ServerObjectInfo = {
bucket: string;
nuid: string;
size: number;
Expand Down Expand Up @@ -214,22 +214,11 @@ export class ObjectStoreImpl implements ObjectStore {
this.js = js;
}

_sanitizeName(name: string): { name: string; error?: Error } {
_checkNotEmpty(name: string): { name: string; error?: Error } {
if (!name || name.length === 0) {
return { name, error: new Error("name cannot be empty") };
}
// cannot use replaceAll - node until node 16 is min
// name = name.replaceAll(".", "_");
// name = name.replaceAll(" ", "_");
name = name.replace(/[. ]/g, "_");

let error = undefined;
try {
validateKey(name);
} catch (err) {
error = err;
}
return { name, error };
return { name };
}

async info(name: string): Promise<ObjectInfo | null> {
Expand All @@ -255,7 +244,7 @@ export class ObjectStoreImpl implements ObjectStore {
}

async rawInfo(name: string): Promise<ServerObjectInfo | null> {
const { name: obj, error } = this._sanitizeName(name);
const { name: obj, error } = this._checkNotEmpty(name);
if (error) {
return Promise.reject(error);
}
Expand Down Expand Up @@ -334,7 +323,7 @@ export class ObjectStoreImpl implements ObjectStore {
meta.options.max_chunk_size = maxChunk;

const old = await this.info(meta.name);
const { name: n, error } = this._sanitizeName(meta.name);
const { name: n, error } = this._checkNotEmpty(meta.name);
if (error) {
return Promise.reject(error);
}
Expand Down Expand Up @@ -590,7 +579,7 @@ export class ObjectStoreImpl implements ObjectStore {
return Promise.reject("bucket required");
}
const osi = bucket as ObjectStoreImpl;
const { name: n, error } = this._sanitizeName(name);
const { name: n, error } = this._checkNotEmpty(name);
if (error) {
return Promise.reject(error);
}
Expand All @@ -606,7 +595,7 @@ export class ObjectStoreImpl implements ObjectStore {
if (info.deleted) {
return Promise.reject(new Error("object is deleted"));
}
const { name: n, error } = this._sanitizeName(name);
const { name: n, error } = this._checkNotEmpty(name);
if (error) {
return Promise.reject(error);
}
Expand Down Expand Up @@ -670,7 +659,7 @@ export class ObjectStoreImpl implements ObjectStore {
// effectively making the object available under 2 names, but it doesn't remove the
// older one.
meta.name = meta.name ?? info.name;
const { name: n, error } = this._sanitizeName(meta.name);
const { name: n, error } = this._checkNotEmpty(meta.name);
if (error) {
return Promise.reject(error);
}
Expand Down Expand Up @@ -773,8 +762,11 @@ export class ObjectStoreImpl implements ObjectStore {
} catch (err) {
return Promise.reject(err);
}
const sc = Object.assign({}, opts) as StreamConfig;
const max_age = opts?.ttl || 0;
delete opts.ttl;
const sc = Object.assign({ max_age }, opts) as StreamConfig;
sc.name = this.stream;
sc.allow_direct = true;
sc.allow_rollup_hdrs = true;
sc.discard = DiscardPolicy.New;
sc.subjects = [`$O.${this.name}.C.>`, `$O.${this.name}.M.>`];
Expand Down
1 change: 0 additions & 1 deletion jetstream/tests/consumers_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1006,7 +1006,6 @@ Deno.test("consumers - next listener leaks", async () => {

const consumer = await js.consumers.get("messages", "myconsumer");

let done = false;
while (true) {
const m = await consumer.next();
if (m) {
Expand Down
42 changes: 32 additions & 10 deletions jetstream/tests/objectstore_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import {
import { DataBuffer } from "../../nats-base-client/databuffer.ts";
import { crypto } from "https://deno.land/std@0.190.0/crypto/mod.ts";
import { ObjectInfo, ObjectStoreMeta, StorageType } from "../mod.ts";
import { Empty, headers, StringCodec } from "../../src/mod.ts";
import { Empty, headers, nanos, StringCodec } from "../../src/mod.ts";
import { equals } from "https://deno.land/std@0.190.0/bytes/mod.ts";
import { SHA256 } from "../../nats-base-client/sha256.js";
import { Base64UrlPaddedCodec } from "../../nats-base-client/base64.ts";
Expand Down Expand Up @@ -316,13 +316,8 @@ Deno.test("objectstore - object names", async () => {
await os.put({ name: "blob.txt" }, readableStreamFrom(sc.encode("A")));
await os.put({ name: "foo bar" }, readableStreamFrom(sc.encode("A")));
await os.put({ name: " " }, readableStreamFrom(sc.encode("A")));

await assertRejects(async () => {
await os.put({ name: "*" }, readableStreamFrom(sc.encode("A")));
});
await assertRejects(async () => {
await os.put({ name: ">" }, readableStreamFrom(sc.encode("A")));
});
await os.put({ name: "*" }, readableStreamFrom(sc.encode("A")));
await os.put({ name: ">" }, readableStreamFrom(sc.encode("A")));
await assertRejects(async () => {
await os.put({ name: "" }, readableStreamFrom(sc.encode("A")));
});
Expand Down Expand Up @@ -679,13 +674,13 @@ Deno.test("objectstore - sanitize", async () => {
});
assertEquals(
info.streamInfo.state
?.subjects![`$O.test.M.${Base64UrlPaddedCodec.encode("has_dots_here")}`],
?.subjects![`$O.test.M.${Base64UrlPaddedCodec.encode("has.dots.here")}`],
1,
);
assertEquals(
info.streamInfo.state
.subjects![
`$O.test.M.${Base64UrlPaddedCodec.encode("the_spaces_are_here")}`
`$O.test.M.${Base64UrlPaddedCodec.encode("the spaces are here")}`
],
1,
);
Expand Down Expand Up @@ -1016,3 +1011,30 @@ Deno.test("objectstore - put/get blob", async () => {

await cleanup(ns, nc);
});

Deno.test("objectstore - ttl", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
if (await notCompatible(ns, nc, "2.6.3")) {
return;
}
const js = nc.jetstream();
const ttl = nanos(60 * 1000);
const os = await js.views.os("OBJS", { ttl });
const status = await os.status();
assertEquals(status.ttl, ttl);

await cleanup(ns, nc);
});

Deno.test("objectstore - allow direct", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
if (await notCompatible(ns, nc, "2.6.3")) {
return;
}
const js = nc.jetstream();
const os = await js.views.os("OBJS");
const status = await os.status();
assertEquals(status.streamInfo.config.allow_direct, true);

await cleanup(ns, nc);
});

0 comments on commit 3b1d057

Please sign in to comment.