Skip to content

Commit

Permalink
Merge pull request #490 from nats-io/fix-meta-encoding
Browse files Browse the repository at this point in the history
[FIX] ObjectStore meta subject entries name encoding
  • Loading branch information
aricart committed Mar 8, 2023
2 parents 1660798 + d052353 commit ab40594
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 26 deletions.
149 changes: 149 additions & 0 deletions bin/fix-os.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Copyright 2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { parse } from "https://deno.land/std@0.177.0/flags/mod.ts";
import { ObjectStoreImpl } from "https://raw.githubusercontent.com/nats-io/nats.deno/main/nats-base-client/objectstore.ts";
import {
connect,
ConnectionOptions,
credsAuthenticator,
JSONCodec,
} from "https://raw.githubusercontent.com/nats-io/nats.deno/main/src/mod.ts";

const argv = parse(
Deno.args,
{
alias: {
"s": ["server"],
"f": ["creds"],
"b": ["bucket"],
},
default: {
s: "127.0.0.1:4222",
c: 1,
i: 0,
},
boolean: ["dryrun"],
string: ["server", "creds", "bucket"],
},
);

const copts = { servers: argv.s } as ConnectionOptions;

if (argv.h || argv.help) {
console.log(
"Usage: fix-os [-s server] [--creds=/path/file.creds] [--check] --bucket=name",
);
console.log(
"\nThis tool fixes metadata entries in an object store that were written",
);
console.log(
"with base64 encoding without padding. Please backup your object stores",
);
console.log("before using this tool.");

Deno.exit(1);
}

if (argv.creds) {
const f = await Deno.open(argv.creds, { read: true });
const data = await Deno.readFile(f);
Deno.close(f.rid);
copts.authenticator = credsAuthenticator(data);
}

if (!argv.bucket) {
console.log("--bucket is required");
Deno.exit(1);
}

const nc = await connect(copts);

const js = nc.jetstream();
const jsm = await nc.jetstreamManager();
const lister = jsm.streams.listObjectStores();
let found = false;
const streamName = `OBJ_${argv.bucket}`;
for await (const oss of lister) {
if (oss.streamInfo.config.name === streamName) {
found = true;
break;
}
}
if (!found) {
console.log(`bucket '${argv.bucket}' was not found`);
Deno.exit(1);
}
const os = await js.views.os(argv.bucket) as ObjectStoreImpl;

// `$${osPrefix}${os.name}.M.>`
const osInfo = await os.status({ subjects_filter: "$O.*.M.*" });
const entries = Object.getOwnPropertyNames(
osInfo.streamInfo.state.subjects || {},
);
let fixes = 0;

for (let i = 0; i < entries.length; i++) {
const chunks = entries[i].split(".");
let key = chunks[3];
if (key.endsWith("=")) {
// this is already padded
continue;
}
const pad = key.length % 4;
if (pad === 0) {
continue;
}
// this entry is incorrect fix it
fixes++;
if (argv.check) {
continue;
}
const padding = pad > 0 ? "=".repeat(pad) : "";
chunks[3] += padding;
const fixedKey = chunks.join(".");

let m;
try {
m = await jsm.streams.getMessage(os.stream, {
last_by_subj: entries[i],
});
} catch (err) {
console.error(`[ERR] failed to update ${entries[i]}: ${err.message}`);
continue;
}
if (m) {
try {
await js.publish(fixedKey, m.data);
} catch (err) {
console.error(`[ERR] failed to update ${entries[i]}: ${err.message}`);
continue;
}
try {
const seq = m.seq;
await jsm.streams.deleteMessage(os.stream, seq);
} catch (err) {
console.error(
`[WARN] failed to delete bad entry ${
entries[i]
}: ${err.message} - new entry was added`,
);
}
}
}

const verb = argv.check ? "are" : "were";
console.log(`${fixes} fixes ${verb} required on bucket ${argv.bucket}`);

await nc.close();
26 changes: 26 additions & 0 deletions nats-base-client/base64.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,29 @@ export class Base64UrlCodec {
.replace(/-/g, "+");
}
}

