Skip to content

improvement(staging-mode): Clarify Squash/Rollback behavior for ContainerMessageType.GC #24774

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 6, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 81 additions & 26 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
@@ -786,8 +786,12 @@ function canStageMessageOfType(
| ContainerMessageType.GC
| ContainerMessageType.DocumentSchemaChange {
return (
// These are user changes coming up from the runtime's DataStores
type === ContainerMessageType.FluidDataStoreOp ||
// GC ops are used to detect issues in the reference graph so all clients can repair their GC state.
// These can be submitted at any time, including while in Staging Mode.
type === ContainerMessageType.GC ||
// These are typically sent shortly after boot and will not be common in Staging Mode, but it's possible.
type === ContainerMessageType.DocumentSchemaChange
);
}
@@ -1182,6 +1186,8 @@ export class ContainerRuntime

public readonly clientDetails: IClientDetails;

private readonly isSummarizerClient: boolean;

public get storage(): IDocumentStorageService {
return this._storage;
}
@@ -1528,6 +1534,11 @@ export class ContainerRuntime
this.mc = createChildMonitoringContext({
logger: this.baseLogger,
namespace: "ContainerRuntime",
properties: {
all: {
inStagingMode: this.inStagingMode,
},
},
});

// If we support multiple algorithms in the future, then we would need to manage it here carefully.
@@ -1584,7 +1595,7 @@ export class ContainerRuntime
// Values are generally expected to be set from the runtime side.
this.options = options ?? {};
this.clientDetails = clientDetails;
const isSummarizerClient = this.clientDetails.type === summarizerClientType;
this.isSummarizerClient = this.clientDetails.type === summarizerClientType;
this.loadedFromVersionId = context.getLoadedFromVersion()?.id;
// eslint-disable-next-line unicorn/consistent-destructuring
this._getClientId = () => context.clientId;
@@ -1625,7 +1636,7 @@ export class ContainerRuntime
);

// In cases of summarizer, we want to dispose instead since consumer doesn't interact with this container
this.closeFn = isSummarizerClient ? this.disposeFn : closeFn;
this.closeFn = this.isSummarizerClient ? this.disposeFn : closeFn;

let loadSummaryNumber: number;
// Get the container creation metadata. For new container, we initialize these. For existing containers,
@@ -1786,7 +1797,7 @@ export class ContainerRuntime
existing,
metadata,
createContainerMetadata: this.createContainerMetadata,
isSummarizerClient,
isSummarizerClient: this.isSummarizerClient,
getNodePackagePath: async (nodePath: string) => this.getGCNodePackagePath(nodePath),
getLastSummaryTimestampMs: () => this.messageAtLastSummary?.timestamp,
readAndParseBlob: async <T>(id: string) => readAndParse<T>(this.storage, id),
@@ -2151,8 +2162,7 @@ export class ContainerRuntime
maxOpsSinceLastSummary,
);

