
feat(v2): metadata index retention policy #4148


Merged
kolesnikovae merged 31 commits into main from feat/metastore-index-truncation on Jul 3, 2025

Conversation


@kolesnikovae kolesnikovae commented Apr 28, 2025

The change adds a new component responsible for enforcing the retention policy.

It's implemented as follows:

  1. On the leader node, we periodically check which partition shards have passed the retention period threshold. The state is accessed via the Leader Read interface: it always reads the local state.
  2. The component creates tombstones for the partition shards (not the blocks they contain) and issues a new raft command – truncate index.
  3. The command handler removes specified partition shards from the index.
  4. Tombstones then follow the standard tombstone life-cycle: eventually, they are included in a compaction job.
  5. Once the compaction job is added to the scheduler, we remove the associated tombstone object.
  6. A compaction worker deletes all the tenant/shard objects older than the timestamp specified in the tombstone. Since there may be tens of thousands of objects, the operation is time-limited – 15 seconds by default. Without this limit, multiple workers could end up competing for blocks in the same range if multiple tombstones were generated for the same tenant-shard. A sketch of this step is shown below.
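
A minimal sketch of step 6, using a Thanos-style object storage client. The Tombstone type, the object prefix layout, and the use of LastModified to decide staleness are illustrative assumptions, not the actual implementation:

```go
import (
	"context"
	"fmt"
	"time"

	"github.com/thanos-io/objstore"
)

// Tombstone is a hypothetical representation of an index tombstone: the
// tenant/shard it covers and the cut-off timestamp recorded by the cleaner.
type Tombstone struct {
	Tenant string
	Shard  uint32
	Before time.Time
}

// deleteTombstonedObjects removes objects older than the tombstone cut-off,
// giving up once the time budget (15s by default) is exhausted so that
// multiple workers do not keep competing over the same tenant/shard range.
func deleteTombstonedObjects(ctx context.Context, bucket objstore.Bucket, t Tombstone, budget time.Duration) error {
	ctx, cancel := context.WithTimeout(ctx, budget)
	defer cancel()
	prefix := fmt.Sprintf("%s/%d/", t.Tenant, t.Shard) // illustrative layout only
	return bucket.Iter(ctx, prefix, func(name string) error {
		attrs, err := bucket.Attributes(ctx, name)
		if err != nil {
			return err // e.g. storage unavailability: the compaction job will be retried
		}
		// The real implementation would derive the block time from its metadata;
		// LastModified is used here only to keep the sketch short.
		if attrs.LastModified.Before(t.Before) {
			return bucket.Delete(ctx, name)
		}
		return nil // the deadline surfaces as an error on the next bucket call
	})
}
```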

I also added a couple of metrics to track the tombstone queue, and a metric to track blocks deleted by the compaction worker:

  • pyroscope_metastore_index_tombstones (gauge)
  • pyroscope_compaction_worker_blocks_deleted_total (counter)

The change introduces a new tenant override – retention-period – which defaults to 31 days. It ignores compactor.blocks-retention-period, primarily because that setting is intended for the old compactor, and the metastore won't see it. Additionally, we may want to manage retention of old data independently, e.g., to force cleanup. In v1, there was no default for the retention period, and many users weren't even aware the option existed.

The retention policies are enforced by a separate component – index cleaner. It is disabled by default via -metastore.index.cleanup-interval=0; we should set a reasonable default (e.g., 1m) before release.
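
For illustration, a rough sketch of how such a cleaner loop might be structured. The IndexReader/RaftProposer interfaces and all method names are hypothetical; only the flag semantics (an interval of 0 disables the cleaner) come from the description above:

```go
import (
	"context"
	"time"
)

// Hypothetical interfaces; the real metastore APIs differ.
type IndexReader interface {
	// ShardsOlderThan returns partition shards whose data is entirely past the
	// per-tenant retention threshold (Leader Read: always the local state).
	ShardsOlderThan(ctx context.Context, retention func(tenant string) time.Duration) []ShardRef
}

type RaftProposer interface {
	// ProposeTruncate creates tombstones for the shards and issues the
	// "truncate index" command through Raft.
	ProposeTruncate(ctx context.Context, shards []ShardRef) error
}

type ShardRef struct {
	Tenant string
	Shard  uint32
}

type IndexCleaner struct {
	interval  time.Duration // -metastore.index.cleanup-interval; 0 disables the cleaner
	retention func(tenant string) time.Duration
	index     IndexReader
	raft      RaftProposer
}

// Run is executed on the leader node only.
func (c *IndexCleaner) Run(ctx context.Context) {
	if c.interval <= 0 {
		return // disabled by default until a release default (e.g. 1m) is picked
	}
	ticker := time.NewTicker(c.interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// Whole partition shards are tombstoned, never individual blocks.
			if shards := c.index.ShardsOlderThan(ctx, c.retention); len(shards) > 0 {
				_ = c.raft.ProposeTruncate(ctx, shards) // errors are retried on the next tick
			}
		}
	}
}
```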

On naming: I'm fine with adding a prefix like metastore., though personally I find it a bit confusing (same goes for compactor.). Users shouldn't have to know which component implements the option – the setting applies to the tenant, not the component. That said, I suggest we revisit CLI and config naming more thoroughly when we're preparing for the release.


To implement the change, I removed the in-memory list of partitions due to the risk of violating transaction isolation. This makes it much easier to reason about the version observed by both writers and readers. Performance testing with synthetic cases and real data showed no negative impact in practice.


The PR also fixes a bug where tombstones could be overwritten due to key collisions (based on Raft command index + append timestamp) at the storage level. This led to some leftover data, which will be cleaned up once a retention policy is enabled.
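
For context, a purely illustrative way such a collision could be avoided: extending the key with a per-command sequence number, so that several tombstones appended by the same Raft command at the same timestamp no longer map to the same storage key. This is not necessarily how the PR fixes it:

```go
import (
	"encoding/binary"
	"time"
)

// tombstoneKey is an illustrative collision-free key: Raft command index,
// append timestamp, and a per-command sequence number as a tie-breaker.
func tombstoneKey(raftIndex uint64, appendedAt time.Time, seq uint32) []byte {
	key := make([]byte, 20)
	binary.BigEndian.PutUint64(key[0:8], raftIndex)
	binary.BigEndian.PutUint64(key[8:16], uint64(appendedAt.UnixNano()))
	binary.BigEndian.PutUint32(key[16:20], seq)
	return key
}
```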


The change should be rolled out in two steps:

  1. Update the version: all metastore replicas and compaction workers must be updated before the next step.
  2. Enable the retention policy by setting the cleanup interval: -metastore.index.cleanup-interval=1m.

The change is not forward-compatible: if rolled back, the old FSM won't be able to handle the new "unknown" Raft commands; the node will crash trying to replay the WAL, and the rollback won't progress.


Finally, after the index truncation, BoltDB won't shrink automatically: if the index was 1Gi, its size won't change. Shrinking can be enforced by enabling the -metastore.snapshot-compact-on-restore option.
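
BoltDB only reuses freed pages within the file and never returns space to the filesystem, so shrinking requires rewriting the data into a fresh file. A minimal sketch of what compact-on-restore amounts to: top-level buckets only, nested buckets and the final file swap omitted, paths and function names illustrative:

```go
import (
	bolt "go.etcd.io/bbolt"
)

// compactDB copies all top-level buckets from src into a freshly created dst
// file, so the on-disk size reflects the truncated index rather than the
// historical maximum. The caller would then atomically rename dst over src.
func compactDB(srcPath, dstPath string) error {
	src, err := bolt.Open(srcPath, 0o600, &bolt.Options{ReadOnly: true})
	if err != nil {
		return err
	}
	defer src.Close()
	dst, err := bolt.Open(dstPath, 0o600, nil)
	if err != nil {
		return err
	}
	defer dst.Close()
	return src.View(func(stx *bolt.Tx) error {
		return stx.ForEach(func(name []byte, b *bolt.Bucket) error {
			return dst.Update(func(dtx *bolt.Tx) error {
				nb, err := dtx.CreateBucketIfNotExists(name)
				if err != nil {
					return err
				}
				return b.ForEach(func(k, v []byte) error {
					if v == nil {
						return nil // nested bucket; skipped in this sketch
					}
					return nb.Put(k, v)
				})
			})
		})
	})
}
```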

@kolesnikovae kolesnikovae changed the title add basic implementation feat(v2): metadata index retention policy Apr 28, 2025
Comment on lines +89 to +93
logger: logger,
config: cfg,
store: s,
shards: newShardCache(cfg.ShardCacheSize, s),
blocks: newBlockCache(cfg.BlockReadCacheSize, cfg.BlockWriteCacheSize),
Collaborator Author

I finally removed the in-memory partition list – it's much cleaner now. Otherwise, we would eventually violate transaction isolation guarantees by exchanging state through the list.

@kolesnikovae kolesnikovae marked this pull request as ready for review June 30, 2025 08:47
@kolesnikovae kolesnikovae force-pushed the feat/metastore-index-truncation branch from 263d179 to 7ca1578 Compare June 30, 2025 08:57
@kolesnikovae kolesnikovae force-pushed the feat/metastore-index-truncation branch from eac7b8c to 208bf25 Compare June 30, 2025 12:55
@kolesnikovae kolesnikovae force-pushed the feat/metastore-index-truncation branch 2 times, most recently from bd9d32a to 59cfbd8 Compare June 30, 2025 15:47
@kolesnikovae kolesnikovae force-pushed the feat/metastore-index-truncation branch from 59cfbd8 to a453844 Compare June 30, 2025 16:13
@kolesnikovae kolesnikovae force-pushed the feat/metastore-index-truncation branch from b7474a2 to 14a7fa2 Compare July 2, 2025 07:25
Comment on lines +403 to +418
// Handle tombstones asynchronously on a best-effort basis:
// if deletion fails, leftovers will be cleaned up eventually.
//
// There are several reasons why we may not be able to delete:
// 1. General storage unavailability: compaction jobs will be
// retried either way, and the tombstones will be handled again.
// 2. Permission issues. In this case, a retry will not help.
// 3. Worker crash: jobs will be retried.
//
// A worker is given a limited time to finish the cleanup. If the worker
// didn't finish the cleanup before shutdown and the compaction job was
// already finished (so no retry is expected), the data will be deleted
// eventually by the time-based retention policy. However, if no more
// tombstones are created for the shard, the data will remain in
// storage. This should be handled by the index cleaner: some garbage
// collection should happen in the background.
Collaborator Author

Something important: cleanup is performed on a best-effort basis – inevitably, there will be some junk left in storage. While it's very unlikely, it's not impossible. There's no good way to handle this in the regular flow, since we don't want to retry jobs indefinitely, and some errors may be permanent.

My idea is to implement an additional retention policy to reconcile the metadata index with the actual data in object storage. This would involve iterating over the "directories" in object storage and reconciling them with the index tree. Since this could be a relatively expensive operation, it should be handled separately from the regular time-based retention policy and run on a much less frequent interval.

This approach would also help in cases where data is deleted manually or removed by the object storage's own retention policies. Currently, we ignore missing blocks during queries and compaction, so it doesn't cause failures – but it's still not ideal for the index and storage to drift too far out of sync.
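
To make the idea concrete, a rough sketch of such a reconciliation pass. The IndexLookup interface, the directory layout, and the assumption that tenant/shard prefixes can be listed this way are all hypothetical:

```go
import (
	"context"

	"github.com/thanos-io/objstore"
)

// IndexLookup is a hypothetical read-only view of the metadata index tree.
type IndexLookup interface {
	HasPrefix(prefix string) bool
}

// findOrphanedPrefixes walks tenant/shard "directories" in the bucket and
// returns those no longer referenced by the index, so a background garbage
// collector can delete them. It is meant to run far less often than the
// time-based retention policy, since listing the bucket is expensive.
func findOrphanedPrefixes(ctx context.Context, bucket objstore.Bucket, index IndexLookup) ([]string, error) {
	var orphaned []string
	err := bucket.Iter(ctx, "", func(tenantDir string) error {
		return bucket.Iter(ctx, tenantDir, func(shardDir string) error {
			if !index.HasPrefix(shardDir) {
				orphaned = append(orphaned, shardDir)
			}
			return nil
		})
	})
	return orphaned, err
}
```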

Contributor

Something important: cleanup is performed on a best-effort basis – inevitably, there will be some junk left in storage. While it's very unlikely, it's not impossible. There's no good way to handle this in the regular flow, since we don't want to retry jobs indefinitely, and some errors may be permanent.

This is where I think using Raft hurts us more than we benefit from it.

In theory we can have each replica remove expired state locally and have a separate routine (or brand new component) remove data from object storage, as we seem to be iterating over data in the bucket anyway. We will not be consistent at the edge, but combined with a read path adjustment to ignore any data beyond the retention period it should work.

Not saying we should change it now, but something to consider in the future to reduce the cognitive load around what compaction jobs are responsible for.

Collaborator Author
@kolesnikovae kolesnikovae Jul 3, 2025

This is where I think using Raft hurts us more than we benefit from it.

Could you please elaborate on this? I don't see how Raft is related here. I suspect you might be referring to the idea that shifting the "source of truth" from object storage to the metadata index could have downsides. If that's the case, I'm struggling to see exactly how that would hurt us – could you clarify?

In theory we can have each replica remove expired state locally and have a separate routine (or brand new component) remove data from object storage, as we seem to be iterating over data in the bucket anyway. We will not be consistent at the edge, but combined with a read path adjustment to ignore any data beyond the retention period it should work.

Not saying we should change it now, but something to consider in the future to reduce the cognitive load around what compaction jobs are responsible for.

I believe that would be a mistake, for several reasons:

  • First, any change to the index must go through consensus. Otherwise, you risk ending up with an inconsistent state. There are many scenarios where this kind of approach breaks down – a basic example is handling retention period changes.
  • Second, you'd need to perform heavy transactions (e.g., you want to delete 1M entries from the index). What's worse, these would be write transactions that block the regular write path.
  • Lastly, I don't see how introducing a new component helps here. It would have to synchronize the index with object storage without having direct access to the index itself – which introduces complexity and risk without clear benefit.

The metastore must never be used in this way. Please avoid violating separation of concerns: the metastore compaction planner (and the index cleaner, transitively) are responsible for making decisions, while the compaction workers are responsible for executing them.

What I'm proposing above specifically concerns shard and tenant directories – not individual blocks. As for dropping block entries from the index corresponding to individually deleted objects, I don't believe we need that at all. And if we ever do, it should be handled during index checkpointing or compaction, where a worker has access to the relevant parts of the index.

I hope this clarifies what compaction jobs are and aren't responsible for. If not, I'm happy to elaborate further.

Contributor

First, any change to the index must go through consensus. Otherwise, you risk ending up with an inconsistent state. There are many scenarios where this kind of approach breaks down – a basic example is handling retention period changes.

My point is that data (state) that is not queryable doesn't have to conform to the same consistency rules. Especially if we can reduce the system complexity.

Second, you'd need to perform heavy transactions (e.g., you want to delete 1M entries from the index). What's worse, these would be write transactions that block the regular write path.

I don't know where this comes from; I didn't say we'd be removing millions of entries at once. That is an implementation detail.

Lastly, I don't see how introducing a new component helps here. It would have to synchronize the index with object storage without having direct access to the index itself – which introduces complexity and risk without clear benefit.

That was just an optional idea for cleaning up data in the bucket without using the metastore at all; it's not related to reconciliation. There are many ways to do reconciliation if we decide we need it.

The metastore must never be used in this way. Please avoid violating separation of concerns: the metastore compaction planner (and the index cleaner, transitively) are responsible for making decisions, while the compaction workers are responsible for executing them.

The lines are already blurred with compaction jobs used beyond what they were meant for. My proposal would actually reduce responsibilities. If metastore nodes enforce the retention policy by truncating their state without going through compaction, all that is left is cleaning up the bucket which could be done with 0 lines of code (e.g., cloud provider lifecycle rules) or a dedicated component if we want to be more precise.

What I'm proposing above specifically concerns shard and tenant directories – not individual blocks. As for dropping block entries from the index corresponding to individually deleted objects, I don't believe we need that at all. And if we ever do, it should be handled during index checkpointing or compaction, where a worker has access to the relevant parts of the index.

I had to re-read what I wrote, but I never said anything about removing individual blocks. I will not respond to the rest of the imagined scenario :)

Collaborator Author
@kolesnikovae kolesnikovae Jul 6, 2025

Aleks, I'm sorry but I do not understand your motivation. I'm trying to understand your ideas better and explain my own ideas where I see misunderstanding.

1. Why we do not rely on the storage provider retention policies.

  1. This is the current approach – we already handle it ourselves in compactors.
  2. This is what users asked for many times.
  3. We strive for zero ops. Maintaining provider-specific configurations is a chore. Making them tenant-specific, flexible, and versatile quickly becomes a burden – operationally and in terms of complexity.
  4. Not all storage providers support this. The most popular one – local filesystem – does not have it.
  5. Retention policies should be extensible: one feature users have asked for in the past is a size-based retention policy, which can be implemented easily. Another use case is deleting tenants: you can create an override for the tenant and we remove its data (I would, however, consider implementing a use-case-specific tombstone kind, but that's not necessary).

Collaborator Author
@kolesnikovae kolesnikovae Jul 6, 2025

2. Why we handle storage cleanup together with compaction

  • Retention policies specify what should be retained and what should not (= cleaned up).
  • Storage cleanup is part of the storage compaction domain.
  • Compaction workers perform compaction of the storage.

This is how it's handled in the current version: compactors are responsible for storage cleanup as part of the compaction process.

<...> the metastore compaction planner (and the index cleaner, transitively) are responsible for making decisions, while the compaction workers are responsible for executing them.

The lines are already blurred with compaction jobs being used beyond their original purpose.

Please clarify this point. I designed the solution, and tombstone handling was always intended to be part of the compaction process.

What I'm proposing above specifically concerns shard and tenant directories – not individual blocks. As for dropping block entries from the index corresponding to individually deleted objects, I don't believe we need that at all. And if we ever do, it should be handled during index checkpointing or compaction, where a worker has access to the relevant parts of the index.

I had to re-read what I wrote, but I never said anything about removing individual blocks. I will not respond to the rest of the imagined scenario :)

I found your sentiment inappropriate. This is not how we make progress. Let's please keep the conversation professional. If I misunderstood your point – I'm sorry. I'm genuinely trying to understand your idea and explain mine in return, especially where I see a potential disconnect. In the message you're quoting, I clarified my own idea as I wasn't sure you fully understood it.

I remember that in early versions we tried to handle cleanup in the metastore in exactly this way – performing I/O in heavy, long-running transactions – and it was a very bad experience: it didn't work reliably even in a small dev environment. I thought you were suggesting reviving that idea. This is the sentence that made me think so:

In theory we can have each replica remove expired state locally and have a separate routine (or brand new component) remove data from object storage, as we seem to be iterating over data in the bucket anyway.

Honestly, I can't see how it helps with the problems outlined in my very first comment in this thread. Furthermore, your proposal actually adds more ambiguity and complexity:

My proposal would actually reduce responsibilities. If metastore nodes enforce the retention policy by truncating their state without going through compaction, all that is left is cleaning up the bucket which could be done with 0 lines of code (e.g., cloud provider lifecycle rules) or a dedicated component if we want to be more precise.

We do not consider provider-specific mechanisms, for the reasons outlined in point 1 above and below.

  • Would the component be responsible only for time-based retention policies, or for all storage cleanup? For example, how would we delete tenant data (or parts of it)?
  • Would we handle the deletion of compacted blocks the same way we do now – or would we have different flows?
  • How would we keep the metadata state in sync with the storage – or would we?
  • How would we keep retention policies in sync between the metastore and the dedicated component?
  • How would the dedicated component distribute or orchestrate the work?
  • How would the proposed solution reduce cognitive load?

Without answers to these questions, it's hard to see how the proposed approach can be seriously evaluated.

Collaborator Author
@kolesnikovae kolesnikovae Jul 6, 2025

3. Why we must not modify FSM state outside Raft

My point is that data (state) that is not queryable doesn't have to conform to the same consistency rules.

Why? I don't know of any exceptions to the rules. Please read https://raft.github.io/raft.pdf – you're proposing to introduce inconsistency between replicated state machines, making them non-deterministic. At that point you can't reason about the system's behaviour.

Furthermore, you can't reason about what data is queryable and what isn't: retention policy might change, and the data you considered outdated could be partially deleted (differently across replicas). You would need a protocol to resolve the conflict.

Especially if we can reduce the system complexity.

How? You're proposing to move deletion from compaction workers to the metastore or a separate component – I don't understand what that simplifies. We still have to perform the same operations. Furthermore, we already have a simple way to distribute the work – and you're proposing to add another one?

You're proposing to skip Raft consensus here to simplify things, but I don't see how that actually simplifies anything. In fact, I'd argue it does not simplify the system at all – if you want to modify the state outside the standard FSM and Raft interfaces, you'd need to coordinate with FSM operations (such as snapshot and restore) and Raft writes on your own.


I don't believe that correctness can be traded for simplicity in this context.

Rules – whether architectural, contractual, or domain-specific – exist for a reason: to ensure correctness, consistency, and predictability of behavior. If we're intentionally breaking or ignoring them, we need to be very deliberate and have strong justification.

Contributor
@aleks-p aleks-p Jul 7, 2025

I found your sentiment inappropriate. This is not how we make progress. Let's please keep the conversation professional. If I misunderstood your point – I'm sorry. I'm genuinely trying to understand your idea and explain mine in return, especially where I see a potential disconnect. In the message you're quoting, I clarified my own idea as I wasn't sure you fully understood it.

My impression was that you were making remarks against something I didn't propose. Apologies if my comment came across as rude, that was not my intention.

My original comment was about how our solution is (in my opinion) made more complex and limited by the fact that we have to plan and synchronize the work to remove data that is not needed. My proposal was not a full-blown design, but more wishful thinking (a hope) that we can simplify certain aspects. I should have been clearer about this.

Collaborator Author

I don't see how we can avoid planning and synchronization. Even in eventually consistent systems, some coordination is necessary – for example, consider how cleanup is handled in the current version, starting from building the bucket index in store-gateways. The very definition of the task – "remove data that is not needed" – implies a potential conflict: in a distributed system, each participant has its own view of what data is not needed. We can address this in two ways: via consensus or via conflict resolution. In v2, we don't resolve such conflicts; instead, we use consensus to prevent them from happening in the first place.

Regarding limitations: one common user request we receive is for data backfilling – uploading historical data. This is a significant and particularly difficult task to support with the current solution. It's not impossible, but certainly challenging. One of the design goals of the new system was to allow for this use case, and the current retention policy implementation does allow for it.

This brings us back to the idea that we can iterate over objects in storage and delete the stale ones. But it's not that simple: we can't reliably determine whether an object is stale without accessing its metadata (or some kind of index), because the content may span a time range that has not yet passed the retention threshold. Right now, we assume that a 24-hour grace period will cover such cases – even though we have no guarantees (see FIXME; this will need to be addressed at some point). This assumption breaks down if we allow uploading historical data – which we do want to support.

Contributor

The very definition of the task – "remove data that is not needed" – implies a potential conflict: in a distributed system, each participant has its own view of what data is not needed. We can address this in two ways: via consensus or via conflict resolution.

While I agree in general, and I understand the challenges of data processing in distributed systems, I see things a bit differently in this particular use case. If we ensure that "expired" data is not returned by the read path, how we remove the actual data from each replica and the durable storage becomes less important and can be simplified. Obviously, this implies that we make sure we don't break things or slow things down in the process.

As mentioned earlier, I am not advocating for making any changes to what we have. We can agree to disagree.

Contributor
@aleks-p aleks-p left a comment

At a high level this seems to be in line with the current approach for cleaning up data.

I can't review the details due to the sheer size of the PR, but I trust that the tests cover the important bits, and having it run in the dev environment will give us confidence to move this forward.

Comment on lines -120 to -122
if !shard.Overlaps(q.query.startTime, q.query.endTime) {
continue
}
Contributor

For anyone else reading this: the check got moved to the shardIterator.

@kolesnikovae kolesnikovae merged commit bbb96ea into main Jul 3, 2025
32 of 33 checks passed
@kolesnikovae kolesnikovae deleted the feat/metastore-index-truncation branch July 3, 2025 07:35