-
Notifications
You must be signed in to change notification settings - Fork 549
Enable rollback for ConsensusRegisterCollection #24734
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,7 +3,7 @@ | |
* Licensed under the MIT License. | ||
*/ | ||
|
||
import { bufferToString } from "@fluid-internal/client-utils"; | ||
import { bufferToString, createEmitter } from "@fluid-internal/client-utils"; | ||
import { assert, unreachableCase } from "@fluidframework/core-utils/internal"; | ||
import { | ||
IChannelAttributes, | ||
|
@@ -101,11 +101,20 @@ type IIncomingRegisterOperation<T> = IRegisterOperationSerialized | IRegisterOpe | |
const incomingOpMatchesPlainFormat = <T>(op): op is IRegisterOperationPlain<T> => | ||
"value" in op; | ||
|
||
/** The type of the resolve function to call after the local operation is ack'd */ | ||
type PendingResolve = (winner: boolean) => void; | ||
|
||
const snapshotFileName = "header"; | ||
|
||
interface IConsensusRegisterCollectionInternalEvents { | ||
/** | ||
* Emitted when a pending message is rolled back. | ||
*/ | ||
pendingMessageRollback: (rollbackMessageId: number) => void; | ||
|
||
/** | ||
* Emitted when a pending message is acknowledged. | ||
*/ | ||
pendingMessageAck: (ackMessageId: number, winner: boolean) => void; | ||
} | ||
|
||
/** | ||
* {@inheritDoc IConsensusRegisterCollection} | ||
* @legacy | ||
|
@@ -116,6 +125,10 @@ export class ConsensusRegisterCollection<T> | |
implements IConsensusRegisterCollection<T> | ||
{ | ||
private readonly data = new Map<string, ILocalData<T>>(); | ||
private readonly internalEvents = | ||
createEmitter<IConsensusRegisterCollectionInternalEvents>(); | ||
|
||
private nextPendingMessageId: number = 0; | ||
|
||
/** | ||
* Constructs a new consensus register collection. If the object is non-local an id and service interfaces will | ||
|
@@ -136,6 +149,11 @@ export class ConsensusRegisterCollection<T> | |
* @returns Promise<true> if write was non-concurrent | ||
*/ | ||
public async write(key: string, value: T): Promise<boolean> { | ||
if (this.runtime.disposed) { | ||
// Return false if disposed to signify that we did not write. | ||
return false; | ||
} | ||
|
||
if (!this.isAttached()) { | ||
this.processInboundWrite(key, value, 0, 0, true); | ||
return true; | ||
|
@@ -152,12 +170,46 @@ export class ConsensusRegisterCollection<T> | |
refSeq: this.deltaManager.lastSequenceNumber, | ||
}; | ||
|
||
return this.newAckBasedPromise<boolean>((resolve) => { | ||
// Send the resolve function as the localOpMetadata. This will be provided back to us when the | ||
// op is ack'd. | ||
this.submitLocalMessage(message, resolve); | ||
// If we fail due to runtime being disposed, it's better to return false then unhandled exception. | ||
}).catch((error) => false); | ||
const pendingMessageId = this.nextPendingMessageId++; | ||
|
||
// There are three ways the write promise can resolve: | ||
// 1. The write is acked | ||
// 2. The write is rolled back | ||
// 3. The runtime is disposed | ||
// The boolean value returned by the promise is true if the attempted write was ack'd and won, false otherwise. | ||
return new Promise<boolean>((resolve) => { | ||
const handleAck = (ackMessageId: number, winner: boolean) => { | ||
if (ackMessageId === pendingMessageId) { | ||
resolve(winner); | ||
removeListeners(); | ||
} | ||
}; | ||
|
||
const handleRollback = (rollbackMessageId: number) => { | ||
if (rollbackMessageId === pendingMessageId) { | ||
// If we rolled back the pending message, resolve the promise with false. | ||
resolve(false); | ||
removeListeners(); | ||
} | ||
}; | ||
|
||
const handleDisposed = () => { | ||
resolve(false); | ||
removeListeners(); | ||
}; | ||
|
||
const removeListeners = () => { | ||
this.internalEvents.off("pendingMessageAck", handleAck); | ||
this.internalEvents.off("pendingMessageRollback", handleRollback); | ||
this.runtime.off("dispose", handleDisposed); | ||
}; | ||
|
||
this.internalEvents.on("pendingMessageAck", handleAck); | ||
this.internalEvents.on("pendingMessageRollback", handleRollback); | ||
this.runtime.on("dispose", handleDisposed); | ||
|
||
this.submitLocalMessage(message, pendingMessageId); | ||
}); | ||
} | ||
|
||
/** | ||
|
@@ -252,8 +304,8 @@ export class ConsensusRegisterCollection<T> | |
); | ||
if (local) { | ||
// Resolve the pending promise for this operation now that we have received an ack for it. | ||
const resolve = localOpMetadata as PendingResolve; | ||
resolve(winner); | ||
const ackMessageId = localOpMetadata as number; | ||
this.internalEvents.emit("pendingMessageAck", ackMessageId, winner); | ||
} | ||
break; | ||
} | ||
|
@@ -343,6 +395,16 @@ export class ConsensusRegisterCollection<T> | |
return serializer.parse(content); | ||
} | ||
|
||
/** | ||
* @sealed | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Semantic docs? |
||
*/ | ||
protected rollback(content: unknown, localOpMetadata: unknown): void { | ||
// We don't need to do anything to roll back CRC, it's safe to just drop | ||
// the op on the floor since we don't modify the DDS until the ack. | ||
// We emit an internal event so we know to resolve the pending promise. | ||
this.internalEvents.emit("pendingMessageRollback", localOpMetadata as number); | ||
} | ||
|
||
protected applyStashedOp(): void { | ||
// empty implementation | ||
} | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -6,7 +6,12 @@ | |||||
import { strict as assert } from "assert"; | ||||||
|
||||||
import { describeCompat } from "@fluid-private/test-version-utils"; | ||||||
import { IFluidHandle } from "@fluidframework/core-interfaces"; | ||||||
import type { IContainer } from "@fluidframework/container-definitions/internal"; | ||||||
import type { | ||||||
ConfigTypes, | ||||||
IConfigProviderBase, | ||||||
IFluidHandle, | ||||||
} from "@fluidframework/core-interfaces"; | ||||||
import type { ISharedMap } from "@fluidframework/map/internal"; | ||||||
import type { IConsensusRegisterCollection } from "@fluidframework/register-collection/internal"; | ||||||
import { | ||||||
|
@@ -37,14 +42,15 @@ describeCompat("ConsensusRegisterCollection", "FullCompat", (getTestObjectProvid | |||||
beforeEach("getTestObjectProvider", () => { | ||||||
provider = getTestObjectProvider(); | ||||||
}); | ||||||
let container1: IContainer; | ||||||
let dataStore1: ITestFluidObject; | ||||||
let sharedMap1: ISharedMap; | ||||||
let sharedMap2: ISharedMap; | ||||||
let sharedMap3: ISharedMap; | ||||||
|
||||||
beforeEach("createSharedMaps", async () => { | ||||||
// Create a Container for the first client. | ||||||
const container1 = await provider.makeTestContainer(testContainerConfig); | ||||||
container1 = await provider.makeTestContainer(testContainerConfig); | ||||||
dataStore1 = await getContainerEntryPointBackCompat<ITestFluidObject>(container1); | ||||||
sharedMap1 = await dataStore1.getSharedObject<ISharedMap>(mapId); | ||||||
|
||||||
|
@@ -254,6 +260,21 @@ describeCompat("ConsensusRegisterCollection", "FullCompat", (getTestObjectProvid | |||||
assert.strictEqual(versions6[0], "value10", "Happened after value did not overwrite"); | ||||||
}); | ||||||
|
||||||
it("Resolves write promise with false when disposed", async () => { | ||||||
const collection1 = ConsensusRegisterCollection.create(dataStore1.runtime); | ||||||
sharedMap1.set("collection", collection1.handle); | ||||||
await provider.ensureSynchronized(); | ||||||
const write1P = collection1.write("key1", "value1"); | ||||||
if (container1.dispose !== undefined) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [nitpick] Consider checking for the existence of the dispose method using 'typeof container1.dispose === "function"' to ensure the check is robust.
Suggested change
Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||||||
container1.dispose(); | ||||||
} else { | ||||||
// 1.4.0 doesn't have dispose, close is good enough | ||||||
container1.close(); | ||||||
} | ||||||
const write1Result = await write1P; | ||||||
assert.strictEqual(write1Result, false, "Write should resolve with false when disposed"); | ||||||
}); | ||||||
|
||||||
it("Can store handles", async () => { | ||||||
// Set up the collection with two handles and add it to the map so other containers can find it | ||||||
const collection1 = ConsensusRegisterCollection.create(dataStore1.runtime); | ||||||
|
@@ -281,10 +302,8 @@ describeCompat("ConsensusRegisterCollection", "FullCompat", (getTestObjectProvid | |||||
}); | ||||||
}); | ||||||
|
||||||
describeCompat( | ||||||
"ConsensusRegisterCollection grouped batching", | ||||||
"NoCompat", | ||||||
(getTestObjectProvider, apis) => { | ||||||
describeCompat("ConsensusRegisterCollection", "NoCompat", (getTestObjectProvider, apis) => { | ||||||
describe("grouped batching", () => { | ||||||
const { SharedMap, ConsensusRegisterCollection } = apis.dds; | ||||||
const registry: ChannelFactoryRegistry = [ | ||||||
[mapId, SharedMap.getFactory()], | ||||||
|
@@ -314,5 +333,64 @@ describeCompat( | |||||
await Promise.all([write1P, write2P]); | ||||||
await provider.ensureSynchronized(); | ||||||
}); | ||||||
}, | ||||||
); | ||||||
}); | ||||||
|
||||||
describe("rollback", () => { | ||||||
const { SharedMap, ConsensusRegisterCollection } = apis.dds; | ||||||
|
||||||
const registry: ChannelFactoryRegistry = [ | ||||||
[mapId, SharedMap.getFactory()], | ||||||
[undefined, ConsensusRegisterCollection.getFactory()], | ||||||
]; | ||||||
const configProvider = (settings: Record<string, ConfigTypes>): IConfigProviderBase => ({ | ||||||
getRawConfig: (name: string): ConfigTypes => settings[name], | ||||||
}); | ||||||
const testContainerConfig: ITestContainerConfig = { | ||||||
fluidDataObjectType: DataObjectFactoryType.Test, | ||||||
registry, | ||||||
loaderProps: { | ||||||
configProvider: configProvider({ | ||||||
"Fluid.ContainerRuntime.EnableRollback": true, | ||||||
}), | ||||||
}, | ||||||
}; | ||||||
|
||||||
let provider: ITestObjectProvider; | ||||||
beforeEach("getTestObjectProvider", () => { | ||||||
provider = getTestObjectProvider(); | ||||||
}); | ||||||
let dataStore1: ITestFluidObject; | ||||||
let sharedMap1: ISharedMap; | ||||||
|
||||||
beforeEach("createSharedMaps", async () => { | ||||||
// Create a Container for the first client. | ||||||
const container1 = await provider.makeTestContainer(testContainerConfig); | ||||||
dataStore1 = await getContainerEntryPointBackCompat<ITestFluidObject>(container1); | ||||||
sharedMap1 = await dataStore1.getSharedObject<ISharedMap>(mapId); | ||||||
}); | ||||||
|
||||||
it("Resolves write promise with false when rollback happens", async () => { | ||||||
const collection1 = ConsensusRegisterCollection.create(dataStore1.runtime); | ||||||
sharedMap1.set("collection", collection1.handle); | ||||||
await provider.ensureSynchronized(); | ||||||
let write1P: Promise<boolean> | undefined; | ||||||
let error: Error | undefined; | ||||||
try { | ||||||
dataStore1.context.containerRuntime.orderSequentially(() => { | ||||||
write1P = collection1.write("key1", "value1"); | ||||||
throw new Error("Force rollback"); | ||||||
}); | ||||||
} catch (err) { | ||||||
error = err as Error; | ||||||
} | ||||||
assert.notStrictEqual(error, undefined, "Expect the error we threw"); | ||||||
assert.notStrictEqual(write1P, undefined, "Write promise should be defined"); | ||||||
const write1Result = await write1P; | ||||||
assert.strictEqual( | ||||||
write1Result, | ||||||
false, | ||||||
"Write should resolve with false when rolled back", | ||||||
); | ||||||
}); | ||||||
}); | ||||||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Add inline comments explaining the use of pendingMessageId along with the internalEvents listeners in the write method to improve readability and maintainability.
Copilot uses AI. Check for mistakes.