Skip to content

Commit 828bcd9

Browse files
committed
feat: Extract chronicleEventLog from narrateEventLog, changes flags
Also wraps _receiveTruth sites into createReceiveTruth/Batch Renames/moves _decorateRetrieveMediaContent into _downstreamOps
1 parent 95da65f commit 828bcd9

File tree

10 files changed

+262
-225
lines changed

10 files changed

+262
-225
lines changed

packages/inspire/InspireGateway.js

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,7 @@ export default class InspireGateway extends LogEventGenerator {
342342
this.warnEvent(`Narrated revelation with ${prologues.length} prologues`,
343343
"\n\tprologue partitions:",
344344
`'${prologues.map(({ partitionURI }) => String(partitionURI)).join("', '")}'`);
345-
const ret = await Promise.all(prologues.map(this._connectAndNarratePrologue));
345+
const ret = await Promise.all(prologues.map(this._connectChronicleAndNarratePrologue));
346346
this.warnEvent(`Acquired active connections for all revelation prologue partitions:`,
347347
"\n\tconnections:", ...dumpObject(ret));
348348
return ret;
@@ -391,21 +391,20 @@ export default class InspireGateway extends LogEventGenerator {
391391
}
392392
}
393393

394-
_connectAndNarratePrologue = async ({ partitionURI, info }: any) => {
394+
_connectChronicleAndNarratePrologue = async ({ partitionURI, info }: any) => {
395395
if ((await info.commandId) >= 0) {
396396
throw new Error("Command queues in revelation are not supported yet");
397397
}
398398
// Acquire connection without remote narration to determine the current last authorized event
399399
// so that we can narrate any content in the prologue before any remote activity.
400400
const connection = await this.falseProphet.acquirePartitionConnection(partitionURI, {
401-
dontRemoteNarrate: true,
401+
narrateRemote: false, subscribeRemote: false,
402402
});
403-
const eventId = await info.eventId;
404-
const lastEventId = connection.getLastAuthorizedEventId();
405-
if ((typeof eventId === "undefined") || (eventId <= lastEventId)) {
406-
const remoteNarration = connection.narrateEventLog();
407-
if (!(lastEventId >= 0)) await remoteNarration;
408-
} else {
403+
const lastPrologueEventId = await info.eventId;
404+
const lastChronicledEventId = connection.getLastAuthorizedEventId() || 0;
405+
const shouldChroniclePrologue = (lastPrologueEventId !== undefined)
406+
&& (lastPrologueEventId > lastChronicledEventId);
407+
if (shouldChroniclePrologue) {
409408
// If no event logs are replayed, we don't need to precache the blobs either, so we delay
410409
// loading them up to this point.
411410
await (this.blobInfos || (this.blobInfos = this._getBlobInfos()));
@@ -416,9 +415,8 @@ export default class InspireGateway extends LogEventGenerator {
416415
throw new Error("commandQueue revelation not implemented yet");
417416
}
418417
const latestMediaInfos = await logs.latestMediaInfos;
419-
await connection.narrateEventLog({
420-
eventLog,
421-
firstEventId: lastEventId + 1,
418+
await connection.chronicleEventLog(eventLog, {
419+
firstEventId: lastChronicledEventId + 1,
422420
retrieveMediaContent (mediaId: VRef, mediaInfo: Object) {
423421
if (!latestMediaInfos[mediaId.rawId()] ||
424422
(mediaInfo.blobId !== latestMediaInfos[mediaId.rawId()].mediaInfo.blobId)) {
@@ -433,6 +431,9 @@ export default class InspireGateway extends LogEventGenerator {
433431
}
434432
});
435433
}
434+
// Initiate remote narration.
435+
const remoteNarration = connection.narrateEventLog();
436+
if (!shouldChroniclePrologue && !(lastChronicledEventId >= 0)) await remoteNarration;
436437
return connection;
437438
}
438439

packages/prophet/Oracle/OraclePartitionConnection.js

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
// @flow
22

3-
import type Command from "~/raem/command";
3+
import type Command, { UniversalEvent } from "~/raem/command";
44
import { VRef } from "~/raem/ValaaReference";
55

66
import PartitionConnection from "~/prophet/api/PartitionConnection";
7-
import type { NarrateOptions, MediaInfo, RetrieveMediaContent } from "~/prophet/api/Prophet";
7+
import type { ChronicleOptions, NarrateOptions, MediaInfo, RetrieveMediaContent }
8+
from "~/prophet/api/Prophet";
89

910
import { dumpObject, thenChainEagerly } from "~/tools";
1011

11-
import { _connect, _narrateEventLog } from "./_connectionOps";
12-
import { _onConfirmTruth } from "./_downstreamOps";
12+
import { _connect, _chronicleEventLog, _narrateEventLog } from "./_connectionOps";
13+
import { _createReceiveTruthBatch, _receiveTruthOf } from "./_downstreamOps";
1314
import { _readMediaContent, _getMediaURL, _prepareBlob } from "./_mediaOps";
1415

1516
/**
@@ -29,8 +30,7 @@ import { _readMediaContent, _getMediaURL, _prepareBlob } from "./_mediaOps";
2930
export default class OraclePartitionConnection extends PartitionConnection {
3031
_lastAuthorizedEventId: number;
3132
_downstreamTruthQueue: Object[];
32-
_authorityRetrieveMediaContent: ?RetrieveMediaContent;
33-
_narrationRetrieveMediaContent: ?RetrieveMediaContent;
33+
_retrieveMediaContentFromAuthority: ?RetrieveMediaContent;
3434

3535
constructor (options: Object) {
3636
super(options);
@@ -44,7 +44,7 @@ export default class OraclePartitionConnection extends PartitionConnection {
4444
}
4545

4646
getRetrieveMediaContent () {
47-
return this._narrationRetrieveMediaContent || this._authorityRetrieveMediaContent;
47+
return this._retrieveMediaContentFromAuthority;
4848
}
4949

5050
isConnected (): boolean {
@@ -85,30 +85,47 @@ export default class OraclePartitionConnection extends PartitionConnection {
8585

8686
async narrateEventLog (options: NarrateOptions = {}): Promise<any> {
8787
const ret = {};
88-
const retrievals = {};
8988
try {
90-
return await _narrateEventLog(this, options, ret, retrievals);
89+
return await _narrateEventLog(this, options, ret);
9190
} catch (error) {
9291
throw this.wrapErrorEvent(error, "narrateEventLog()",
9392
"\n\toptions:", ...dumpObject(options),
9493
"\n\tcurrent ret:", ...dumpObject(ret));
95-
} finally {
96-
if (options.retrieveMediaContent) {
97-
delete this._narrationRetrieveMediaContent;
98-
}
9994
}
10095
}
10196

102-
async _receiveTruth (originName: string, truthEvent: UniversalEvent): Promise<Object> {
97+
async chronicleEventLog (eventLog: UniversalEvent[], options: ChronicleOptions = {}):
98+
Promise<any> {
99+
const ret = {};
100+
try {
101+
return await _chronicleEventLog(this, eventLog, options, ret);
102+
} catch (error) {
103+
throw this.wrapErrorEvent(error, "chronicleEventLog()",
104+
"\n\toptions:", ...dumpObject(options),
105+
"\n\tcurrent ret:", ...dumpObject(ret),
106+
);
107+
}
108+
}
109+
110+
createReceiveTruth (originName: string) {
111+
return this._receiveTruthOf.bind(this, { name: originName });
112+
}
113+
114+
createReceiveTruthBatch (batchName: string,
115+
retrieveMediaContent: Function = this.getRetrieveMediaContent()) {
116+
return _createReceiveTruthBatch(this, batchName, retrieveMediaContent);
117+
}
118+
119+
async _receiveTruthOf (group: Object, truthEvent: UniversalEvent): Promise<Object> {
103120
const partitionData = truthEvent.partitions && truthEvent.partitions[this.partitionRawId()];
104121
try {
105122
if (!partitionData) {
106-
throw new Error(`truthEvent is missing partition info for ${this.debugId()}`);
123+
throw new Error(`truthEvent of '${group.name}' has no partition ${this.debugId()} info`);
107124
}
108-
return _onConfirmTruth(this, originName, authorizedEvent, partitionData);
125+
return _receiveTruthOf(this, group, partitionData.eventId, truthEvent);
109126
} catch (error) {
110-
throw this.wrapErrorEvent(error, `_receiveTruth('${originName}')`,
111-
"\n\toriginName:", originName,
127+
throw this.wrapErrorEvent(error, `_receiveTruthOf('${group.name}')`,
128+
"\n\tgroup:", ...dumpObject(group),
112129
"\n\teventId:", partitionData && partitionData.eventId,
113130
"\n\ttruthEvent:", ...dumpObject(truthEvent),
114131
"\n\tthis:", ...dumpObject(this));
@@ -119,8 +136,7 @@ export default class OraclePartitionConnection extends PartitionConnection {
119136
return this.getScribeConnection().claimCommandEvent(command, this.getRetrieveMediaContent());
120137
}
121138

122-
_preAuthorizeCommand = (preAuthorizedEvent: Object) =>
123-
this._receiveTruth("preAuthorizer", preAuthorizedEvent)
139+
_preAuthorizeCommand = this.createReceiveTruth("preAuthorizer")
124140

125141
// Coming from downstream: tries scribe first, otherwise forwards the request to authority.
126142
// In latter case forwards the result received from authority to Scribe for caching.

0 commit comments

Comments
 (0)