export class Base64UrlPaddedCodec {
static encode(bytes: string | Uint8Array): string {
return Base64UrlPaddedCodec.toB64URLEncoding(Base64Codec.encode(bytes));
}

static decode(s: string, binary = false): Uint8Array | string {
return Base64UrlPaddedCodec.decode(
Base64UrlPaddedCodec.fromB64URLEncoding(s),
binary,
);
}

static toB64URLEncoding(b64str: string): string {
return b64str
.replace(/\+/g, "-")
.replace(/\//g, "_");
}

static fromB64URLEncoding(b64str: string): string {
// pads are % 4, but not necessary on decoding
return b64str
.replace(/_/g, "/")
.replace(/-/g, "+");
}
}
4 changes: 2 additions & 2 deletions nats-base-client/objectstore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import {
StreamInfoRequestOptions,
} from "./types.ts";
import { validateBucket, validateKey } from "./kv.ts";
import { Base64UrlCodec } from "./base64.ts";
import { Base64UrlCodec, Base64UrlPaddedCodec } from "./base64.ts";
import { JSONCodec } from "./codec.ts";
import { nuid } from "./nuid.ts";
import { deferred } from "./util.ts";
Expand Down Expand Up @@ -685,7 +685,7 @@ export class ObjectStoreImpl implements ObjectStore {
}

_metaSubject(n: string): string {
return `$O.${this.name}.M.${Base64UrlCodec.encode(n)}`;
return `$O.${this.name}.M.${Base64UrlPaddedCodec.encode(n)}`;
}

_metaSubjectAll(): string {
Expand Down
2 changes: 1 addition & 1 deletion src/deno_transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import {
} from "../nats-base-client/internal_mod.ts";
import type { TlsOptions } from "../nats-base-client/types.ts";

const VERSION = "1.12.1";
const VERSION = "1.13.0";
const LANG = "nats.deno";

// if trying to simply write to the connection for some reason
Expand Down
38 changes: 15 additions & 23 deletions tests/objectstore_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,24 @@ import { crypto } from "https://deno.land/std@0.177.0/crypto/mod.ts";
import {
Empty,
headers,
JSONCodec,
StorageType,
StringCodec,
} from "../nats-base-client/mod.ts";
import { assertRejects } from "https://deno.land/std@0.177.0/testing/asserts.ts";
import { equals } from "https://deno.land/std@0.177.0/bytes/mod.ts";
import { ObjectInfo, ObjectStoreMeta } from "../nats-base-client/types.ts";
import { SHA256 } from "../nats-base-client/sha256.js";
import { Base64UrlCodec } from "../nats-base-client/base64.ts";
import { digestType } from "../nats-base-client/objectstore.ts";
import {
Base64UrlCodec,
Base64UrlPaddedCodec,
} from "../nats-base-client/base64.ts";
import {
digestType,
ObjectStoreImpl,
osPrefix,
} from "../nats-base-client/objectstore.ts";
import { connect } from "../src/mod.ts";

function readableStreamFrom(data: Uint8Array): ReadableStream<Uint8Array> {
return new ReadableStream<Uint8Array>({
Expand Down Expand Up @@ -680,37 +689,20 @@ Deno.test("objectstore - sanitize", async () => {
});
assertEquals(
info.streamInfo.state
?.subjects![`$O.test.M.${Base64UrlCodec.encode("has_dots_here")}`],
?.subjects![`$O.test.M.${Base64UrlPaddedCodec.encode("has_dots_here")}`],
1,
);
assertEquals(
info.streamInfo.state
.subjects![`$O.test.M.${Base64UrlCodec.encode("the_spaces_are_here")}`],
.subjects![
`$O.test.M.${Base64UrlPaddedCodec.encode("the_spaces_are_here")}`
],
1,
);

await cleanup(ns, nc);
});

// Deno.test("objectstore - compat", async () => {
// const nc = await connect();
// const js = nc.jetstream();
// const os = await js.views.os("test");
// console.log(await os.status({ subjects_filter: ">" }));
//
// const a = await os.list();
// console.log(a);
//
// const rs = await os.get("./main.go");
// const data = await fromReadableStream(rs!.data);
// const sc = StringCodec();
// console.log(sc.decode(data));
//
// await os.put({ name: "hello" }, readableStreamFrom(sc.encode("hello world")));
//
// await nc.close();
// });

Deno.test("objectstore - partials", async () => {
const { ns, nc } = await setup(jetstreamServerConf({
max_payload: 1024 * 1024,
Expand Down

0 comments on commit ab40594

Please sign in to comment.