Skip to content

Commit

Permalink
[FIX] fixes related to cross-client compatibility hash-padding, mtime…
Browse files Browse the repository at this point in the history
… format

[CHANGE] remove beta from kv notice, targetted object store instead.
[FIX] name encoding was only replacing first char not sequences
  • Loading branch information
aricart committed Aug 9, 2022
1 parent c72c0ae commit 7077e0c
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 27 deletions.
6 changes: 3 additions & 3 deletions nats-base-client/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ class ViewsImpl implements Views {
js: JetStreamClientImpl;
constructor(js: JetStreamClientImpl) {
this.js = js;
jetstreamPreview(this.js.nc);
}
kv(name: string, opts: Partial<KvOptions> = {}): Promise<KV> {
if (opts.bindOnly) {
Expand All @@ -109,6 +108,7 @@ class ViewsImpl implements Views {
name: string,
opts: Partial<ObjectStoreOptions> = {},
): Promise<ObjectStore> {
jetstreamPreview(this.js.nc);
return ObjectStoreImpl.create(this.js, name, opts);
}
}
Expand Down Expand Up @@ -847,11 +847,11 @@ const jetstreamPreview = (() => {
const { lang } = nci?.protocol?.transport;
if (lang) {
console.log(
`\u001B[33m >> jetstream's materialized views functionality in ${lang} is beta functionality \u001B[0m`,
`\u001B[33m >> jetstream's materialized views object store functionality in ${lang} is beta functionality \u001B[0m`,
);
} else {
console.log(
`\u001B[33m >> jetstream's materialized views functionality is beta functionality \u001B[0m`,
`\u001B[33m >> jetstream's materialized views object store functionality is beta functionality \u001B[0m`,
);
}
}
Expand Down
36 changes: 20 additions & 16 deletions nats-base-client/objectstore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import {
JetStreamOptions,
JsHeaders,
JsMsg,
Nanos,
ObjectInfo,
ObjectResult,
ObjectStore,
Expand All @@ -43,7 +42,7 @@ import { JetStreamClientImpl } from "./jsclient.ts";
import { DataBuffer } from "./databuffer.ts";
import { headers, MsgHdrs, MsgHdrsImpl } from "./headers.ts";
import { consumerOpts } from "./jsconsumeropts.ts";
import { millis, nanos, NatsError } from "./mod.ts";
import { NatsError } from "./mod.ts";
import { QueuedIterator, QueuedIteratorImpl } from "./queued_iterator.ts";
import { SHA256 } from "./sha256.js";

Expand Down Expand Up @@ -107,7 +106,7 @@ type ServerObjectInfo = {
chunks: number;
digest: string;
deleted?: boolean;
mtime: Nanos;
mtime: string;
} & ServerObjectStoreMeta;

class ObjectInfoImpl implements ObjectInfo {
Expand Down Expand Up @@ -143,12 +142,9 @@ class ObjectInfoImpl implements ObjectInfo {
get digest(): string {
return this.info.digest;
}
get mtime(): Nanos {
get mtime(): string {
return this.info.mtime;
}
get modified(): Date {
return new Date(millis(this.info.mtime));
}
get nuid(): string {
return this.info.nuid;
}
Expand Down Expand Up @@ -207,8 +203,8 @@ export class ObjectStoreImpl implements ObjectStore {
if (!name || name.length === 0) {
return { name, error: new Error("name cannot be empty") };
}
name = name.replace(".", "_");
name = name.replace(" ", "_");
name = name.replaceAll(".", "_");
name = name.replaceAll(" ", "_");
let error = undefined;
try {
validateKey(name);
Expand Down Expand Up @@ -303,11 +299,10 @@ export class ObjectStoreImpl implements ObjectStore {
if (error) {
return Promise.reject(error);
}
meta.name = n;

const id = nuid.next();
const chunkSubj = this._chunkSubject(id);
const metaSubj = this._metaSubject(meta.name);
const metaSubj = this._metaSubject(n);

const info = Object.assign({
bucket: this.name,
Expand Down Expand Up @@ -335,8 +330,11 @@ export class ObjectStoreImpl implements ObjectStore {
sha.update(payload);
info.chunks!++;
info.size! += payload.length;
info.mtime = nanos(Date.now());
info.digest = `sha-256:${sha.digest("base64")}`;
info.mtime = new Date().toISOString();
const digest = sha.digest("base64");
const pad = digest.length % 3;
const padding = pad > 0 ? "=".repeat(pad) : "";
info.digest = `sha-256=${digest}${padding}`;
info.deleted = false;
proms.push(this.js.publish(chunkSubj, payload));
}
Expand Down Expand Up @@ -424,10 +422,16 @@ export class ObjectStoreImpl implements ObjectStore {
controller!.enqueue(jm.data);
}
if (jm.info.pending === 0) {
const hash = `sha-256:${sha.digest("base64")}`;
if (info.digest !== hash) {
const hash = sha.digest("base64");
// go pads the hash - which should be multiple of 3 - otherwise pads with '='
const pad = hash.length % 3;
const padding = pad > 0 ? "=".repeat(pad) : "";
const digest = `sha-256=${hash}${padding}`;
if (digest !== info.digest) {
controller!.error(
new Error("received a corrupt object, digests do not match"),
new Error(
`received a corrupt object, digests do not match received: ${info.digest} calculated ${digest}`,
),
);
} else {
controller!.close();
Expand Down
2 changes: 1 addition & 1 deletion nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2586,7 +2586,7 @@ export interface ObjectInfo extends ObjectStoreMeta {
chunks: number;
digest: string;
deleted: boolean;
mtime: Nanos;
mtime: string;
}

export interface ObjectLink {
Expand Down
28 changes: 21 additions & 7 deletions tests/objectstore_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,7 @@ import {
} from "https://deno.land/std@0.95.0/testing/asserts.ts";
import { DataBuffer } from "../nats-base-client/databuffer.ts";
import { crypto } from "https://deno.land/std@0.136.0/crypto/mod.ts";
import {
headers,
millis,
StorageType,
StringCodec,
} from "../nats-base-client/mod.ts";
import { headers, StorageType, StringCodec } from "../nats-base-client/mod.ts";
import { assertRejects } from "https://deno.land/std@0.125.0/testing/asserts.ts";
import { equals } from "https://deno.land/std@0.111.0/bytes/mod.ts";
import { ObjectInfo, ObjectStoreMeta } from "../nats-base-client/types.ts";
Expand Down Expand Up @@ -78,7 +73,7 @@ Deno.test("objectstore - basics", async () => {
assertEquals(oi.bucket, "OBJS");
assertEquals(oi.nuid.length, 22);
assertEquals(oi.name, "BLOB");
assert(1000 > (Date.now() - millis(oi.mtime)));
// assert(1000 > (Date.now() - millis(oi.mtime)));

const jsm = await nc.jetstreamManager();
const si = await jsm.streams.info("OBJ_OBJS");
Expand Down Expand Up @@ -592,3 +587,22 @@ Deno.test("objectstore - default chunk is 128k", async () => {

await cleanup(ns, nc);
});

// Deno.test("objectstore - compat", async () => {
// const nc = await connect();
// const js = nc.jetstream();
// const os = await js.views.os("test");
// console.log(await os.status({ subjects_filter: ">" }));
//
// const a = await os.list();
// console.log(a);
//
// const rs = await os.get("./main.go");
// const data = await fromReadableStream(rs!.data);
// const sc = StringCodec();
// console.log(sc.decode(data));
//
// await os.put({ name: "hello" }, readableStreamFrom(sc.encode("hello world")));
//
// await nc.close();
// });

0 comments on commit 7077e0c

Please sign in to comment.