Skip to content

Commit

Permalink
#28: Removing transaction hashing from core
Browse files Browse the repository at this point in the history
  • Loading branch information
gsvarovsky committed Nov 23, 2020
1 parent 5ed256f commit 57ab006
Show file tree
Hide file tree
Showing 15 changed files with 56 additions and 315 deletions.
6 changes: 1 addition & 5 deletions src/engine/ControlMessage.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { Hash } from './hash';
import { TreeClock } from './clocks';
import { MeldError, MeldErrorStatus } from './MeldError';
const inspect = Symbol.for('nodejs.util.inspect.custom');
Expand Down Expand Up @@ -76,19 +75,17 @@ export namespace Response {
constructor(
readonly lastTime: TreeClock,
readonly quadsAddress: string,
readonly lastHash: Hash,
readonly updatesAddress: string) {
}

readonly toJson = () => ({
'@type': 'http://control.m-ld.org/response/snapshot',
lastTime: this.lastTime.toJson(),
quadsAddress: this.quadsAddress,
lastHash: this.lastHash.encode(),
updatesAddress: this.updatesAddress
});

toString = () => `Snapshot at ${this.lastTime} with ${this.lastHash}`;
toString = () => `Snapshot at ${this.lastTime}`;
[inspect] = () => this.toString();
}

