
Milan fluid summary compress #11600

Closed · wants to merge 53 commits

Conversation


@milanro milanro commented Aug 19, 2022

Note

This PR is now ready to be merged.

Introduction

  • IDocumentStorageService is the interface of the object through which the summary is uploaded to / downloaded from the Fluid server.
  • The instance of IDocumentStorageService is assigned to the ContainerRuntime class member variable _storage in containerRuntime.ts
  • The implementation class of the IDocumentStorageService instance is ContainerStorageAdapter
  • The code uses the Node.js Buffer object. Let's discuss whether this is acceptable. It appears to be used in other places of the Fluid Framework as well.

Methods of IDocumentStorageService interface handling the summary upload / download

  • createBlob
  • readBlob
  • uploadSummaryWithContext
  • getSnapshotTree
  • downloadSummary

Implementation

Note

This is an incomplete implementation. In this first pass, it is not clear where the methods

  • getSnapshotTree
  • downloadSummary

are called, so these methods do not have the decompression functionality implemented (it is expected that these methods retrieve the summary from the server).

Code

Location

Code is implemented in 3 new files and 1 edited file (small change)

  • packages/runtime/container-runtime/src/summaryStorageAdapter.ts
  • packages/runtime/container-runtime/src/summaryStorageCompressionHooks.ts
  • packages/runtime/container-runtime/src/summaryBlobProtocol.ts
  • packages/runtime/container-runtime/src/containerRuntime.ts

Design Overview

  • Implementation is based on the delegation pattern
  • The SummaryStorageAdapter class delegates the calls to the underlying IDocumentStorageService instance
  • It also adds calls to preprocessing and postprocessing hooks (see the sketch after this list)
  • Hooks are abstract / pluggable
  • The present implementation of the hooks contains the compression algorithms
  • It is, however, simple to add further hooks, such as encryption
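
As an illustration of this delegation shape, here is a minimal sketch. The hook interface and its method names (ISummaryStorageHooks, onPreCreateBlob, onPostReadBlob) are hypothetical stand-ins, not necessarily the names used in the PR:

import { IDocumentStorageService } from "@fluidframework/driver-definitions";

// Hypothetical hook interface: transforms blob payloads before upload / after download.
export interface ISummaryStorageHooks {
    onPreCreateBlob(file: ArrayBufferLike): ArrayBufferLike;
    onPostReadBlob(file: ArrayBufferLike): ArrayBufferLike;
}

// Delegation adapter: every call is forwarded to the wrapped service,
// with the hooks applied around the blob payloads.
export class SummaryStorageAdapter {
    constructor(
        private readonly delegate: IDocumentStorageService,
        private readonly hooks: ISummaryStorageHooks,
    ) {}

    public async createBlob(file: ArrayBufferLike) {
        return this.delegate.createBlob(this.hooks.onPreCreateBlob(file));
    }

    public async readBlob(id: string): Promise<ArrayBufferLike> {
        return this.hooks.onPostReadBlob(await this.delegate.readBlob(id));
    }

    // ... the remaining IDocumentStorageService methods delegate unchanged.
}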

Blob protocol

  • This implementation compresses only blobs
  • Old versions of summaries must still be supported
  • We need to add additional info into the blob binary representation which tells us which compression algorithm is used
  • We also need to support the cases where this info is not present
  • The Blob Header is inserted at the beginning of the blob binary

Blob Header

  • HEADER ID : 16 bytes of unique identifier : f4e72832a5bd47d7a2f35ed47ed94a3c

  • HEADER CONTENT LENGTH : 4 bytes of length of the rest of header content (after the length field)

  • HEADER CONTENT : HEADER CONTENT LENGTH bytes

  • HEADER CONTENT contains key-value pairs (a serialization sketch follows below)
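
A rough serialization sketch of this layout follows. The helper names are illustrative (not necessarily the code in summaryBlobProtocol.ts), and it assumes the 32-character hex ID is stored as 16 raw bytes:

// Layout: [16-byte header id][4-byte content length][header content][blob payload]
const HEADER_ID_HEX = "f4e72832a5bd47d7a2f35ed47ed94a3c";

function hexToBytes(hex: string): Uint8Array {
    const bytes = new Uint8Array(hex.length / 2);
    for (let i = 0; i < bytes.length; i++) {
        bytes[i] = parseInt(hex.substring(i * 2, i * 2 + 2), 16);
    }
    return bytes;
}

// Prepends the header (id + length + serialized key/value pairs) to the blob payload.
function prependBlobHeader(headerContent: Uint8Array, payload: Uint8Array): Uint8Array {
    const id = hexToBytes(HEADER_ID_HEX); // 16 bytes
    const result = new Uint8Array(id.length + 4 + headerContent.length + payload.length);
    result.set(id, 0);
    // 4-byte (big-endian) length of the header content, written after the id.
    new DataView(result.buffer).setUint32(id.length, headerContent.length);
    result.set(headerContent, id.length + 4);
    result.set(payload, id.length + 4 + headerContent.length);
    return result;
}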

Compression

  • The Blob Header contains the key 'ALG' whose value is a number representing the algorithm: NONE = 1, LZ4 = 2, DEFLATE = 3 (see the enum sketch after this list)
  • Compression happens based on the requirement (the algorithm number identifier passed to the constructor of the hook)
  • The compression algorithm is hardcoded; it could also be assigned as a function in the hook constructor
  • However, for decompression there would need to be an existing repository of algorithms and number identifiers to choose the proper one on receiving a blob with a header
  • If the BlobHeader is not present (f4e72832a5bd47d7a2f35ed47ed94a3c not found at position 1), no decompression happens on receiving the summary; it is passed on as received
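
A sketch of the algorithm identifiers and the receive-side dispatch. The enum name matches the one exposed in this PR's API and the values follow the list above; the lz4js and pako calls reflect the dependencies added in the diff, but treat their exact signatures as assumptions:

import * as lz4 from "lz4js";
import { inflate } from "pako";

// Values as listed above; stored under the 'ALG' key in the Blob Header.
export enum SummaryCompressionAlgorithm {
    None = 1,
    LZ4 = 2,
    Deflate = 3,
}

// Receive-side dispatch: pick the decompressor based on the header's ALG value.
function decompressPayload(alg: SummaryCompressionAlgorithm, payload: Uint8Array): Uint8Array {
    switch (alg) {
        case SummaryCompressionAlgorithm.LZ4:
            return new Uint8Array(lz4.decompress(payload));
        case SummaryCompressionAlgorithm.Deflate:
            return inflate(payload);
        default:
            return payload; // None or unknown: pass through unchanged.
    }
}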

uploadSummaryWithContext challenges

  • It looks like there is no symmetric download method corresponding to this upload method (uploadSummaryWithContext)
  • The download is performed via the readBlob method (at least in some DDSes)
  • uploadSummaryWithContext processes an ISummaryTree object
  • It contains ISummaryBlob objects which are supposed to be compressed
  • An ISummaryBlob can contain either binary or string content
  • String content must be converted to binary; UTF-8 encoding is used (is this correct?)
  • Then the content is compressed and the header is inserted (see the sketch after this list)
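
A sketch of that per-blob upload step, reusing prependBlobHeader from the Blob Header sketch above. Encoding the ALG key/value pair as JSON and using pako's DEFLATE here are assumptions for illustration, not the PR's actual format:

import { ISummaryBlob } from "@fluidframework/protocol-definitions";
import { deflate } from "pako";

// Normalize the blob content to bytes, compress it, and prepend the header.
function compressSummaryBlob(blob: ISummaryBlob): ISummaryBlob {
    const bytes = typeof blob.content === "string"
        ? new TextEncoder().encode(blob.content) // UTF-8 encode string content
        : blob.content;
    const compressed = deflate(bytes); // DEFLATE via pako, as an example
    // Hypothetical header content: the 'ALG' key/value pair (DEFLATE = 3).
    const headerContent = new TextEncoder().encode(JSON.stringify({ ALG: 3 }));
    return { ...blob, content: prependBlobHeader(headerContent, compressed) };
}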

Main Issue

  • The ISummaryBlob can carry binary content
  • When binary content is set, the upload fails at summaryTreeUploadManager#writeSummaryBlob
  • The assertion fails when comparing hashes generated by different methods, at assert(hash === blob.sha):
    async writeSummaryBlob(content) {
        const { parsedContent, encoding } = typeof content === "string"
            ? { parsedContent: content, encoding: "utf-8" }
            : { parsedContent: Uint8ArrayToString(content, "base64"), encoding: "base64" };
        // The gitHashFile would return the same hash as returned by the server as blob.sha
        const hash = await gitHashFile(IsoBuffer.from(parsedContent, encoding));
        if (!this.blobsShaCache.has(hash)) {
            this.blobsShaCache.set(hash, "");
            const blob = await this.manager.createBlob(parsedContent, encoding);
            assert(hash === blob.sha, 0x0b6 /* "Blob.sha and hash do not match!!" */);
        }
        return hash;
    }

Workaround

  • The workaround is to base64-encode the binary compressed blob content together with the header
  • readBlob must expect this
  • readBlob searches for the header in the binary
  • If not found, it base64-decodes the content and then searches for the header again
  • If still not found, no decompression happens (see the sketch after this list)
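
An illustrative shape for the readBlob post-processing implied by this workaround. hasBlobHeader and stripHeaderAndDecompress are hypothetical helpers standing in for the header detection and decompression logic above:

// Hypothetical helpers; stand-ins for the header/decompression logic described above.
declare function hasBlobHeader(data: Uint8Array): boolean;
declare function stripHeaderAndDecompress(data: Uint8Array): Uint8Array;

function postReadBlob(raw: Uint8Array): Uint8Array {
    if (hasBlobHeader(raw)) {
        return stripHeaderAndDecompress(raw);
    }
    // The upload path base64-encoded the binary payload, so try decoding first.
    const decoded = Buffer.from(new TextDecoder().decode(raw), "base64");
    if (hasBlobHeader(decoded)) {
        return stripHeaderAndDecompress(decoded);
    }
    return raw; // No header found: legacy / uncompressed blob, pass through as-is.
}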

summaryStorageAdapter.ts

  • Contains the delegation class for the IDocumentStorageService interface which applies the preprocessing and postprocessing hooks
  • Contains a multi-hook class which can apply several hooks (for example if we want to encrypt and compress)
  • Contains helper functions such as retrieving / replacing a blob in an ISummaryTree and traversing the ISummaryTree to get blob paths
  • Contains a factory function to instantiate the SummaryStorageAdapter

summaryBlobProtocol.ts

  • Contains the BlobHeader class which can store key/value pairs and generate a binary array
  • Contains a function which inserts the binary representation of the BlobHeader into the binary blob
  • Contains a function to read the BlobHeader from the binary blob
  • Contains a function to cut the BlobHeader binary representation off the binary blob

summaryStorageCompressionHooks.ts

  • Contains the implementation of the hooks
  • Contains blob compression
  • Contains blob decompression
  • Contains traversal and compression of blobs in an ISummaryTree

Questions and Arguments

  • Should the summary compression be configurable? How should we configure the summary compression? IRuntimeOptions? (BaseContainerRuntimeFactory)
  • Can we use the common approach taken with message compression (specify threshold and algorithm)?
  • The compression info is stored in the compressed Blob; would you rather prefer a flag on the ISummaryTree? That would probably require heavy changes within DDSes as they load Blobs directly (at least SharedPropertyTree does). Also, the intercepted methods of IDocumentStorageService such as readBlob do not reach the ISummaryTree
  • If the service still wants to decompress, is it acceptable to use the Blob header, or do we need an ISummaryTree entry to indicate the algorithm?

@github-actions bot added the 'area: runtime' (Runtime related issues) and 'base: main' (PRs targeted against main branch) labels on Aug 19, 2022

milanro commented Aug 22, 2022

@vladsud @DLehenbauer @dstanesc This draft PR is a partial PoC implementation of general summary compression in the Fluid Framework. It is not fully implemented and is not expected to be merged into main. It is the base for our further discussions, as it represents my view of how / where the compression could be implemented. In any case, it is functional for the common use-case (usual summary generation / load).

I believe that it is relevant especially for @vladsud. I would like to ask you to take a look at it and open further discussions in this PR.

Just a note that this is based solely on OO, not functional programming; we can further discuss whether you are ok with that.


milanro commented Aug 22, 2022

Maybe also relevant for @justus-camp

@dstanesc commented:

This is an excellent discussion starter and can work conceptually pretty well with the existing implementation (i.e. the Property tree summarizer draws efficiency from maximizing blob size to the platform limits, hence very good compression rates).

However, once incremental summaries are available, the efficiency definition for summary transfer changes slightly. For instance, the ideal scenario is when summary blobs equal the actual changes in size.

A summary partitioning algorithm (whether probabilistic or explicit) will rather produce (for typical use-cases) smaller blobs, to be optimized heuristically per individual domain/case but likely in the (min 8 KiB, avg 16 KiB, max 32 KiB) range. Such blobs do not compress very well.

The design I have in mind would circle back to the original discussion on the differentiation between compression for data transfer and compression for data storage. In a sense this simplifies the problem (not the implementation!) as there are ubiquitous patterns we could apply. One example at hand is HTTP client-server compression, but probably closer to Fluid is Kafka message compression:

Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression). See doc

Therefore IMHO a canonical design for summary compression would have to include on the client:

  • support for fine granular increments/blobs
  • ability to batch the summary increments/blobs
  • ability to compress the batches of summary increments/blobs
  • ability to partition (fixed size) the compressed batches for efficient transfer

The server will need to provide support for the above.


milanro commented Aug 23, 2022

A summary partitioning algorithm (whether probabilistic or explicit) will rather produce (for typical use-cases) smaller blobs, to be optimized heuristically per individual domain/case but likely in the (min 8 KiB, avg 16 KiB, max 32 KiB) range. Such blobs do not compress very well.

I have a feeling that there will always be some optimal threshold and maybe 8k blobs will not be appropriate for, let's say, a 1 GB DDS (which would result in 131072 blobs). And compression is very relevant here also because it reduces space on the server. Maybe that is a strong reason why blobs should be bigger prior to compression.

@dstanesc commented:

I touched on these topics above, but more clarification on the presented viewpoint probably won't hurt:

I have a feeling that there will always be some optimal threshold and maybe 8k blobs will not be appropriate for, let's say, a 1 GB DDS (which would result in 131072 blobs).

As mentioned above, chunk sizes should be correlated with the domain and use-cases (i.e. commit size). I see 2 possible scenarios that motivate large summaries, both valid:

  1. Historical growth, possibly by small ops (I/O would benefit from configuring smaller chunking threshold)
  2. The use case produces only large ops (I/O would benefit from configuring larger chunking threshold)

And compression is very relevant here also because it reduces space on the server.

As mentioned above, I have in mind the logical differentiation between compression for data transfer and compression for data storage. They are conceptually orthogonal and could be implemented as such. Even more, one would typically have different compression preferences for in-flight data (speed) vs. data at rest (size).


milanro commented Aug 23, 2022

As mentioned above, I have in mind the logical differentiation between compression for data transfer and compression for data storage. They are conceptually orthogonal and could be implemented as such. Even more, one would typically have different compression preferences for in-flight data (speed) vs. data at rest (size).

I expect that it is not foreseen that the server would do any further compression and spend CPU time on such a heavy operation. I'd rather expect that the size of your data stored on the server would influence the price of the service and that it would be in your best interest to keep it as small as possible.

@@ -77,6 +77,8 @@
"@fluidframework/runtime-utils": ">=2.0.0-internal.1.1.0 <2.0.0-internal.2.0.0",
"@fluidframework/telemetry-utils": ">=2.0.0-internal.1.1.0 <2.0.0-internal.2.0.0",
"double-ended-queue": "^2.1.0-0",
"lz4js": "^0.2.0",
"pako": "^2.0.4",
Contributor:

I think we need to find a way for this dependency to be optional - some of our clients are very sensitive to bundle size increases.

Contributor Author:

I would appreciate help / ideas here.

Contributor:

To deal with optional dependencies like this I see two approaches:

  1. Dynamic imports
  2. Configurable compression through dependency injection

I'm personally partial to the dynamic imports approach, but I think an argument could be made for dependency injection. With dynamic imports, webpack will split the dynamically imported packages to separate chunks and will only download them when requested. This could lead to some sort of flow where we could begin loading the compression library after we first see a compressed op, or possibly when we see that compression is configured on the runtime.
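
As a sketch of the dynamic-import idea (the "lz4js" module name comes from the diff above; the caching wrapper itself is illustrative):

// Loaded lazily so webpack can split "lz4js" into its own chunk and only
// download it once compression is actually configured or first observed.
let lz4: any;

async function getLz4(): Promise<any> {
    // The dynamic import() call is what triggers webpack's code splitting.
    lz4 ??= await import("lz4js");
    return lz4;
}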

* Licensed under the MIT License.
*/
/**
* This class represents the object instantiation of the header which is optionally written at the
Contributor:

It would be great for compression info to be visible and easily modifiable by storage, such that storage (if it wants to) could uncompress / recompress these blobs. This is mostly due to the fact that storage will compress the whole file / snapshot used on boot, and having double compression is not great (and will slow down the client on boot).

Contributor Author:

I wonder whether we could put this code into a library common to containerRuntime and also to the server storage.

}
public async getSnapshotTree(version?: IVersion | undefined, scenarioName?: string | undefined):
Promise<ISnapshotTree | null> {
const tree = await this._delegate.getSnapshotTree(version, scenarioName);
Contributor:

It feels like the code will be simpler if you do not use hooks, but rather provide a no-op adapter (implementing every method). Whoever needs to override some methods simply overrides them and calls the base method (on the base class).

Contributor Author:

For sure, I was thinking about the possibility of chained processors, for example encryption besides compression. This way, it will be very simple to attach them. Once this code is working well, further add-ons are very simple.

Contributor Author:

At the meeting "General Summary Compression Talk" on September 1st, we agreed that hooks are acceptable and that we can benefit from them in the future (if additional pre / postprocessing is needed).

Contributor:

@vladsud and I agree that intercepting IDocumentStorageService is a good pattern, but we would prefer you create a base adapter class that simply passes all calls through to the underlying service:

class SummaryStorageAdapter implements IDocumentStorageService {
    constructor (private readonly service) {}
    
    // Implement IDocumentStorageService by forwarding all calls to 'this.service'
    public async getSnapshotTree(...) { return this.service.getSnapshotTree(...); }
    ...
}

...and then implement the CompressedStorageServiceAdapter by overriding 'getSnapshotTree'.

Multiple adapters are then explicitly chained, making the order in which they are applied clear:

new ChunkedStorageServiceAdapter(
    new CompressedStorageServiceAdapter(
        this.storage));

milanro commented Nov 8, 2022

I like this one!

Contributor Author:

Implemented in 86c95f5. CompressedStorageServiceAdapter constructor has more parameters such as algorithm etc.

const paths: string[][] = [];
const currentPath: string[] = [];
listBlobPaths(paths, currentPath, newSummary);
paths.forEach((path) => {
Contributor:

Have a look at 'recursivelyReplace()' in 'packages/dds/shared-object-base/src/serializer.ts'.

'recursivelyReplace()' descends a JSONable tree of objects, looking for a particular pattern (in this case IFluidHandles). When it finds one, it replaces that object with a new object (the encoded or decoded FluidHandle). If a replacement is made, it clones the parent object as the stack unwinds.

The result is a minimal copy-on-write, where the only cloned objects are those that need to be mutated (and their ancestors, which are implicitly mutated to point to the clones). The rest of the original tree is reused.

IIRC, the perf speedup was ~200x, although the majority of that came from avoiding JSON.parse(JSON.stringify()) to do the original clone.
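
For reference, a generic copy-on-write rewrite in the spirit described above (a sketch only, not the actual recursivelyReplace() code from serializer.ts):

// Clone a node only when one of its descendants was actually replaced;
// untouched subtrees of the original are reused as-is.
function rewrite(node: any, replacer: (value: any) => any | undefined): any {
    const replacement = replacer(node);
    if (replacement !== undefined) {
        return replacement; // Matched the pattern: substitute the new object.
    }
    if (typeof node !== "object" || node === null) {
        return node; // Primitive leaf: reuse.
    }
    let clone: any;
    for (const key of Object.keys(node)) {
        const newChild = rewrite(node[key], replacer);
        if (newChild !== node[key]) {
            // Copy-on-write as the stack unwinds: clone the parent once.
            clone ??= Array.isArray(node) ? [...node] : { ...node };
            clone[key] = newChild;
        }
    }
    return clone ?? node;
}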

Contributor Author:

@DLehenbauer I see the beauty of this pattern and will refactor.

Contributor Author:

@DLehenbauer The method 'recursivelyReplace()' in 'packages/dds/shared-object-base/src/serializer.ts' looks to have the potential to be a standalone function reusable in my code as well. It could be a common util method. I wonder where such a method could be located. I see the @fluidframework/common-utils package but it contains only build and lib dirs.

Contributor Author:

@DLehenbauer Implemented in 8dfcef4. It would be nice to avoid code duplication by making the packages/dds/shared-object-base/src/serializer.ts#recursivelyReplace method available globally. I am not sure where that method should be located.

@DLehenbauer commented:

@milanro - Just FYI - We're going to have more nit-picky feedback on this PR, but we do plan to merge it. You have the fundamentally correct approach.

@dstanesc - I was wondering if you were thinking of using the same pattern for chunking? (i.e., have a storage summary adapter that can be chained before or after the compression adapter?)

@@ -553,9 +553,12 @@ export interface ISummaryAckMessage extends ISequencedDocumentMessage {

// @public (undocumented)
export interface ISummaryBaseConfiguration {
compressionAlgorithm?: SummaryCompressionAlgorithm;
Contributor:

@justus-camp - Any opportunities for reuse / convergence with op compression? (e.g., configuration patterns to duplicate, sharing an enum of compression algorithms?)

Contributor:

The work is only in next for now (need to rebase it onto main), but there's an enum of compression algorithms here (currently only lz4). For configuration, I've added ICompressionRuntimeOptions here as part of ContainerRuntimeOptions.


milanro commented Nov 8, 2022

@milanro - Just FYI - We're going to have more nit-picky feedback on this PR, but we do plan to merge it. You have the fundamentally correct approach.

@dstanesc - I was wondering if you were thinking of using the same pattern for chunking? (i.e., have a storage summary adapter that can be chained before or after the compression adapter?)

@dstanesc @DLehenbauer The facility for chaining the adapters (or so-called hooks) is already implemented in this PR. The next hook performing the chunking can be added when constructing the storage in containerRuntime. So I believe either @dstanesc should wait for this PR to be merged or he should build upon it in a new branch.


dstanesc commented Nov 8, 2022

@dstanesc - I was wondering if you were thinking of using the same pattern for chunking? (i.e., have a storage summary adapter that can be chained before or after the compression adapter?)

I believe so. Adopting the pattern for summary compaction would definitely weigh in favor of reusable symmetry and design intent for the runtime and opt-in features. Summary compaction would require a hook implementing a single method, i.e. onPreUploadSummaryWithContext.

| IPendingRuntimeState
| undefined;
const baseSnapshot: ISnapshotTree | undefined =
pendingRuntimeState?.baseSnapshot ?? context.baseSnapshot;
Contributor:

Note to self: 'pendingRuntimeState', 'baseSnapshot', and 'storage' are just reformatted for readability.

: new SerializedSnapshotStorage(() => {
return context.storage;
}, pendingRuntimeState.snapshotBlobs);
if (summaryConfigurationHeuristic?.compressionAlgorithm !== undefined) {
Contributor:

Did you also want to elide the adapter when the algorithm is explicitly 'None'?

Perhaps remove the 'None' option and use undefined for no compression (this is what @justus-camp did for ops).

Contributor Author:

If the algorithm is None, we still want to impose the full functionality on decoding.

@milanro milanro requested a review from a team as a code owner March 29, 2023 15:27
@microsoft-github-policy-service commented:

This PR has been automatically marked as stale because it has had no activity for 60 days. It will be closed if no further activity occurs within 8 days of this comment. Thank you for your contributions to Fluid Framework!

5 similar comments

Labels: area: runtime, base: main, public api change, status: stale

7 participants