const isSummarizerClient = this.clientDetails.type === summarizerClientType;
if (isSummarizerClient) {
if (this.isSummarizerClient) {
// We want to dynamically import anything inside the summaryDelayLoadedModule module only when we are the summarizer client,
// so that all non summarizer clients don't have to load the code inside this module.
const module = await import(
@@ -4641,51 +4651,87 @@ export class ContainerRuntime

/**
* Resubmits each message in the batch, and then flushes the outbox.
* This typically happens when we reconnect and there are pending messages.
*
* @remarks - If the "Offline Load" feature is enabled, the batchId is included in the resubmitted messages,
* @remarks
* Attempting to resubmit a batch that has been successfully sequenced will not happen due to
* checks in the ConnectionStateHandler (Loader layer)
*
* The only exception to this would be if the Container "forks" due to misuse of the "Offline Load" feature.
* If the "Offline Load" feature is enabled, the batchId is included in the resubmitted messages,
* for correlation to detect container forking.
*/
private reSubmitBatch(
batch: PendingMessageResubmitData[],
{ batchId, staged, squash }: PendingBatchResubmitMetadata,
): void {
assert(
this._summarizer === undefined,
0x8f2 /* Summarizer never reconnects so should never resubmit */,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How come you added a hex code here? Is this copilot generated?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just a move from LHS line 4687

);

const resubmitInfo = {
// Only include Batch ID if "Offline Load" feature is enabled
// It's only needed to identify batches across container forks arising from misuse of offline load.
batchId: this.offlineEnabled ? batchId : undefined,
staged,
};

const resubmitFn = squash
? this.reSubmitWithSquashing.bind(this)
: this.reSubmit.bind(this);

this.batchRunner.run(() => {
for (const message of batch) {
this.reSubmit(message, squash);
resubmitFn(message);
}
}, resubmitInfo);

this.flush(resubmitInfo);
}

private reSubmit(message: PendingMessageResubmitData, squash: boolean): void {
this.reSubmitCore(message.runtimeOp, message.localOpMetadata, message.opMetadata, squash);
}

/**
* Finds the right store and asks it to resubmit the message. This typically happens when we
* reconnect and there are pending messages.
* ! Note: successfully resubmitting an op that has been successfully sequenced is not possible due to checks in the ConnectionStateHandler (Loader layer)
* @param message - The original LocalContainerRuntimeMessage.
* @param localOpMetadata - The local metadata associated with the original message.
* Resubmit the given message as part of a squash rebase upon exiting Staging Mode.
* How exactly to resubmit the message is up to the subsystem that submitted the op to begin with.
*/
private reSubmitCore(
message: LocalContainerRuntimeMessage,
localOpMetadata: unknown,
opMetadata: Record<string, unknown> | undefined,
squash: boolean,
): void {
private reSubmitWithSquashing(resubmitData: PendingMessageResubmitData): void {
const message = resubmitData.runtimeOp;
assert(
this._summarizer === undefined,
0x8f2 /* Summarizer never reconnects so should never resubmit */,
canStageMessageOfType(message.type),
"Expected message type to be compatible with staging",
);
switch (message.type) {
case ContainerMessageType.FluidDataStoreOp: {
this.channelCollection.reSubmit(
message.type,
message.contents,
resubmitData.localOpMetadata,
/* squash: */ true,
);
break;
}
// NOTE: Squash doesn't apply to GC or DocumentSchemaChange ops, fallback to typical resubmit logic.
case ContainerMessageType.GC:
case ContainerMessageType.DocumentSchemaChange: {
this.reSubmit(resubmitData);
break;
}
default: {
unreachableCase(message.type);
}
}
}

/**
* Resubmit the given message which was previously submitted to the ContainerRuntime but not successfully
* transmitted to the ordering service (e.g. due to a disconnect, or being in Staging Mode)
* How to resubmit is up to the subsystem that submitted the op to begin with
*/
private reSubmit({
runtimeOp: message,
localOpMetadata,
opMetadata,
}: PendingMessageResubmitData): void {
switch (message.type) {
case ContainerMessageType.FluidDataStoreOp:
case ContainerMessageType.Attach:
@@ -4696,7 +4742,7 @@ export class ContainerRuntime
message.type,
message.contents,
localOpMetadata,
squash,
/* squash: */ false,
);
break;
}
@@ -4752,7 +4798,16 @@ export class ContainerRuntime
this.channelCollection.rollback(type, contents, localOpMetadata);
break;
}
case ContainerMessageType.GC:
case ContainerMessageType.GC: {
// Just drop it, but log an error. This is not expected and not ideal, but it is not a critical failure either.
// Currently the only expected type here is TombstoneLoaded, which will have been preceded by one of these events as well:
// GC_Tombstone_DataStore_Requested, GC_Tombstone_SubDataStore_Requested, GC_Tombstone_Blob_Requested
this.mc.logger.sendErrorEvent({
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to log more details here such as the id of the object, pkg, etc? That will help debugging if and when we see this log. As of now, debugging this will be very hard.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it'll be ok because there will be a corresponding GC_Tombstone_DataStore_Requested (or other object) event with more info

eventName: "GC_OpDiscarded",
details: { subType: contents.type },
});
break;
}
case ContainerMessageType.DocumentSchemaChange: {
throw new Error(`Handling ${type} ops in rolled back batch not yet implemented`);
}
Original file line number Diff line number Diff line change
@@ -1045,6 +1045,8 @@ export class GarbageCollector implements IGarbageCollector {

// Any time we log a Tombstone Loaded error (via Telemetry Tracker),
// we want to also trigger autorecovery to avoid the object being deleted
// i.e. this will be preceded by one of these telemetry events:
// GC_Tombstone_DataStore_Requested, GC_Tombstone_SubDataStore_Requested, GC_Tombstone_Blob_Requested
// Note: We don't need to trigger on "Changed" because any change will cause the object
// to be loaded by the Summarizer, and auto-recovery will be triggered then.
if (isTombstoned && reason === "Loaded") {
11 changes: 11 additions & 0 deletions packages/runtime/container-runtime/src/pendingStateManager.ts
Original file line number Diff line number Diff line change
@@ -50,8 +50,19 @@ export interface IPendingMessage {
* Unless this pending message came from stashed content, in which case this was roundtripped through string
*/
runtimeOp?: LocalContainerRuntimeMessage | undefined; // Undefined for empty batches and initial messages before parsing
/**
* Local Op Metadata that was passed to the ContainerRuntime when the op was submitted.
* This contains state needed when processing the ack, or to resubmit or rollback the op.
*/
localOpMetadata: unknown;
/**
* Metadata that was passed to the ContainerRuntime when the op was submitted.
* This is rarely used, and may be inspected by the service (as opposed to the op contents, which are opaque)
*/
opMetadata: Record<string, unknown> | undefined;
/**
* Populated upon processing the op's ack, before moving the pending message to savedOps.
*/
sequenceNumber?: number;
/**
* Info about the batch this pending message belongs to, for validation and for computing the batchId on reconnect
Loading