Skip to content

Commit

Permalink
sha256 digests
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed Aug 8, 2022
1 parent 5ab01ca commit c72c0ae
Show file tree
Hide file tree
Showing 2 changed files with 378 additions and 4 deletions.
22 changes: 18 additions & 4 deletions nats-base-client/objectstore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import { headers, MsgHdrs, MsgHdrsImpl } from "./headers.ts";
import { consumerOpts } from "./jsconsumeropts.ts";
import { millis, nanos, NatsError } from "./mod.ts";
import { QueuedIterator, QueuedIteratorImpl } from "./queued_iterator.ts";
import { SHA256 } from "./sha256.js";

export function objectStoreStreamName(bucket: string): string {
validateBucket(bucket);
Expand Down Expand Up @@ -321,6 +322,8 @@ export class ObjectStoreImpl implements ObjectStore {
const db = new DataBuffer();
try {
const reader = rs ? rs.getReader() : null;
const sha = new SHA256();

while (true) {
const { done, value } = reader
? await reader.read()
Expand All @@ -329,17 +332,17 @@ export class ObjectStoreImpl implements ObjectStore {
// put any partial chunk in
if (db.size() > 0) {
const payload = db.drain();
sha.update(payload);
info.chunks!++;
info.size! += payload.length;
info.mtime = nanos(Date.now());
info.digest = "";
info.digest = `sha-256:${sha.digest("base64")}`;
info.deleted = false;
proms.push(this.js.publish(chunkSubj, payload));
}
// trailing md for the object
const h = headers();
h.set(JsHeaders.RollupHdr, JsHeaders.RollupValueSubject);

proms.push(
this.js.publish(metaSubj, JSONCodec().encode(info), { headers: h }),
);
Expand All @@ -363,8 +366,10 @@ export class ObjectStoreImpl implements ObjectStore {
while (db.size() > maxChunk) {
info.chunks!++;
info.size! += maxChunk;
const payload = db.drain(meta.options.max_chunk_size);
sha.update(payload);
proms.push(
this.js.publish(chunkSubj, db.drain(meta.options.max_chunk_size)),
this.js.publish(chunkSubj, payload),
);
}
}
Expand Down Expand Up @@ -409,15 +414,24 @@ export class ObjectStoreImpl implements ObjectStore {

const oc = consumerOpts();
oc.orderedConsumer();
const sha = new SHA256();
const subj = `$O.${this.name}.C.${info.nuid}`;
const sub = await this.js.subscribe(subj, oc);
(async () => {
for await (const jm of sub) {
if (jm.data.length > 0) {
sha.update(jm.data);
controller!.enqueue(jm.data);
}
if (jm.info.pending === 0) {
controller!.close();
const hash = `sha-256:${sha.digest("base64")}`;
if (info.digest !== hash) {
controller!.error(
new Error("received a corrupt object, digests do not match"),
);
} else {
controller!.close();
}
sub.unsubscribe();
}
}
Expand Down
Loading

0 comments on commit c72c0ae

Please sign in to comment.