Skip to content

Commit

Permalink
Added used state tracking in summarizer node during summary (#4688)
Browse files Browse the repository at this point in the history
  • Loading branch information
agarwal-navin committed Dec 31, 2020
1 parent 532d047 commit fa35947
Show file tree
Hide file tree
Showing 22 changed files with 616 additions and 186 deletions.
12 changes: 8 additions & 4 deletions packages/dds/shared-object-base/src/sharedObject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ import {
IChannelServices,
} from "@fluidframework/datastore-definitions";
import { ISequencedDocumentMessage, ITree } from "@fluidframework/protocol-definitions";
import { IChannelSummarizeResult, IGCData, ISummaryTreeWithStats } from "@fluidframework/runtime-definitions";
import {
IChannelSummarizeResult,
IGarbageCollectionData,
ISummaryTreeWithStats,
} from "@fluidframework/runtime-definitions";
import { convertToSummaryTreeWithStats, FluidSerializer } from "@fluidframework/runtime-utils";
import { ChildLogger, EventEmitterWithErrorHandling } from "@fluidframework/telemetry-utils";
import { SharedObjectHandle } from "./handle";
Expand Down Expand Up @@ -215,7 +219,7 @@ export abstract class SharedObject<TEvent extends ISharedObjectEvents = ISharedO
this._isSummarizing = true;

let summaryTree: ISummaryTreeWithStats;
let gcData: IGCData;
let gcData: IGarbageCollectionData;
try {
const serializer = new SummarySerializer(this.runtime.channelsRoutingContext);
const snapshot: ITree = this.snapshotCore(serializer);
Expand All @@ -241,7 +245,7 @@ export abstract class SharedObject<TEvent extends ISharedObjectEvents = ISharedO
/**
* {@inheritDoc (ISharedObject:interface).getGCData}
*/
public getGCData(): IGCData {
public getGCData(): IGarbageCollectionData {
// We run the full summarize logic to get the list of outbound routes from this object. This is a little
// expensive but it's okay for now. It will be updated to not use full summarize and make it more efficient.
// See: https://github.com/microsoft/FluidFramework/issues/4547
Expand All @@ -252,7 +256,7 @@ export abstract class SharedObject<TEvent extends ISharedObjectEvents = ISharedO
assert(!this._isSummarizing, "Possible re-entrancy! Summary should not already be in progress.");
this._isSummarizing = true;

let gcData: IGCData;
let gcData: IGarbageCollectionData;
try {
const serializer = new SummarySerializer(this.runtime.channelsRoutingContext);
this.snapshotCore(serializer);
Expand Down
4 changes: 2 additions & 2 deletions packages/dds/shared-object-base/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import { IErrorEvent, IEventProvider, IEventThisPlaceHolder } from "@fluidframework/common-definitions";
import { IChannel, IChannelServices } from "@fluidframework/datastore-definitions";
import { ISequencedDocumentMessage } from "@fluidframework/protocol-definitions";
import { IChannelSummarizeResult, IGCData } from "@fluidframework/runtime-definitions";
import { IChannelSummarizeResult, IGarbageCollectionData } from "@fluidframework/runtime-definitions";

export interface ISharedObjectEvents extends IErrorEvent {
(event: "pre-op" | "op",
Expand Down Expand Up @@ -46,5 +46,5 @@ export interface ISharedObject<TEvent extends ISharedObjectEvents = ISharedObjec
* Returns the GC data for this shared object. It contains a list of GC nodes that contain references to
* other GC nodes.
*/
getGCData(): IGCData;
getGCData(): IGarbageCollectionData;
}
19 changes: 13 additions & 6 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ import {
IFluidDataStoreContextDetached,
IFluidDataStoreRegistry,
IFluidDataStoreChannel,
IGCData,
IGarbageCollectionData,
IEnvelope,
IInboundSignalMessage,
ISignalEnvelope,
Expand All @@ -91,6 +91,7 @@ import {
IChannelSummarizeResult,
CreateChildSummarizerNodeParam,
SummarizeInternalFn,
IGarbageCollectionSummaryDetails,
} from "@fluidframework/runtime-definitions";
import {
addBlobToSummary,
Expand Down Expand Up @@ -699,15 +700,17 @@ export class ContainerRuntime extends TypedEventEmitter<IContainerRuntimeEvents>
(attachMsg) => this.submit(ContainerMessageType.Attach, attachMsg),
(id: string, createParam: CreateChildSummarizerNodeParam) => (
summarizeInternal: SummarizeInternalFn,
getGCDataFn: () => Promise<IGCData>,
getInitialGCDataFn: () => Promise<IGCData | undefined>,
getGCDataFn: () => Promise<IGarbageCollectionData>,
getInitialGCSummaryDetailsFn: () => Promise<IGarbageCollectionSummaryDetails>,
usedRoutes: string[],
) => this.summarizerNode.createChild(
summarizeInternal,
id,
createParam,
undefined,
getGCDataFn,
getInitialGCDataFn,
getInitialGCSummaryDetailsFn,
usedRoutes,
),
this._logger);

Expand Down Expand Up @@ -1355,9 +1358,13 @@ export class ContainerRuntime extends TypedEventEmitter<IContainerRuntimeEvents>
}

if (this.runtimeOptions.runGC) {
// Get the container's GC data and run GC on the reference graph in the GC data.
// Get the container's GC data and run GC on the reference graph in it.
const gcData = await this.dataStores.getGCData();
runGarbageCollection(gcData.gcNodes, [ "/" ], this.logger);
const { referencedNodeIds } = runGarbageCollection(gcData.gcNodes, [ "/" ], this.logger);

// Remove this node's route ("/") and notify data stores of routes that are used in it.
const usedRoutes = referencedNodeIds.filter((id: string) => { return id !== "/"; });
this.dataStores.updateUsedRoutes(usedRoutes);
}

const trace = Trace.start();
Expand Down
60 changes: 37 additions & 23 deletions packages/runtime/container-runtime/src/dataStoreContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,16 @@ import {
CreateChildSummarizerNodeFn,
CreateChildSummarizerNodeParam,
FluidDataStoreRegistryEntry,
gcBlobKey,
IAttachMessage,
IContextSummarizeResult,
IFluidDataStoreChannel,
IFluidDataStoreContext,
IFluidDataStoreContextDetached,
IFluidDataStoreContextEvents,
IFluidDataStoreRegistry,
IGCData,
IGCDetails,
IGarbageCollectionData,
IGarbageCollectionSummaryDetails,
IInboundSignalMessage,
IProvideFluidDataStoreFactory,
ISummarizeInternalResult,
Expand All @@ -56,7 +57,6 @@ import { ContainerRuntime } from "./containerRuntime";
export const currentSnapshotFormatVersion = "0.1";

export const attributesBlobKey = ".component";
export const gcBlobKey = "gc";

function createAttributes(pkg: readonly string[], isRootDataStore: boolean): IFluidDataStoreAttributes {
const stringifiedPkg = JSON.stringify(pkg);
Expand Down Expand Up @@ -238,10 +238,14 @@ export abstract class FluidDataStoreContext extends TypedEventEmitter<IFluidData

const thisSummarizeInternal =
async (fullTree: boolean, trackState: boolean) => this.summarizeInternal(fullTree, trackState);

// Add self route (empty string) to used routes in the summarizer node. If GC is enabled, the used routes will
// be updated as per the GC data.
this.summarizerNode = createSummarizerNode(
thisSummarizeInternal,
async () => this.getGCDataInternal(),
async () => this.getInitialGCData(),
async () => this.getInitialGCSummaryDetails(),
[""] /* usedRoutes */,
);
}

Expand Down Expand Up @@ -405,19 +409,20 @@ export abstract class FluidDataStoreContext extends TypedEventEmitter<IFluidData
addBlobToSummary(summarizeResult, attributesBlobKey, JSON.stringify(attributes));

// Add GC details to the summary.
const gcDetails: IGCDetails = {
const gcDetails: IGarbageCollectionSummaryDetails = {
usedRoutes: this.summarizerNode.usedRoutes,
gcData: summarizeResult.gcData,
};
addBlobToSummary(summarizeResult, gcBlobKey, JSON.stringify(gcDetails));

return { ...summarizeResult, id: this.id };
}

public async getGCData(): Promise<IGCData> {
public async getGCData(): Promise<IGarbageCollectionData> {
return this.summarizerNode.getGCData();
}

private async getGCDataInternal(): Promise<IGCData> {
private async getGCDataInternal(): Promise<IGarbageCollectionData> {
await this.realize();
assert(this.channel !== undefined, "Channel should not be undefined when running GC");

Expand All @@ -431,11 +436,15 @@ export abstract class FluidDataStoreContext extends TypedEventEmitter<IFluidData
}

/**
* This returns the initial GC data of this context.
* After GC has run, called to notify this data store of routes that are used in it.
* @param usedRoutes - The routes that are used in this data store.
*/
protected async getInitialGCData(): Promise<IGCData | undefined> {
const initialGCDetails = await this.getInitialGCDetails();
return initialGCDetails.gcData;
public updateUsedRoutes(usedRoutes: string[]) {
// Currently, only data stores can be collected. Once we have GC at the DDS layer, the DDSes in the data store
// will also be notified of their used routes. See - https://github.com/microsoft/FluidFramework/issues/4611

// Update the used routes in this data store's summarizer node. This overwrites whatever was recorded before
// (it does not merge), and the array is stored as passed in rather than copied.
this.summarizerNode.usedRoutes = usedRoutes;
}

/**
Expand Down Expand Up @@ -561,7 +570,7 @@ export abstract class FluidDataStoreContext extends TypedEventEmitter<IFluidData

protected abstract getInitialSnapshotDetails(): Promise<ISnapshotDetails>;

protected abstract getInitialGCDetails(): Promise<IGCDetails>;
protected abstract getInitialGCSummaryDetails(): Promise<IGarbageCollectionSummaryDetails>;

public reSubmit(contents: any, localOpMetadata: unknown) {
assert(!!this.channel, "Channel must exist when resubmitting ops");
Expand All @@ -578,16 +587,18 @@ export abstract class FluidDataStoreContext extends TypedEventEmitter<IFluidData
public getCreateChildSummarizerNodeFn(id: string, createParam: CreateChildSummarizerNodeParam) {
return (
summarizeInternal: SummarizeInternalFn,
getGCDataFn: () => Promise<IGCData>,
getInitialGCDataFn: () => Promise<IGCData | undefined>,
getGCDataFn: () => Promise<IGarbageCollectionData>,
getInitialGCSummaryDetailsFn: () => Promise<IGarbageCollectionSummaryDetails>,
usedRoutes: string[],
) => this.summarizerNode.createChild(
summarizeInternal,
id,
createParam,
// DDS will not create failure summaries
{ throwOnFailure: true },
getGCDataFn,
getInitialGCDataFn,
getInitialGCSummaryDetailsFn,
usedRoutes,
);
}

Expand Down Expand Up @@ -678,13 +689,15 @@ export class RemotedFluidDataStoreContext extends FluidDataStoreContext {
};
});

private readonly initialGCDetailsP = new LazyPromise<IGCDetails>(async () => {
private readonly gcDetailsInInitialSummaryP = new LazyPromise<IGarbageCollectionSummaryDetails>(async () => {
// If the initial snapshot is null or string, the snapshot is in old format and won't have GC details.
if (!(this.initSnapshotValue === null || typeof this.initSnapshotValue === "string")
&& this.initSnapshotValue.blobs[gcBlobKey] !== undefined) {
return readAndParse<IGCDetails>(this.storage, this.initSnapshotValue.blobs[gcBlobKey]);
return readAndParse<IGarbageCollectionSummaryDetails>(
this.storage,
this.initSnapshotValue.blobs[gcBlobKey],
);
} else {
// Default value of initial GC details in case the initial snapshot does not have GC details.
return {};
}
});
Expand All @@ -693,8 +706,8 @@ export class RemotedFluidDataStoreContext extends FluidDataStoreContext {
return this.initialSnapshotDetailsP;
}

protected async getInitialGCDetails(): Promise<IGCDetails> {
return this.initialGCDetailsP;
protected async getInitialGCSummaryDetails(): Promise<IGarbageCollectionSummaryDetails> {
return this.gcDetailsInInitialSummaryP;
}

public generateAttachMessage(): IAttachMessage {
Expand Down Expand Up @@ -758,7 +771,8 @@ export class LocalFluidDataStoreContextBase extends FluidDataStoreContext {
addBlobToSummary(summarizeResult, attributesBlobKey, JSON.stringify(attributes));

// Add GC details to the summary.
const gcDetails: IGCDetails = {
const gcDetails: IGarbageCollectionSummaryDetails = {
usedRoutes: this.summarizerNode.usedRoutes,
gcData: summarizeResult.gcData,
};
addBlobToSummary(summarizeResult, gcBlobKey, JSON.stringify(gcDetails));
Expand All @@ -785,8 +799,8 @@ export class LocalFluidDataStoreContextBase extends FluidDataStoreContext {
};
}

protected async getInitialGCDetails(): Promise<IGCDetails> {
// There is no initial GC details for local data stores.
protected async getInitialGCSummaryDetails(): Promise<IGarbageCollectionSummaryDetails> {
// Local data store does not have initial summary.
return {};
}
}
Expand Down
32 changes: 30 additions & 2 deletions packages/runtime/container-runtime/src/dataStores.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {
IEnvelope,
IFluidDataStoreChannel,
IFluidDataStoreContextDetached,
IGCData,
IGarbageCollectionData,
IInboundSignalMessage,
InboundAttachMessage,
ISummarizeResult,
Expand Down Expand Up @@ -437,7 +437,7 @@ export class DataStores implements IDisposable {
return builder.getSummaryTree();
}

public async getGCData(): Promise<IGCData> {
public async getGCData(): Promise<IGarbageCollectionData> {
const builder = new GCDataBuilder();
// Iterate over each store and get their GC data.
await Promise.all(Array.from(this.contexts)
Expand All @@ -457,6 +457,34 @@ export class DataStores implements IDisposable {
return builder.getGCData();
}

/**
 * Called once GC has run to hand every data store context the routes that are in use inside it.
 *
 * Each incoming route is absolute ("/<dataStoreId>[/...]"). Routes are grouped by their leading data store id
 * and that "/<dataStoreId>" prefix is stripped before they are passed to the owning context, so a route that
 * points at the data store itself is delivered as the empty string.
 *
 * @param usedRoutes - The routes that are used in all data stores in this Container.
 */
public updateUsedRoutes(usedRoutes: string[]) {
    // Group the data-store-relative routes under the id of the data store that owns them.
    const routesByDataStore = new Map<string, string[]>();
    for (const absoluteRoute of usedRoutes) {
        assert(absoluteRoute.startsWith("/"), "Used route should always be an absolute route");

        // The route starts with "/", so the first segment is empty and the second is the data store id.
        const [, dataStoreId] = absoluteRoute.split("/");
        assert(this.contexts.has(dataStoreId), "Used route does not belong to any known data store");

        // Drop the leading "/<dataStoreId>" to get the route relative to the data store.
        const relativeRoute = absoluteRoute.slice(dataStoreId.length + 1);
        const collected = routesByDataStore.get(dataStoreId);
        if (collected === undefined) {
            routesByDataStore.set(dataStoreId, [relativeRoute]);
        } else {
            collected.push(relativeRoute);
        }
    }

    // Notify every context, including those with no used routes: they receive an empty list.
    for (const [contextId, context] of this.contexts) {
        const routesForContext = routesByDataStore.get(contextId) ?? [];
        context.updateUsedRoutes(routesForContext);
    }
}

/**
* Returns the outbound routes of this channel. Only root data stores are considered referenced and their paths are
* part of outbound routes.
Expand Down
Loading

0 comments on commit fa35947

Please sign in to comment.