Skip to content

Enable rollback for ConsensusRegisterCollection #24734

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ export class ConsensusRegisterCollectionClass<T> extends SharedObject<IConsensus
read(key: string, readPolicy?: ReadPolicy): T | undefined;
// (undocumented)
readVersions(key: string): T[] | undefined;
// @sealed (undocumented)
protected rollback(content: unknown, localOpMetadata: unknown): void;
// (undocumented)
protected summarizeCore(serializer: IFluidSerializer): ISummaryTreeWithStats;
write(key: string, value: T): Promise<boolean>;
Expand Down
86 changes: 74 additions & 12 deletions packages/dds/register-collection/src/consensusRegisterCollection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* Licensed under the MIT License.
*/

import { bufferToString } from "@fluid-internal/client-utils";
import { bufferToString, createEmitter } from "@fluid-internal/client-utils";
import { assert, unreachableCase } from "@fluidframework/core-utils/internal";
import {
IChannelAttributes,
Expand Down Expand Up @@ -101,11 +101,20 @@ type IIncomingRegisterOperation<T> = IRegisterOperationSerialized | IRegisterOpe
const incomingOpMatchesPlainFormat = <T>(op): op is IRegisterOperationPlain<T> =>
"value" in op;

/** The type of the resolve function to call after the local operation is ack'd */
type PendingResolve = (winner: boolean) => void;

const snapshotFileName = "header";

interface IConsensusRegisterCollectionInternalEvents {
/**
* Emitted when a pending message is rolled back.
*/
pendingMessageRollback: (rollbackMessageId: number) => void;

/**
* Emitted when a pending message is acknowledged.
*/
pendingMessageAck: (ackMessageId: number, winner: boolean) => void;
}

/**
* {@inheritDoc IConsensusRegisterCollection}
* @legacy
Expand All @@ -116,6 +125,10 @@ export class ConsensusRegisterCollection<T>
implements IConsensusRegisterCollection<T>
{
private readonly data = new Map<string, ILocalData<T>>();
private readonly internalEvents =
createEmitter<IConsensusRegisterCollectionInternalEvents>();

private nextPendingMessageId: number = 0;

/**
* Constructs a new consensus register collection. If the object is non-local an id and service interfaces will
Expand All @@ -136,6 +149,11 @@ export class ConsensusRegisterCollection<T>
* @returns Promise<true> if write was non-concurrent
*/
public async write(key: string, value: T): Promise<boolean> {
if (this.runtime.disposed) {
// Return false if disposed to signify that we did not write.
return false;
}

if (!this.isAttached()) {
this.processInboundWrite(key, value, 0, 0, true);
return true;
Expand All @@ -152,12 +170,46 @@ export class ConsensusRegisterCollection<T>
refSeq: this.deltaManager.lastSequenceNumber,
};

return this.newAckBasedPromise<boolean>((resolve) => {
// Send the resolve function as the localOpMetadata. This will be provided back to us when the
// op is ack'd.
this.submitLocalMessage(message, resolve);
// If we fail due to runtime being disposed, it's better to return false then unhandled exception.
}).catch((error) => false);
const pendingMessageId = this.nextPendingMessageId++;

// There are three ways the write promise can resolve:
// 1. The write is acked
// 2. The write is rolled back
// 3. The runtime is disposed
// The boolean value returned by the promise is true if the attempted write was ack'd and won, false otherwise.
return new Promise<boolean>((resolve) => {
Copy link
Preview

Copilot AI May 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Add inline comments explaining the use of pendingMessageId along with the internalEvents listeners in the write method to improve readability and maintainability.

Copilot uses AI. Check for mistakes.

const handleAck = (ackMessageId: number, winner: boolean) => {
if (ackMessageId === pendingMessageId) {
resolve(winner);
removeListeners();
}
};

const handleRollback = (rollbackMessageId: number) => {
if (rollbackMessageId === pendingMessageId) {
// If we rolled back the pending message, resolve the promise with false.
resolve(false);
removeListeners();
}
};

const handleDisposed = () => {
resolve(false);
removeListeners();
};

const removeListeners = () => {
this.internalEvents.off("pendingMessageAck", handleAck);
this.internalEvents.off("pendingMessageRollback", handleRollback);
this.runtime.off("dispose", handleDisposed);
};

this.internalEvents.on("pendingMessageAck", handleAck);
this.internalEvents.on("pendingMessageRollback", handleRollback);
this.runtime.on("dispose", handleDisposed);

this.submitLocalMessage(message, pendingMessageId);
});
}

/**
Expand Down Expand Up @@ -252,8 +304,8 @@ export class ConsensusRegisterCollection<T>
);
if (local) {
// Resolve the pending promise for this operation now that we have received an ack for it.
const resolve = localOpMetadata as PendingResolve;
resolve(winner);
const ackMessageId = localOpMetadata as number;
this.internalEvents.emit("pendingMessageAck", ackMessageId, winner);
}
break;
}
Expand Down Expand Up @@ -343,6 +395,16 @@ export class ConsensusRegisterCollection<T>
return serializer.parse(content);
}

/**
* @sealed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Semantic docs?

*/
protected rollback(content: unknown, localOpMetadata: unknown): void {
// We don't need to do anything to roll back CRC, it's safe to just drop
// the op on the floor since we don't modify the DDS until the ack.
// We emit an internal event so we know to resolve the pending promise.
this.internalEvents.emit("pendingMessageRollback", localOpMetadata as number);
}

protected applyStashedOp(): void {
// empty implementation
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@
import { strict as assert } from "assert";

import { describeCompat } from "@fluid-private/test-version-utils";
import { IFluidHandle } from "@fluidframework/core-interfaces";
import type { IContainer } from "@fluidframework/container-definitions/internal";
import type {
ConfigTypes,
IConfigProviderBase,
IFluidHandle,
} from "@fluidframework/core-interfaces";
import type { ISharedMap } from "@fluidframework/map/internal";
import type { IConsensusRegisterCollection } from "@fluidframework/register-collection/internal";
import {
Expand Down Expand Up @@ -37,14 +42,15 @@ describeCompat("ConsensusRegisterCollection", "FullCompat", (getTestObjectProvid
beforeEach("getTestObjectProvider", () => {
provider = getTestObjectProvider();
});
let container1: IContainer;
let dataStore1: ITestFluidObject;
let sharedMap1: ISharedMap;
let sharedMap2: ISharedMap;
let sharedMap3: ISharedMap;

beforeEach("createSharedMaps", async () => {
// Create a Container for the first client.
const container1 = await provider.makeTestContainer(testContainerConfig);
container1 = await provider.makeTestContainer(testContainerConfig);
dataStore1 = await getContainerEntryPointBackCompat<ITestFluidObject>(container1);
sharedMap1 = await dataStore1.getSharedObject<ISharedMap>(mapId);

Expand Down Expand Up @@ -254,6 +260,21 @@ describeCompat("ConsensusRegisterCollection", "FullCompat", (getTestObjectProvid
assert.strictEqual(versions6[0], "value10", "Happened after value did not overwrite");
});

it("Resolves write promise with false when disposed", async () => {
const collection1 = ConsensusRegisterCollection.create(dataStore1.runtime);
sharedMap1.set("collection", collection1.handle);
await provider.ensureSynchronized();
const write1P = collection1.write("key1", "value1");
if (container1.dispose !== undefined) {
Copy link
Preview

Copilot AI May 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Consider checking for the existence of the dispose method using 'typeof container1.dispose === "function"' to ensure the check is robust.

Suggested change
if (container1.dispose !== undefined) {
if (typeof container1.dispose === "function") {

Copilot uses AI. Check for mistakes.

container1.dispose();
} else {
// 1.4.0 doesn't have dispose, close is good enough
container1.close();
}
const write1Result = await write1P;
assert.strictEqual(write1Result, false, "Write should resolve with false when disposed");
});

it("Can store handles", async () => {
// Set up the collection with two handles and add it to the map so other containers can find it
const collection1 = ConsensusRegisterCollection.create(dataStore1.runtime);
Expand Down Expand Up @@ -281,10 +302,8 @@ describeCompat("ConsensusRegisterCollection", "FullCompat", (getTestObjectProvid
});
});

describeCompat(
"ConsensusRegisterCollection grouped batching",
"NoCompat",
(getTestObjectProvider, apis) => {
describeCompat("ConsensusRegisterCollection", "NoCompat", (getTestObjectProvider, apis) => {
describe("grouped batching", () => {
const { SharedMap, ConsensusRegisterCollection } = apis.dds;
const registry: ChannelFactoryRegistry = [
[mapId, SharedMap.getFactory()],
Expand Down Expand Up @@ -314,5 +333,64 @@ describeCompat(
await Promise.all([write1P, write2P]);
await provider.ensureSynchronized();
});
},
);
});

describe("rollback", () => {
const { SharedMap, ConsensusRegisterCollection } = apis.dds;

const registry: ChannelFactoryRegistry = [
[mapId, SharedMap.getFactory()],
[undefined, ConsensusRegisterCollection.getFactory()],
];
const configProvider = (settings: Record<string, ConfigTypes>): IConfigProviderBase => ({
getRawConfig: (name: string): ConfigTypes => settings[name],
});
const testContainerConfig: ITestContainerConfig = {
fluidDataObjectType: DataObjectFactoryType.Test,
registry,
loaderProps: {
configProvider: configProvider({
"Fluid.ContainerRuntime.EnableRollback": true,
}),
},
};

let provider: ITestObjectProvider;
beforeEach("getTestObjectProvider", () => {
provider = getTestObjectProvider();
});
let dataStore1: ITestFluidObject;
let sharedMap1: ISharedMap;

beforeEach("createSharedMaps", async () => {
// Create a Container for the first client.
const container1 = await provider.makeTestContainer(testContainerConfig);
dataStore1 = await getContainerEntryPointBackCompat<ITestFluidObject>(container1);
sharedMap1 = await dataStore1.getSharedObject<ISharedMap>(mapId);
});

it("Resolves write promise with false when rollback happens", async () => {
const collection1 = ConsensusRegisterCollection.create(dataStore1.runtime);
sharedMap1.set("collection", collection1.handle);
await provider.ensureSynchronized();
let write1P: Promise<boolean> | undefined;
let error: Error | undefined;
try {
dataStore1.context.containerRuntime.orderSequentially(() => {
write1P = collection1.write("key1", "value1");
throw new Error("Force rollback");
});
} catch (err) {
error = err as Error;
}
assert.notStrictEqual(error, undefined, "Expect the error we threw");
assert.notStrictEqual(write1P, undefined, "Write promise should be defined");
const write1Result = await write1P;
assert.strictEqual(
write1Result,
false,
"Write should resolve with false when rolled back",
);
});
});
});
Loading