-
Notifications
You must be signed in to change notification settings - Fork 553
improvement(staging-mode): Clarify Squash/Rollback behavior for ContainerMessageType.GC #24774
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -786,8 +786,12 @@ function canStageMessageOfType( | |
| ContainerMessageType.GC | ||
| ContainerMessageType.DocumentSchemaChange { | ||
return ( | ||
// These are user changes coming up from the runtime's DataStores | ||
type === ContainerMessageType.FluidDataStoreOp || | ||
// GC ops are used to detect issues in the reference graph so all clients can repair their GC state. | ||
// These can be submitted at any time, including while in Staging Mode. | ||
type === ContainerMessageType.GC || | ||
// These are typically sent shortly after boot and will not be common in Staging Mode, but it's possible. | ||
type === ContainerMessageType.DocumentSchemaChange | ||
); | ||
} | ||
|
@@ -1182,6 +1186,8 @@ export class ContainerRuntime | |
|
||
public readonly clientDetails: IClientDetails; | ||
|
||
private readonly isSummarizerClient: boolean; | ||
|
||
public get storage(): IDocumentStorageService { | ||
return this._storage; | ||
} | ||
|
@@ -1528,6 +1534,11 @@ export class ContainerRuntime | |
this.mc = createChildMonitoringContext({ | ||
logger: this.baseLogger, | ||
namespace: "ContainerRuntime", | ||
properties: { | ||
all: { | ||
inStagingMode: this.inStagingMode, | ||
}, | ||
}, | ||
}); | ||
|
||
// If we support multiple algorithms in the future, then we would need to manage it here carefully. | ||
|
@@ -1584,7 +1595,7 @@ export class ContainerRuntime | |
// Values are generally expected to be set from the runtime side. | ||
this.options = options ?? {}; | ||
this.clientDetails = clientDetails; | ||
const isSummarizerClient = this.clientDetails.type === summarizerClientType; | ||
this.isSummarizerClient = this.clientDetails.type === summarizerClientType; | ||
this.loadedFromVersionId = context.getLoadedFromVersion()?.id; | ||
// eslint-disable-next-line unicorn/consistent-destructuring | ||
this._getClientId = () => context.clientId; | ||
|
@@ -1625,7 +1636,7 @@ export class ContainerRuntime | |
); | ||
|
||
// In cases of summarizer, we want to dispose instead since consumer doesn't interact with this container | ||
this.closeFn = isSummarizerClient ? this.disposeFn : closeFn; | ||
this.closeFn = this.isSummarizerClient ? this.disposeFn : closeFn; | ||
|
||
let loadSummaryNumber: number; | ||
// Get the container creation metadata. For new container, we initialize these. For existing containers, | ||
|
@@ -1786,7 +1797,7 @@ export class ContainerRuntime | |
existing, | ||
metadata, | ||
createContainerMetadata: this.createContainerMetadata, | ||
isSummarizerClient, | ||
isSummarizerClient: this.isSummarizerClient, | ||
getNodePackagePath: async (nodePath: string) => this.getGCNodePackagePath(nodePath), | ||
getLastSummaryTimestampMs: () => this.messageAtLastSummary?.timestamp, | ||
readAndParseBlob: async <T>(id: string) => readAndParse<T>(this.storage, id), | ||
|
@@ -2151,8 +2162,7 @@ export class ContainerRuntime | |
maxOpsSinceLastSummary, | ||
); | ||
|
||
const isSummarizerClient = this.clientDetails.type === summarizerClientType; | ||
if (isSummarizerClient) { | ||
if (this.isSummarizerClient) { | ||
// We want to dynamically import anything inside the summaryDelayLoadedModule module only when we are the summarizer client, | |
// so that all non summarizer clients don't have to load the code inside this module. | ||
const module = await import( | ||
|
@@ -4641,51 +4651,87 @@ export class ContainerRuntime | |
|
||
/** | ||
* Resubmits each message in the batch, and then flushes the outbox. | ||
* This typically happens when we reconnect and there are pending messages. | ||
* | ||
* @remarks - If the "Offline Load" feature is enabled, the batchId is included in the resubmitted messages, | ||
* @remarks | ||
* Attempting to resubmit a batch that has been successfully sequenced will not happen due to | ||
* checks in the ConnectionStateHandler (Loader layer) | ||
* | ||
* The only exception to this would be if the Container "forks" due to misuse of the "Offline Load" feature. | ||
* If the "Offline Load" feature is enabled, the batchId is included in the resubmitted messages, | ||
* for correlation to detect container forking. | ||
*/ | ||
private reSubmitBatch( | ||
batch: PendingMessageResubmitData[], | ||
{ batchId, staged, squash }: PendingBatchResubmitMetadata, | ||
): void { | ||
assert( | ||
this._summarizer === undefined, | ||
0x8f2 /* Summarizer never reconnects so should never resubmit */, | ||
); | ||
|
||
const resubmitInfo = { | ||
// Only include Batch ID if "Offline Load" feature is enabled | ||
// It's only needed to identify batches across container forks arising from misuse of offline load. | ||
batchId: this.offlineEnabled ? batchId : undefined, | ||
staged, | ||
}; | ||
|
||
const resubmitFn = squash | ||
? this.reSubmitWithSquashing.bind(this) | ||
: this.reSubmit.bind(this); | ||
|
||
this.batchRunner.run(() => { | ||
for (const message of batch) { | ||
this.reSubmit(message, squash); | ||
resubmitFn(message); | ||
} | ||
}, resubmitInfo); | ||
|
||
this.flush(resubmitInfo); | ||
} | ||
|
||
private reSubmit(message: PendingMessageResubmitData, squash: boolean): void { | ||
this.reSubmitCore(message.runtimeOp, message.localOpMetadata, message.opMetadata, squash); | ||
} | ||
|
||
/** | ||
* Finds the right store and asks it to resubmit the message. This typically happens when we | ||
* reconnect and there are pending messages. | ||
* ! Note: successfully resubmitting an op that has been successfully sequenced is not possible due to checks in the ConnectionStateHandler (Loader layer) | ||
* @param message - The original LocalContainerRuntimeMessage. | ||
* @param localOpMetadata - The local metadata associated with the original message. | ||
* Resubmit the given message as part of a squash rebase upon exiting Staging Mode. | ||
* How exactly to resubmit the message is up to the subsystem that submitted the op to begin with. | ||
*/ | ||
private reSubmitCore( | ||
message: LocalContainerRuntimeMessage, | ||
localOpMetadata: unknown, | ||
opMetadata: Record<string, unknown> | undefined, | ||
squash: boolean, | ||
): void { | ||
private reSubmitWithSquashing(resubmitData: PendingMessageResubmitData): void { | ||
const message = resubmitData.runtimeOp; | ||
assert( | ||
this._summarizer === undefined, | ||
0x8f2 /* Summarizer never reconnects so should never resubmit */, | ||
canStageMessageOfType(message.type), | ||
"Expected message type to be compatible with staging", | ||
); | ||
switch (message.type) { | ||
case ContainerMessageType.FluidDataStoreOp: { | ||
this.channelCollection.reSubmit( | ||
message.type, | ||
message.contents, | ||
resubmitData.localOpMetadata, | ||
/* squash: */ true, | ||
); | ||
break; | ||
} | ||
// NOTE: Squash doesn't apply to GC or DocumentSchemaChange ops, fallback to typical resubmit logic. | ||
case ContainerMessageType.GC: | ||
case ContainerMessageType.DocumentSchemaChange: { | ||
this.reSubmit(resubmitData); | ||
break; | ||
} | ||
default: { | ||
unreachableCase(message.type); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Resubmit the given message which was previously submitted to the ContainerRuntime but not successfully | ||
* transmitted to the ordering service (e.g. due to a disconnect, or being in Staging Mode) | ||
* How to resubmit is up to the subsystem that submitted the op to begin with | ||
*/ | ||
private reSubmit({ | ||
runtimeOp: message, | ||
localOpMetadata, | ||
opMetadata, | ||
}: PendingMessageResubmitData): void { | ||
switch (message.type) { | ||
case ContainerMessageType.FluidDataStoreOp: | ||
case ContainerMessageType.Attach: | ||
|
@@ -4696,7 +4742,7 @@ export class ContainerRuntime | |
message.type, | ||
message.contents, | ||
localOpMetadata, | ||
squash, | ||
/* squash: */ false, | ||
); | ||
break; | ||
} | ||
|
@@ -4752,7 +4798,16 @@ export class ContainerRuntime | |
this.channelCollection.rollback(type, contents, localOpMetadata); | ||
break; | ||
} | ||
case ContainerMessageType.GC: | ||
case ContainerMessageType.GC: { | ||
// Just drop it, but log an error. This is not expected and not ideal, but it is not a critical failure either. | |
// Currently the only expected type here is TombstoneLoaded, which will have been preceded by one of these events as well: | ||
// GC_Tombstone_DataStore_Requested, GC_Tombstone_SubDataStore_Requested, GC_Tombstone_Blob_Requested | ||
this.mc.logger.sendErrorEvent({ | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is it possible to log more details here, such as the ID of the object, pkg, etc.? That will help debugging if and when we see this log. As of now, debugging this will be very hard. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think it'll be ok because there will be a corresponding |
||
eventName: "GC_OpDiscarded", | ||
details: { subType: contents.type }, | ||
}); | ||
break; | ||
} | ||
case ContainerMessageType.DocumentSchemaChange: { | ||
throw new Error(`Handling ${type} ops in rolled back batch not yet implemented`); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How come you added a hex code here? Is this Copilot-generated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's just a move from LHS line 4687