Skip to content

Commit f8ca762

Browse files
committed
fix: Connect sequencing issue with Scribe connections
1 parent 9eb3582 commit f8ca762

File tree

8 files changed

+66
-55
lines changed

8 files changed

+66
-55
lines changed

packages/prophet/AuthorityProphet/AuthorityPartitionConnection.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ export default class AuthorityPartitionConnection extends PartitionConnection {
2929

3030
_doConnect (/* options: ConnectOptions, onError: Function */) {}
3131

32-
async narrateEventLog (options: ?NarrateOptions = {}): Promise<any> {
32+
narrateEventLog (options: ?NarrateOptions = {}): Promise<any> {
3333
return !options ? undefined : {};
3434
}
3535

packages/prophet/FalseProphet/FalseProphetPartitionConnection.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import ValaaReference from "~/raem/ValaaReference";
66
import type { VRef } from "~/raem/ValaaReference"; // eslint-disable-line no-duplicate-imports
77

88
import PartitionConnection from "~/prophet/api/PartitionConnection";
9-
import { NarrateOptions, ChronicleOptions, ChronicleRequest } from "~/prophet/api/types";
9+
import { ChronicleOptions, ChronicleRequest } from "~/prophet/api/types";
1010
import { initializeAspects, obtainAspect, tryAspect } from "~/prophet/tools/EventAspects";
1111
import EVENT_VERSION from "~/prophet/tools/EVENT_VERSION";
1212

packages/prophet/Scribe/ScribePartitionConnection.js

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -88,29 +88,8 @@ export default class ScribePartitionConnection extends PartitionConnection {
8888
this._prophet._persistedMediaLookup[mediaRawId] = info;
8989
}
9090
},
91-
() => (options.narrateOptions && this.narrateEventLog(options.narrateOptions)),
92-
(narration) => {
93-
if (!narration) return narration;
94-
const actionCount = Object.values(narration).reduce(
95-
(s, log) => s + (Array.isArray(log) ? log.length : 0),
96-
options.eventIdBegin || 0);
97-
if (!actionCount && (options.newPartition === false)) {
98-
throw new Error(`No events found when connecting to an expected existing partition '${
99-
this.getPartitionURI().toString()}'`);
100-
} else if (actionCount && (options.newPartition === true)) {
101-
throw new Error(`Existing events found when trying to create a new partition '${
102-
this.getPartitionURI().toString()}'`);
103-
}
104-
if ((options.requireLatestMediaContents !== false)
105-
&& (narration.mediaRetrievalStatus || { latestFailures: [] }).latestFailures.length) {
106-
// FIXME(iridian): This error temporarily demoted to log error
107-
this.outputErrorEvent(new Error(`Failed to connect to partition: encountered ${
108-
narration.mediaRetrievalStatus.latestFailures.length
109-
} latest media content retrieval failures (and ${
110-
""}options.requireLatestMediaContents does not equal false).`));
111-
}
112-
return narration;
113-
},
91+
() => ((options.narrateOptions !== false)
92+
&& this.narrateEventLog(options.narrateOptions)),
11493
], onError);
11594
}
11695

packages/prophet/Scribe/_eventOps.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ export async function _narrateEventLog (connection: ScribePartitionConnection,
5656
}));
5757