Expand Down Expand Up @@ -145,7 +142,6 @@ export namespace Response {
if (lastTime)
return new Snapshot(
lastTime, json.quadsAddress,
Hash.decode(json.lastHash),
json.updatesAddress);
break;
case 'http://control.m-ld.org/response/revup':
Expand Down
19 changes: 1 addition & 18 deletions src/engine/MeldEncoding.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import { MeldDelta, EncodedDelta, UUID } from '.';
import { NamedNode, Quad } from 'rdf-js';
import { literal, namedNode, blankNode, triple as newTriple, defaultGraph, quad as newQuad } from '@rdfjs/data-model';
import { HashBagBlock } from './blocks';
import { Hash } from './hash';
import { compact } from 'jsonld';
import { flatten } from './util';
import { Context, ExpandedTermDef } from '../jrql-support';
import { Iri } from 'jsonld/jsonld-spec';
import { Triple, tripleKey, TripleMap } from './quads';
import { Triple, TripleMap } from './quads';
import { rdfToJson, jsonToRdf } from "./jsonld";

export class DomainContext implements Context {
Expand Down Expand Up @@ -39,10 +37,6 @@ namespace rdf {
export const object = namedNode($id + 'object');
}

export function hashTriple(triple: Triple): Hash {
return Hash.digest(...tripleKey(triple));
}

export function reifyTriplesTids(triplesTids: TripleMap<UUID[]>): Triple[] {
return flatten([...triplesTids].map(([triple, tids]) => {
const rid = blankNode();
Expand Down Expand Up @@ -80,17 +74,6 @@ export function unreify(reifications: Triple[]): [Triple, UUID[]][] {
}, {} as { [rid: string]: [Triple, UUID[]] }));
}

export class JsonDeltaBagBlock extends HashBagBlock<EncodedDelta> {
constructor(id: Hash, data?: EncodedDelta) { super(id, data); }
protected construct = (id: Hash, data: EncodedDelta) => new JsonDeltaBagBlock(id, data);
protected hash = (data: EncodedDelta) => {
const [ver, tid, del, ins] = data;
if (ver !== 0)
throw new Error(`Encoded delta version ${ver} not supported`);
return Hash.digest(tid, ins, del); // Note delete insert reversed (historical)
};
}

/**
* TODO: re-sync with Java
* @see m-ld/m-ld-core/src/main/java/org/m_ld/MeldResource.java
Expand Down
8 changes: 4 additions & 4 deletions src/engine/PubsubRemotes.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Snapshot, DeltaMessage, MeldRemotes, MeldLocal, UUID, Revup } from '.';
import { Snapshot, DeltaMessage, MeldRemotes, MeldLocal, Revup } from '.';
import { Observable, Subject as Source, BehaviorSubject, identity } from 'rxjs';
import { TreeClock } from './clocks';
import { generate as uuid } from 'short-uuid';
Expand Down Expand Up @@ -157,7 +157,7 @@ export abstract class PubsubRemotes extends AbstractMeld implements MeldRemotes
// Ack the response to start the streams
ack.resolve();
sw.stop();
return { lastTime: res.lastTime, lastHash: res.lastHash, quads, updates };
return { lastTime: res.lastTime, quads, updates };
}

private triplesFromBuffer = (payload: Buffer) =>
Expand Down Expand Up @@ -365,10 +365,10 @@ export abstract class PubsubRemotes extends AbstractMeld implements MeldRemotes
}

private async replySnapshot(sentParams: DirectParams, snapshot: Snapshot): Promise<void> {
const { lastTime, lastHash, quads, updates } = snapshot;
const { lastTime, quads, updates } = snapshot;
const quadsAddress = uuid(), updatesAddress = uuid();
await this.reply(sentParams, new Response.Snapshot(
lastTime, quadsAddress, lastHash, updatesAddress
lastTime, quadsAddress, updatesAddress
), 'expectAck');
// Ack has been sent, start streaming the data and updates concurrently
await Promise.all([
Expand Down
26 changes: 0 additions & 26 deletions src/engine/blocks.ts

This file was deleted.

6 changes: 3 additions & 3 deletions src/engine/dataset/DatasetEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ export class DatasetEngine extends AbstractMeld implements CloneEngine, MeldLoca
if (this.newClone)
// For a new non-genesis clone, the first connect is essential.
await comesAlive(this);
else
// For any other clone, just wait for decided liveness.
await comesAlive(this, 'notNull');
// else
// // For any other clone, just wait for decided liveness.
// await comesAlive(this, 'notNull');
}

private reconnectDelayer = (style: ConnectStyle): Observable<number> => {
Expand Down
21 changes: 3 additions & 18 deletions src/engine/dataset/SuSetDataset.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,20 @@ import { MeldUpdate, MeldConstraint, MeldReadState, readResult, Resource, ReadRe
import { Snapshot, UUID, DeltaMessage, MeldDelta } from '..';
import { Quad } from 'rdf-js';
import { TreeClock } from '../clocks';
import { Hash } from '../hash';
import { Subject } from '../../jrql-support';
import { Dataset, PatchQuads } from '.';
import { flatten as flatJsonLd } from 'jsonld';
import { Iri } from 'jsonld/jsonld-spec';
import { JrqlGraph } from './JrqlGraph';
import { MeldEncoding, unreify, hashTriple, toDomainQuad, reifyTriplesTids } from '../MeldEncoding';
import { MeldEncoding, unreify, toDomainQuad, reifyTriplesTids } from '../MeldEncoding';
import { Observable, from, Subject as Source, EMPTY } from 'rxjs';
import { bufferCount, mergeMap, reduce, map, filter, takeWhile, expand } from 'rxjs/operators';
import { flatten, Future, tapComplete, getIdLogger, check } from '../util';
import { generate as uuid } from 'short-uuid';
import { Logger } from 'loglevel';
import { MeldError } from '../MeldError';
import { LocalLock } from '../local';
import { SUSET_CONTEXT, qsName, toPrefixedId } from './SuSetGraph';
import { SUSET_CONTEXT, qsName, tripleId } from './SuSetGraph';
import { SuSetJournalGraph, SuSetJournalEntry } from './SuSetJournal';
import { MeldConfig, Read } from '../..';
import { QuadMap, TripleMap, Triple } from '../quads';
Expand Down Expand Up @@ -135,15 +134,6 @@ export class SuSetDataset extends JrqlGraph {
});
}

/**
* @return the last hash seen in the journal.
*/
@SuSetDataset.checkNotClosed.async
async lastHash(): Promise<Hash> {
const journal = await this.journalGraph.journal();
return (await journal.tail()).hash;
}

/**
* Emits entries from the journal since a time given as a clock or a tick.
* The clock variant accepts a remote or local clock and provides operations
Expand Down Expand Up @@ -432,7 +422,7 @@ export class SuSetDataset extends JrqlGraph {
const dataReset = this.dataset.transact({
id: 'suset-reset',
prepare: async () =>
({ patch: await this.journalGraph.reset(snapshot.lastHash, snapshot.lastTime, localTime) })
({ patch: await this.journalGraph.reset(snapshot.lastTime, localTime) })
});

const quadsApplied = snapshot.quads.pipe(
Expand Down Expand Up @@ -469,7 +459,6 @@ export class SuSetDataset extends JrqlGraph {
const tail = await journal.tail();
resolve({
lastTime: tail.gwc,
lastHash: tail.hash,
quads: this.graph.match().pipe(
bufferCount(10), // TODO batch size config
mergeMap(async batch => reifyTriplesTids(
Expand All @@ -483,7 +472,3 @@ export class SuSetDataset extends JrqlGraph {
});
}
}

function tripleId(triple: Triple): string {
return toPrefixedId('thash', hashTriple(triple).encode());
}
17 changes: 15 additions & 2 deletions src/engine/dataset/SuSetGraph.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { Context } from '../../jrql-support';
import { namedNode } from '@rdfjs/data-model';
import { NamedNode } from 'rdf-js';
import { Iri } from 'jsonld/jsonld-spec';
import { Triple, tripleKey } from '../quads';
import { createHash } from 'crypto';

/**
* Context for SU-Set Dataset code to manipulate control content.
Expand All @@ -25,6 +28,16 @@ export function qsName(name: string): NamedNode {
return namedNode(SUSET_CONTEXT.qs + name);
}

export function toPrefixedId(prefix: string, ...path: string[]) {
export function toPrefixedId(prefix: string, ...path: string[]): Iri {
return `${prefix}:${path.map(encodeURIComponent).join('/')}`;
}
}

export function fromPrefixedId(prefix: string, id: Iri): string[] {
return id.match(`^${prefix}:(.+)`)?.[1]?.split('/').map(decodeURIComponent) ?? [];
}

export function tripleId(triple: Triple): string {
const hash = createHash('sha1'); // Fastest
tripleKey(triple).forEach(key => hash.update(key));
return toPrefixedId('thash', hash.digest('base64'));
}
Loading

0 comments on commit 57ab006

Please sign in to comment.