@@ -786,8 +786,12 @@ function canStageMessageOfType(
786
786
| ContainerMessageType . GC
787
787
| ContainerMessageType . DocumentSchemaChange {
788
788
return (
789
+ // These are user changes coming up from the runtime's DataStores
789
790
type === ContainerMessageType . FluidDataStoreOp ||
791
+ // GC ops are used to detect issues in the reference graph so all clients can repair their GC state.
792
+ // These can be submitted at any time, including while in Staging Mode.
790
793
type === ContainerMessageType . GC ||
794
+ // These are typically sent shortly after boot and will not be common in Staging Mode, but it's possible.
791
795
type === ContainerMessageType . DocumentSchemaChange
792
796
) ;
793
797
}
@@ -1182,6 +1186,8 @@ export class ContainerRuntime
1182
1186
1183
1187
public readonly clientDetails : IClientDetails ;
1184
1188
1189
+ private readonly isSummarizerClient : boolean ;
1190
+
1185
1191
public get storage ( ) : IDocumentStorageService {
1186
1192
return this . _storage ;
1187
1193
}
@@ -1528,6 +1534,11 @@ export class ContainerRuntime
1528
1534
this . mc = createChildMonitoringContext ( {
1529
1535
logger : this . baseLogger ,
1530
1536
namespace : "ContainerRuntime" ,
1537
+ properties : {
1538
+ all : {
1539
+ inStagingMode : this . inStagingMode ,
1540
+ } ,
1541
+ } ,
1531
1542
} ) ;
1532
1543
1533
1544
// If we support multiple algorithms in the future, then we would need to manage it here carefully.
@@ -1584,7 +1595,7 @@ export class ContainerRuntime
1584
1595
// Values are generally expected to be set from the runtime side.
1585
1596
this . options = options ?? { } ;
1586
1597
this . clientDetails = clientDetails ;
1587
- const isSummarizerClient = this . clientDetails . type === summarizerClientType ;
1598
+ this . isSummarizerClient = this . clientDetails . type === summarizerClientType ;
1588
1599
this . loadedFromVersionId = context . getLoadedFromVersion ( ) ?. id ;
1589
1600
// eslint-disable-next-line unicorn/consistent-destructuring
1590
1601
this . _getClientId = ( ) => context . clientId ;
@@ -1625,7 +1636,7 @@ export class ContainerRuntime
1625
1636
) ;
1626
1637
1627
1638
// In the case of a summarizer client, we want to dispose instead, since the consumer doesn't interact with this container
1628
- this . closeFn = isSummarizerClient ? this . disposeFn : closeFn ;
1639
+ this . closeFn = this . isSummarizerClient ? this . disposeFn : closeFn ;
1629
1640
1630
1641
let loadSummaryNumber : number ;
1631
1642
// Get the container creation metadata. For new container, we initialize these. For existing containers,
@@ -1786,7 +1797,7 @@ export class ContainerRuntime
1786
1797
existing,
1787
1798
metadata,
1788
1799
createContainerMetadata : this . createContainerMetadata ,
1789
- isSummarizerClient,
1800
+ isSummarizerClient : this . isSummarizerClient ,
1790
1801
getNodePackagePath : async ( nodePath : string ) => this . getGCNodePackagePath ( nodePath ) ,
1791
1802
getLastSummaryTimestampMs : ( ) => this . messageAtLastSummary ?. timestamp ,
1792
1803
readAndParseBlob : async < T > ( id : string ) => readAndParse < T > ( this . storage , id ) ,
@@ -2151,8 +2162,7 @@ export class ContainerRuntime
2151
2162
maxOpsSinceLastSummary ,
2152
2163
) ;
2153
2164
2154
- const isSummarizerClient = this . clientDetails . type === summarizerClientType ;
2155
- if ( isSummarizerClient ) {
2165
+ if ( this . isSummarizerClient ) {
2156
2166
// We want to dynamically import anything inside the summaryDelayLoadedModule module only when we are the summarizer client,
2157
2167
// so that all non summarizer clients don't have to load the code inside this module.
2158
2168
const module = await import (
@@ -4641,51 +4651,87 @@ export class ContainerRuntime
4641
4651
4642
4652
/**
4643
4653
* Resubmits each message in the batch, and then flushes the outbox.
4654
+ * This typically happens when we reconnect and there are pending messages.
4644
4655
*
4645
- * @remarks - If the "Offline Load" feature is enabled, the batchId is included in the resubmitted messages,
4656
+ * @remarks
4657
+ * Attempting to resubmit a batch that has been successfully sequenced will not happen due to
4658
+ * checks in the ConnectionStateHandler (Loader layer).
4659
+ *
4660
+ * The only exception to this would be if the Container "forks" due to misuse of the "Offline Load" feature.
4661
+ * If the "Offline Load" feature is enabled, the batchId is included in the resubmitted messages,
4646
4662
* for correlation to detect container forking.
4647
4663
*/
4648
4664
private reSubmitBatch (
4649
4665
batch : PendingMessageResubmitData [ ] ,
4650
4666
{ batchId, staged, squash } : PendingBatchResubmitMetadata ,
4651
4667
) : void {
4668
+ assert (
4669
+ this . _summarizer === undefined ,
4670
+ 0x8f2 /* Summarizer never reconnects so should never resubmit */ ,
4671
+ ) ;
4672
+
4652
4673
const resubmitInfo = {
4653
4674
// Only include Batch ID if "Offline Load" feature is enabled
4654
4675
// It's only needed to identify batches across container forks arising from misuse of offline load.
4655
4676
batchId : this . offlineEnabled ? batchId : undefined ,
4656
4677
staged,
4657
4678
} ;
4658
4679
4680
+ const resubmitFn = squash
4681
+ ? this . reSubmitWithSquashing . bind ( this )
4682
+ : this . reSubmit . bind ( this ) ;
4683
+
4659
4684
this . batchRunner . run ( ( ) => {
4660
4685
for ( const message of batch ) {
4661
- this . reSubmit ( message , squash ) ;
4686
+ resubmitFn ( message ) ;
4662
4687
}
4663
4688
} , resubmitInfo ) ;
4664
4689
4665
4690
this . flush ( resubmitInfo ) ;
4666
4691
}
4667
4692
4668
- private reSubmit ( message : PendingMessageResubmitData , squash : boolean ) : void {
4669
- this . reSubmitCore ( message . runtimeOp , message . localOpMetadata , message . opMetadata , squash ) ;
4670
- }
4671
-
4672
4693
/**
4673
- * Finds the right store and asks it to resubmit the message. This typically happens when we
4674
- * reconnect and there are pending messages.
4675
- * ! Note: successfully resubmitting an op that has been successfully sequenced is not possible due to checks in the ConnectionStateHandler (Loader layer)
4676
- * @param message - The original LocalContainerRuntimeMessage.
4677
- * @param localOpMetadata - The local metadata associated with the original message.
4694
+ * Resubmit the given message as part of a squash rebase upon exiting Staging Mode.
4695
+ * How exactly to resubmit the message is up to the subsystem that submitted the op to begin with.
4678
4696
*/
4679
- private reSubmitCore (
4680
- message : LocalContainerRuntimeMessage ,
4681
- localOpMetadata : unknown ,
4682
- opMetadata : Record < string , unknown > | undefined ,
4683
- squash : boolean ,
4684
- ) : void {
4697
+ private reSubmitWithSquashing ( resubmitData : PendingMessageResubmitData ) : void {
4698
+ const message = resubmitData . runtimeOp ;
4685
4699
assert (
4686
- this . _summarizer === undefined ,
4687
- 0x8f2 /* Summarizer never reconnects so should never resubmit */ ,
4700
+ canStageMessageOfType ( message . type ) ,
4701
+ "Expected message type to be compatible with staging" ,
4688
4702
) ;
4703
+ switch ( message . type ) {
4704
+ case ContainerMessageType . FluidDataStoreOp : {
4705
+ this . channelCollection . reSubmit (
4706
+ message . type ,
4707
+ message . contents ,
4708
+ resubmitData . localOpMetadata ,
4709
+ /* squash: */ true ,
4710
+ ) ;
4711
+ break ;
4712
+ }
4713
+ // NOTE: Squash doesn't apply to GC or DocumentSchemaChange ops, fallback to typical resubmit logic.
4714
+ case ContainerMessageType . GC :
4715
+ case ContainerMessageType . DocumentSchemaChange : {
4716
+ this . reSubmit ( resubmitData ) ;
4717
+ break ;
4718
+ }
4719
+ default : {
4720
+ unreachableCase ( message . type ) ;
4721
+ }
4722
+ }
4723
+ }
4724
+
4725
+ /**
4726
+ * Resubmit the given message which was previously submitted to the ContainerRuntime but not successfully
4727
+ * transmitted to the ordering service (e.g. due to a disconnect, or being in Staging Mode)
4728
+ * How to resubmit is up to the subsystem that submitted the op to begin with.
4729
+ */
4730
+ private reSubmit ( {
4731
+ runtimeOp : message ,
4732
+ localOpMetadata,
4733
+ opMetadata,
4734
+ } : PendingMessageResubmitData ) : void {
4689
4735
switch ( message . type ) {
4690
4736
case ContainerMessageType . FluidDataStoreOp :
4691
4737
case ContainerMessageType . Attach :
@@ -4696,7 +4742,7 @@ export class ContainerRuntime
4696
4742
message . type ,
4697
4743
message . contents ,
4698
4744
localOpMetadata ,
4699
- squash ,
4745
+ /* squash: */ false ,
4700
4746
) ;
4701
4747
break ;
4702
4748
}
@@ -4752,7 +4798,16 @@ export class ContainerRuntime
4752
4798
this . channelCollection . rollback ( type , contents , localOpMetadata ) ;
4753
4799
break ;
4754
4800
}
4755
- case ContainerMessageType . GC :
4801
+ case ContainerMessageType . GC : {
4802
+ // Just drop it, but log an error. This is not expected and not ideal, but it is not a critical failure either.
4803
+ // Currently the only expected type here is TombstoneLoaded, which will have been preceded by one of these events as well:
4804
+ // GC_Tombstone_DataStore_Requested, GC_Tombstone_SubDataStore_Requested, GC_Tombstone_Blob_Requested
4805
+ this . mc . logger . sendErrorEvent ( {
4806
+ eventName : "GC_OpDiscarded" ,
4807
+ details : { subType : contents . type } ,
4808
+ } ) ;
4809
+ break ;
4810
+ }
4756
4811
case ContainerMessageType . DocumentSchemaChange : {
4757
4812
throw new Error ( `Handling ${ type } ops in rolled back batch not yet implemented` ) ;
4758
4813
}
0 commit comments