5858
if ((options.fullNarrate !== true)
59-
&& ((ret.scribeEventLog || []).length || (ret.scribeCommandQueue || []).length)) {
59+
&& (options.newPartition
60+
|| (ret.scribeEventLog || []).length || (ret.scribeCommandQueue || []).length)) {
6061
connection.logEvent(2, () => [
6162
"Initiated async upstream narration, local narration results:", ret,
6263
]);

packages/prophet/api/PartitionConnection.js

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ import { ConnectOptions, MediaInfo, NarrateOptions, ChronicleOptions, ChronicleR
1010
import Follower from "~/prophet/api/Follower";
1111

1212
import Logger from "~/tools/Logger";
13-
import { dumpObject, invariantifyArray, invariantifyObject, thenChainEagerly } from "~/tools";
13+
import { dumpObject, invariantifyArray, invariantifyObject, isPromise, thenChainEagerly }
14+
from "~/tools";
1415

1516
/**
1617
* Interface for sending commands to upstream and registering for downstream truth updates
@@ -123,8 +124,29 @@ export default class PartitionConnection extends Follower {
123124
"\n\tBegun connecting with options", ...dumpObject(options), ...dumpObject(this)
124125
]);
125126
return (this._activeConnection = thenChainEagerly(
126-
this._doConnect(options, onError),
127+
this._doConnect(Object.create(options), onError),
127128
(connectResults) => {
129+
if (options.narrateOptions !== false) {
130+
const actionCount = Object.values(connectResults).reduce(
131+
(s, log) => s + (Array.isArray(log) ? log.length : 0),
132+
options.eventIdBegin || 0);
133+
if (!actionCount && (options.newPartition === false)) {
134+
throw new Error(`No events found when connecting to an existing partition '${
135+
this.getPartitionURI().toString()}'`);
136+
} else if (actionCount && (options.newPartition === true)) {
137+
throw new Error(`Existing events found when trying to create a new partition '${
138+
this.getPartitionURI().toString()}'`);
139+
}
140+
if ((options.requireLatestMediaContents !== false)
141+
&& (connectResults.mediaRetrievalStatus
142+
|| { latestFailures: [] }).latestFailures.length) {
143+
// FIXME(iridian): This error temporarily demoted to log error
144+
this.outputErrorEvent(new Error(`Failed to connect to partition: encountered ${
145+
connectResults.mediaRetrievalStatus.latestFailures.length
146+
} latest media content retrieval failures (and ${
147+
""}options.requireLatestMediaContents does not equal false).`));
148+
}
149+
}
128150
this.warnEvent(1, () => [
129151
"\n\tDone connecting with results:", connectResults,
130152
"\n\tstatus:", this.getStatus(),
@@ -135,20 +157,24 @@ export default class PartitionConnection extends Follower {
135157
));
136158
} catch (error) { return onError(error); }
137159
function errorOnConnect (wrapper, error) {
138-
throw this.wrapErrorEvent(error, wrapper, "\n\toptions:", ...dumpObject(options));
160+
throw this.wrapErrorEvent(error, wrapper,
161+
"\n\toptions:", ...dumpObject(options));
139162
}
140163
}
141164

142165
_doConnect (options: ConnectOptions, onError: Function) {
143-
if (!this._prophet._upstream) throw new Error("Cannot connect: upstream missing");
166+
if (!this._prophet._upstream) {
167+
throw new Error("Cannot connect using default _doConnect with no upstream");
168+
}
144169
options.receiveTruths = this.getReceiveTruths(options.receiveTruths);
145170
options.receiveCommands = this.getReceiveCommands(options.receiveCommands);
146-
const postponeNarrateOptions = options.narrateOptions;
171+
const postponedNarrateOptions = options.narrateOptions;
147172
options.narrateOptions = false;
148173
this.setUpstreamConnection(this._prophet._upstream.acquirePartitionConnection(
149174
this.getPartitionURI(), options));
150175
return thenChainEagerly(this._upstreamConnection.getActiveConnection(),
151-
() => (postponeNarrateOptions && this.narrateEventLog(postponeNarrateOptions)),
176+
() => ((postponedNarrateOptions !== false)
177+
&& this.narrateEventLog(postponedNarrateOptions)),
152178
onError);
153179
}
154180

@@ -171,6 +197,7 @@ export default class PartitionConnection extends Follower {
171197
}
172198

173199
setUpstreamConnection (connection: PartitionConnection) {
200+
if (isPromise(connection)) throw new Error("setUpstreamConnection must not be a promise");
174201
this._upstreamConnection = connection;
175202
}
176203

@@ -196,7 +223,8 @@ export default class PartitionConnection extends Follower {
196223
this.getName()}`);
197224
}
198225
if (this._activeConnection) return this._activeConnection;
199-
throw new Error(`Connection not being activated`);
226+
throw new Error(
227+
`Cannot get an active connection promise from connection which is not being activated`);
200228
} catch (error) {
201229
throw this.wrapErrorEvent(error, new Error(`getActiveConnection(${
202230
requireSynchronous ? "sync" : "async"})`));

packages/prophet/api/Prophet.js

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,19 +130,18 @@ export default class Prophet extends LogEventGenerator {
130130
throw new Error(
131131
"Can't create new partition connection with options.newConnection === false");
132132
}
133-
connection = this._createPartitionConnection(partitionURI, options);
133+
connection = this._createPartitionConnection(partitionURI, { ...options, connect: false });
134134
if (!connection) return undefined;
135135
connection.addReference();
136136
this._connections[String(partitionURI)] = connection;
137-
if (options.connect !== false) {
138-
connection.connect(options); // Initiates the connection but doesn't wait for it to complete.
139-
}
137+
if (options.connect !== false) connection.connect(options); // Initiate connect but dont wait.
140138
return connection;
141139
} catch (error) {
142140
throw this.wrapErrorEvent(error,
143141
new Error(`acquirePartitionConnection(${String(partitionURI)})`),
144142
"\n\toptions:", ...dumpObject(options));
145143
}
144+
146145
/*
147146
if (options.newPartition || options.synchronous) {
148147
if (options.synchronous) return connection;

packages/prophet/test/ProphetTestHarness.js

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -131,16 +131,18 @@ export default class ProphetTestHarness extends ScriptTestHarness {
131131
this.testPartitionURI = options.testPartitionURI
132132
|| (options.testAuthorityURI && createPartitionURI(this.testAuthorityURI, "test_partition"))
133133
|| testPartitionURI;
134+
const activeTestPartitionConnection = this.prophet.acquirePartitionConnection(
135+
this.testPartitionURI, { newPartition: true }).getActiveConnection();
134136

135-
this.testPartitionConnection = thenChainEagerly(
136-
this.prophet.acquirePartitionConnection(this.testPartitionURI, { newPartition: true })
137-
.getActiveConnection(), [
138-
(connection) => Promise.all([
139-
connection,
140-
this.chronicleEvent(createdTestPartitionEntity, { isTruth: true }).getPremiereStory(),
141-
]),
142-
([connection]) => (this.testPartitionConnection = connection),
143-
]);
137+
this.testPartitionConnection = thenChainEagerly(activeTestPartitionConnection, [
138+
(connection) => {
139+
const testPartitionStory =
140+
this.chronicleEvent(createdTestPartitionEntity, { isTruth: true })
141+
.getPremiereStory();
142+
return Promise.all([connection, testPartitionStory]);
143+
},
144+
([connection]) => (this.testPartitionConnection = connection),
145+
]);
144146
}
145147

146148
chronicleEvents (events: EventBase[], ...rest: any) {

packages/raem/VALK/builtinSteppers.js

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -817,7 +817,7 @@ function _advanceCapture (valker, thisArgument, vakon, callScope, capturingValke
817817
: !advanceError ? `call/releaseTransaction (valk caller with active valker)`
818818
: `call/releaseTransaction({ abort: true }) (valk caller with active valker)`,
819819
...((transactionError && advanceError)
820-
? ["\n\t\tabort-cause:", ...dumpObject(advanceError)] : []),
820+
? ["\n\t\tadvance abort cause:", ...dumpObject(advanceError)] : []),
821821
"\n\tthis:", ...dumpObject(thisArgument),
822822
"\n\tcallee vakon:", ...dumpKuery(vakon),
823823
"\n\tscope:", ...dumpObject(callScope),
@@ -846,32 +846,34 @@ function _runCapture (valker, thisArgument, vakon, callScope, capturingValker: V
846846
advanceError = error;
847847
}
848848

849-
let releaseError;
849+
let transactionError;
850850
try {
851851
if (!advanceError) {
852852
transaction.releaseTransaction();
853853
return ret;
854854
}
855855
transaction.releaseTransaction({ abort: true });
856856
} catch (error) {
857-
releaseError = error;
857+
transactionError = error;
858858
}
859859
let opName;
860-
if (!advanceError) {
860+
if (transactionError) {
861+
const prefix = `call/releaseTransaction ${advanceError ? "({ abort: true })" : "()"}`;
861862
opName = (actualValker !== valker.rootDiscourse)
862-
? `call/releaseTransaction (non-valk caller in active transactional callback ${
863-
""}context)`
864-
: `call/releaseTransaction (non-valk caller as outermost context)`;
863+
? `${prefix} (non-valk caller in active transactional callback context)`
864+
: `${prefix} (non-valk caller as outermost context)`;
865865
} else if (actualValker !== valker.rootDiscourse) {
866866
opName = `call/run (non-valk caller in active transactional callback context)`;
867867
} else {
868868
opName = `call/run (non-valk caller as outermost context)`;
869-
const connectingMissingPartitions = tryConnectToMissingPartitionsAndThen(error,
869+
const connectingMissingPartitions = tryConnectToMissingPartitionsAndThen(advanceError,
870870
() => _runCapture(valker, thisArgument, vakon, callScope, capturingValker));
871871
if (connectingMissingPartitions) return connectingMissingPartitions;
872872
}
873873
throw capturingValker.addVALKRuntimeErrorStackFrame(
874-
actualValker.wrapErrorEvent(advanceError || releaseError, opName,
874+
actualValker.wrapErrorEvent(transactionError || advanceError, opName,
875+
...((transactionError && advanceError)
876+
? ["\n\t\tadvance abort cause:", ...dumpObject(advanceError)] : []),
875877
"\n\ttransaction:", ...dumpObject(transaction),
876878
"\n\tthis:", ...dumpObject(thisArgument),
877879
"\n\tcallee vakon:", ...dumpKuery(vakon),

0 commit comments

Comments
 (0)