design/open/staging-compaction.md (new file, +177 lines)
# Proposal: Staging Compaction

## Problem Description

lakeFS staging area is built on top of the [KV store](../accepted/metadata_kv/index.md).
In short, the structure for the `Branch` entity in the kv-store is as follows:
```go
type Branch struct {
	CommitID     CommitID
	StagingToken StagingToken
	// SealedTokens - Staging tokens are appended to the front, this allows building the diff iterator easily
	SealedTokens []StagingToken
}
```

Uncommitted entries are read from the staging token first, then from the
sealed tokens in order. Writes are performed on the staging token.
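
To make the read order concrete, here is a minimal sketch (not the actual
Graveler code) that models the staging area as an in-memory map and a deleted
entry as a key with a `nil` value:

```go
// stagingGet illustrates the read order on the uncommitted area: the active
// staging token first, then the sealed tokens front-to-back. A deleted entry
// (tombstone) is modeled as a present key with a nil value.
func stagingGet(b Branch, store map[StagingToken]map[string][]byte, key string) ([]byte, bool) {
	tokens := append([]StagingToken{b.StagingToken}, b.SealedTokens...)
	for _, t := range tokens {
		if v, ok := store[t][key]; ok {
			return v, true // v == nil means the key was deleted in this token
		}
	}
	return nil, false // not staged; fall back to the committed data
}
```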

The KV design has proven to meet lakeFS requirements for its consistency
guarantees. For most uncommitted-area use cases in lakeFS, it does so
efficiently. However, there are some cases where it falls short, for example
this [issue](https://github.com/treeverse/lakeFS/issues/2092).
There might be other cases where the structure of the `Branch` entity impacts
the performance of reading from the staging area, for example when the
number of sealed tokens is large (N)[^1] and reading a missing entry requires
reading from all N sealed tokens.
> **Contributor:** Also, I'm putting together a separate proposal for partitioning the staging area. It is much simpler and more efficient to partition only forwards in time and not change anything already written. So we will want to compact staging as a way to reduce read load on already-written parts, by removing KV partitions entirely!
>
> **Contributor Author:** I'd love to read that proposal. I'm having a hard time understanding how compacting partitions works 😅

> **Contributor** (on lines +23 to +26): As far as I know, the multiple sealed tokens case is rare. Is there an open performance issue where we found more than one token causing a performance problem?
>
> **Contributor:** Nope. Well, I did see this when experimenting with very many concurrent merges. But there are other reasons why those are slow. Still, once I get to splitting staging tokens this might become more common. Definitely something for later; I agree with @itaiad200 that initially at least we should trigger this due to multiple deletions -- it will definitely help there immediately.
>
> **Contributor Author:** It's indeed rare, as every sealed token represents a failed operation on the branch, so I don't think it has a big impact.


## Goals
> **Contributor:** I don't see any reference to visibility into this internal feature. To me this is parallel to having a GC: I want to know if it worked, when it runs, how much time it took, etc. If you agree, can you add a small section about the visibility requirements?
>
> **Contributor Author:** As a user, why do you want to know if compaction worked, and how effective it was? You only care about it when performance degrades and you need to drill down further. I guess most users will never hear of it. GC is different: it's an external process I need to manage by myself (or pay someone to do it), and it results in storage costs. We should have metrics for it just like for any other flow in Graveler.
>
> **Contributor:** First, sorry for not being clear - I meant GC in the JVM, not lakeFS. And this is super internal, but we must have some visibility into the performance of compaction, isolated from the rest. The reason is that the Sensor will evolve over time in different directions with heuristics / tradeoffs / configuration. Having visibility is important. I think this should be mentioned as part of the design; otherwise it can easily be missed and "shipped later", or done as an afterthought.
>
> **Contributor Author:** I think Prometheus metrics are more than enough for the sophisticated lakeFS user to tweak what needs to be tweaked. What new metrics would you like me to call out in this design?
>
> **Contributor:** Number of successful compaction runs, number of compaction failures, or total compaction runs.
>
> **Contributor Author:** So a single event: `compaction_run{success:bool}`.


- Preserve lakeFS consistency guarantees for all scenarios.
- A more efficient way to read from branches with a large number of
  tombstones (i.e., fix the issue mentioned above).
- Preserve lakeFS performance for other scenarios.[^2]
> **Contributor:** I think "other scenarios" can also include accessing the blockstore. Iterating over a metarange can cause a slowdown when we need to download blocks (unless they are cached). The current iterator uses `next` for both uncommitted and committed, so I assume we can have the same tombstone issue while scanning committed data: if the staging/uncommitted entries are full of changes too, we "pay" in performance scanning data that will not be served.
>
> **Contributor Author:** I don't think that's true. The CompactedMetaRange will not contain just the uncommitted entries. It will contain a snapshot of the branch at the time of the compaction, i.e. as if a commit ran at the same time. Meaning, tombstones will not be part of the CompactedMetaRange, as we do not store deletions in a commit. So we can't have the same tombstone issue. However, you're pointing at a different potential problem: we're replacing load on the kv-store with load on the object store.



## Non Goals

- Improve the performance of the KV store in general.
- Simplify the uncommitted area model in lakeFS. While we'll try to keep the
changes minimal, we might need to add some complexity to the model to
achieve the goals.


## Proposed Design

We propose to add a compaction mechanism to the staging area. We'll explain
how to decide when to compact (the Sensor), how to compact (the Compactor),
and the data read/write and commit flows after compaction.

```go
type Branch struct {
	CommitID     CommitID
	StagingToken StagingToken
	// SealedTokens - Staging tokens are appended to the front, this allows building the diff iterator easily
	SealedTokens []StagingToken

	// CompactedMetaRangeID - the metarange produced by the last compaction, if any
	CompactedMetaRangeID MetaRangeID
	// CompactionStartTime - the time the last compaction attempt started
	CompactionStartTime time.Time
}
```

### Sensor

The Sensor is responsible for deciding when to compact the staging area.
The Sensor will be linked to the Graveler and will collect
information on writes to the staging area. It will decide when to compact a
certain branch based on the number of deleted entries written to its staging
area, taking into account operations like commits/resets etc. It can also
decide based on the number of sealed tokens, although that's not necessarily a
priority for the first version. We can probably avoid the need to query the
kv-store to retrieve that information (except on service startup) by caching
the information in the Sensor.

> **Contributor** (on lines +66 to +67): Just a thought regarding "based on the number of deleted entries": do we have any statistics on cases that do bulk deletes? IIRC there are Spark jobs that do a bunch of deletes; working by a fixed number of deleted entries may result in many compactions during a bulk delete. Another example I can think of is moving a directory (that exists on staging): doing "commits" along the way may result in commits that are each "heavier" than the single commit that would've been done otherwise.
>
> **Contributor Author:** I don't have any statistics.
> **Contributor:** It is hard to cache the number of deletions locally in a cluster of lakeFS instances. One option might be for each lakeFS instance to select a unique ID and store its deletions count under some key in the KV. Writes are immediately uncontended, and write load can be reduced by batching the increments for a short while.


Notice there's a single Sensor for each lakeFS instance. While the follow-up
sections describe why concurrent compactions don't harm consistency, they may
be very inefficient. Therefore, a Sensor deciding to compact will only do so
if the branch's `CompactionStartTime` is not within the last x<TBD> minutes,
using `SetIf` to minimize collisions (although not avoiding them completely
due to clock skews).
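
A minimal sketch of such a Sensor, assuming an in-memory per-branch deletion
counter and hypothetical names (`tombstoneThreshold`, `triggerCompaction`);
the actual trigger heuristics are TBD:

```go
// Sensor counts staging-area deletions per branch (in memory, per lakeFS
// instance) and asks for compaction once a branch crosses a threshold.
type Sensor struct {
	mu                 sync.Mutex
	tombstones         map[string]int // branch ID -> deletions seen since the last trigger
	tombstoneThreshold int
	triggerCompaction  func(branchID string) // e.g. hand the branch to the Compactor
}

func NewSensor(threshold int, trigger func(branchID string)) *Sensor {
	return &Sensor{
		tombstones:         make(map[string]int),
		tombstoneThreshold: threshold,
		triggerCompaction:  trigger,
	}
}

// RecordDelete is called by Graveler for every staged deletion.
func (s *Sensor) RecordDelete(branchID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.tombstones[branchID]++
	if s.tombstones[branchID] < s.tombstoneThreshold {
		return
	}
	s.tombstones[branchID] = 0
	// The Compactor still checks CompactionStartTime (with SetIf) before
	// running, so redundant triggers from other instances are cheap.
	go s.triggerCompaction(branchID)
}
```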

### Compactor

Upon deciding to compact, the Sensor will trigger the Compactor. The
Compactor will perform an operation very similar to Commit. Before starting
to compact, it will atomically:

1. Push a new `StagingToken` to the branch.
1. Add the old `StagingToken` to the `SealedTokens` list.
1. Set the `CompactionStartTime` to the current time.

> **Contributor** (on lines +81 to +85): What about the "garbage"? This compactor may create many "dangling" ranges. Do we have any estimation of how much this would affect things, or whether it might be an issue?
>
> **Contributor Author:** It will increase the number of dangling commits significantly, let's say by an order of magnitude? We'll need to handle that at some point in some GC process.

The Compactor will now read the branch's `SealedTokens` in order and apply
them on either the compacted metarange (if `CompactedMetaRangeID` exists) or
on the branch's HEAD commit's metarange, to create a new metarange. The
Compactor will then atomically update the branch entity:
atomically update the branch entity:

1. Set the `CompactedMetaRangeID` to the new MetaRangeID.
1. Remove the compacted `SealedTokens` from the branch.

The Compactor should fail the operation if the sealed tokens have changed
since the compaction started (SetIf). Although the algorithm can tolerate
additional sealed tokens being added during the compaction, it's better to
avoid competing with concurrent commits. Commits have the same benefits as
compactions, but they are proactively triggered by the user (and might fail
if compaction succeeds).
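
A sketch of the Compactor flow described above. All plumbing here
(`branchStore`, `Predicate`, `newStagingToken`, `metaRangeOfCommit`,
`applyOnMetaRange`, `ErrSealedTokensChanged`) is hypothetical and only
illustrates the two SetIf-guarded updates:

```go
// Hypothetical plumbing, for illustration only.
type Predicate interface{} // opaque KV version token used for SetIf

type branchStore interface {
	GetBranch(ctx context.Context, branchID string) (*Branch, Predicate, error)
	SetBranchIf(ctx context.Context, branchID string, b *Branch, pred Predicate) error
}

func compact(ctx context.Context, store branchStore, branchID string) error {
	// First atomic update: push a new staging token, seal the old one and
	// stamp CompactionStartTime.
	b, pred, err := store.GetBranch(ctx, branchID)
	if err != nil {
		return err
	}
	b.SealedTokens = append([]StagingToken{b.StagingToken}, b.SealedTokens...)
	b.StagingToken = newStagingToken() // hypothetical
	b.CompactionStartTime = time.Now()
	if err := store.SetBranchIf(ctx, branchID, b, pred); err != nil {
		return err // lost the race to a concurrent branch update
	}
	toCompact := b.SealedTokens

	// Apply the sealed tokens on the previous compaction result if one
	// exists, otherwise on the HEAD commit's metarange.
	base := b.CompactedMetaRangeID
	if base == "" {
		base = metaRangeOfCommit(b.CommitID) // hypothetical lookup
	}
	newMetaRange, err := applyOnMetaRange(ctx, base, toCompact) // hypothetical, commit-like apply
	if err != nil {
		return err
	}

	// Second atomic update: publish the result only if the sealed tokens did
	// not change while we were compacting.
	b, pred, err = store.GetBranch(ctx, branchID)
	if err != nil {
		return err
	}
	if !slices.Equal(b.SealedTokens, toCompact) {
		return ErrSealedTokensChanged // hypothetical sentinel error
	}
	b.CompactedMetaRangeID = newMetaRange
	b.SealedTokens = nil
	return store.SetBranchIf(ctx, branchID, b, pred)
}
```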

Consistency guarantees for the compaction process are derived from the
Commit operation's consistency guarantees. In short, the compaction process
follows the same principles as the Commit operation: a write to a branch's
staging token succeeds only if that token is still the branch's staging token
after the write occurred. Therefore, replacing the sealed tokens with an
equivalent metarange guarantees that no successful write goes missing.

### Branch Write Flow

No change here. Writes are performed on the staging token.

### Branch Read Flow

Reading from a branch includes both looking up a specific entry and listing
the entries of a branch. The read flow is more complicated than what we have
today, and differs between compacted and uncompacted branches.

#### Combined (Committed + Uncommitted)

There are 3 layers of reading from a branch:
1. Read from the staging token.
2. Read from the sealed tokens (in order).
3. Depending on whether a compaction happened:
   1. If a CompactedMetaRangeID exists, read from the compacted metarange.
   1. If a CompactedMetaRangeID doesn't exist, read from the CommitID.
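
A sketch of this layered lookup; `metaRangeGet`, `metaRangeOfCommit` and
`ErrNotFound` are hypothetical helpers, and `stagingGet` is the staging-area
lookup sketched in the Problem Description:

```go
// getFromBranch illustrates the combined read order after compaction.
func getFromBranch(ctx context.Context, b Branch, store map[StagingToken]map[string][]byte, key string) ([]byte, error) {
	// Layers 1+2: staging token, then sealed tokens in order.
	if v, found := stagingGet(b, store, key); found {
		if v == nil {
			return nil, ErrNotFound // a staged tombstone shadows committed data
		}
		return v, nil
	}
	// Layer 3: compacted metarange if it exists, otherwise the HEAD commit.
	if b.CompactedMetaRangeID != "" {
		return metaRangeGet(ctx, b.CompactedMetaRangeID, key)
	}
	return metaRangeGet(ctx, metaRangeOfCommit(b.CommitID), key)
}
```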

#### Committed

Just like today, reads are performed on the Branch's CommitID.

#### Uncommitted

For operations such as `DiffUncommitted` or checking if the staging
area is empty, the read flow will be as follows:

1. Read from the staging token.
2. Read from the sealed tokens (in order).
3. If a CompactedMetaRangeID exists, read the 2-way diff between the compacted
metarange and the CommitID's metarange.
> **Contributor** (on lines +141 to +144): There's still a step I think I'm missing: what do I do with the data from the sealed (and staging) tokens and the data from the diff? There is some kind of "uncommitted merge" that we need to do.
>
> **Contributor:** +1 on that.
>
> **Contributor Author:** This already exists today. Assume the following simple branch scenario today:
>
> `StagingToken: {A:{foo1:barA}}`
> `SealedTokens: [{B:{foo1:barB,foo2:gooB}}, {C:{foo1:barC,foo2:gooC}}]`
> `Commit: [{foo1:barCommitted,foo2:gooCommitted,foo3:jooCommitted}]`
>
> Reading foo1, foo2, foo3 from this branch yields barA, gooB, jooCommitted. For reading from the uncommitted area, you just skip the last layer of reading the commit.
> With compaction the same algorithm holds, but you need to diff the CompactedMetaRange with the commit for the changes that are already manifested in the compaction.


* There's an inefficiency here, as we may need to read two whole metaranges
  to get the diff, for example when there's a single change in every range.
  However, the nature of changes to a lakeFS branch is such that changes are
  expected in a small number of ranges, and the diff operation is expected to
  skip most ranges. Moreover, Pebble caching the ranges should remove the most
  costly part of comparing ranges - fetching them from S3. If this is still
  inefficient, we can use the immutability trait[^3] of the diff: we can
  calculate the diff result once and cache it.
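
A sketch of how `DiffUncommitted` could assemble these layers; the iterator
helpers (`newStagingDiffIterator`, `newTwoWayDiffIterator`,
`newLayeredIterator`) and `metaRangeOfCommit` are hypothetical:

```go
// diffUncommitted sketches how the uncommitted diff is assembled after compaction.
func diffUncommitted(ctx context.Context, b Branch) (DiffIterator, error) {
	// Layers 1+2: changes recorded in the staging token and sealed tokens.
	staged := newStagingDiffIterator(ctx, b.StagingToken, b.SealedTokens)
	if b.CompactedMetaRangeID == "" {
		return staged, nil
	}
	// Layer 3: changes already folded into the compacted metarange are
	// recovered by a 2-way diff against the HEAD commit's metarange.
	compacted := newTwoWayDiffIterator(ctx, metaRangeOfCommit(b.CommitID), b.CompactedMetaRangeID)
	// For the same key, an entry from the staging area shadows the compacted one.
	return newLayeredIterator(staged, compacted), nil
}
```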

### Commit Flow

The commit flow is slightly affected by the compaction process. If
compaction never happened, the commit flow is the same as today. If a
compaction happened, the commit applies the staged changes to the compacted
metarange instead of the HEAD commit's metarange. A successful commit resets
the `CompactedMetaRangeID` field.
Comment on lines +155 to +159
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if a compaction happened during a commit? will this fail the commit? If that is the case, are we fine with compaction failing commits?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

the HEAD commit metarange. A successful commit will reset the the
`CompactedMetaRangeID` field.
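
As a sketch, choosing the commit's base metarange could look like this
(`metaRangeOfCommit` is a hypothetical lookup):

```go
// commitBaseMetaRange picks the metarange a commit applies the staged changes to.
func commitBaseMetaRange(b Branch) MetaRangeID {
	// A compacted metarange already reflects the HEAD commit plus the sealed
	// tokens that were folded into it, so committing on top of it is
	// equivalent to (and cheaper than) re-applying everything on the HEAD
	// commit's metarange.
	if b.CompactedMetaRangeID != "" {
		return b.CompactedMetaRangeID
	}
	return metaRangeOfCommit(b.CommitID) // hypothetical lookup
}
```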

## Metrics

We can collect the following Prometheus metrics:
- Compaction runs, labeled by success/failure.
- Compaction time.
- Number of sealed tokens.
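
For illustration, such metrics could be defined with the Prometheus Go client;
the metric names and labels below are placeholders, not final:

```go
import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
	// Total compaction runs, labeled by whether the run succeeded.
	compactionRuns = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "graveler_compaction_runs_total",
		Help: "Number of staging compaction runs.",
	}, []string{"success"})

	// Time spent compacting a branch's staging area.
	compactionDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name: "graveler_compaction_duration_seconds",
		Help: "Duration of staging compaction runs.",
	})

	// Number of sealed tokens folded in by each compaction.
	compactionSealedTokens = promauto.NewHistogram(prometheus.HistogramOpts{
		Name: "graveler_compaction_sealed_tokens",
		Help: "Sealed tokens per compaction run.",
	})
)
```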

[^1]: The number of sealed tokens increases by one for every HEAD changing
operation (commit, merge, etc.) on the branch. The number of sealed tokens
resets to 0 when one of the operations succeeds. Therefore, a large number of
sealed tokens is unlikely and is a sign of a bigger problem like the
inability to commit.
[^2]: Any additional call to the kv-store might have an indirect performance
impact, like in the case of DynamoDB throttling. We'll treat a performance
impact as a change of flow with a direct impact on a user operation.
[^3]: A diff between two immutable metaranges is also immutable.