
[server] Do not insert records into transient record cache while the …#14

Closed
ZacAttack wants to merge 5 commits into linkedin:master from ZacAttack:master

Conversation

ZacAttack (Contributor) commented Sep 27, 2022

…drainer queue is full

This is a tactical fix. This change prevents the transient record from growing if the memory budget for the drainer queue is close to full or exhausted.


}

public void blockOnDrainerCapacity() {
while (storeBufferService.getTotalRemainingMemory() < 1000) {
Contributor

Can we replace it with wait-notify?

Contributor

I agree... would be great to avoid sleeping, if possible. I've been meaning to get rid of sleeps (e.g., did it in 6db7c05) and I hope we continue getting rid of them, rather than add more.

Contributor Author

will do
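The wait/notify replacement discussed above could look roughly like the sketch below. This is a minimal standalone illustration, not Venice code: `DrainerCapacityGate`, `MIN_REMAINING_MEMORY`, and the bookkeeping methods are all hypothetical names, and the real `StoreBufferService` tracks its memory budget differently.

```java
// Hypothetical sketch: replace a sleep/poll loop on drainer memory with
// wait/notify, so blocked producers wake as soon as the drainer frees memory.
public class DrainerCapacityGate {
  private static final long MIN_REMAINING_MEMORY = 1000; // illustrative threshold
  private final Object lock = new Object();
  private long remainingMemory;

  public DrainerCapacityGate(long initialMemory) {
    this.remainingMemory = initialMemory;
  }

  /** Blocks until the drainer has at least MIN_REMAINING_MEMORY available. */
  public void blockOnDrainerCapacity() throws InterruptedException {
    synchronized (lock) {
      while (remainingMemory < MIN_REMAINING_MEMORY) {
        lock.wait(); // no sleeping; woken by release()
      }
    }
  }

  /** Called by the producer side when it enqueues a record of the given size. */
  public void consume(long bytes) {
    synchronized (lock) {
      remainingMemory -= bytes;
    }
  }

  /** Called by the drainer after it frees memory; wakes all blocked callers. */
  public void release(long bytes) {
    synchronized (lock) {
      remainingMemory += bytes;
      lock.notifyAll();
    }
  }

  /** Non-blocking check, mainly useful for tests and metrics. */
  public boolean hasCapacity() {
    synchronized (lock) {
      return remainingMemory >= MIN_REMAINING_MEMORY;
    }
  }
}
```

The `while` loop around `wait()` guards against spurious wakeups, which is the standard wait/notify idiom.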

// either. So, there is no need to tell the follower replica to do anything.
}
} else {
blockOnDrainerCapacity();
sushantmane (Contributor), Sep 27, 2022

Calling blockOnDrainerCapacity on almost every record will impact performance negatively, as it calls AtomicLong::get and these atomic variables are being accessed/updated by threads on different cores. It will be interesting to see the ingestion perf after this change.

Contributor Author

Yeah. We're already examining and updating the capacity for every submission to the drainer, though admittedly, this method reads every single drainer's atomic variable. We could augment it to only check the specific drainer variable that's related to the ingestion task. That would perhaps free it up.

Contributor Author

Nevermind, it's an array not a map, so getting the aggregate is probably the best way.

Contributor

Maybe feasible to add a function to the StoreBufferService along the lines of boolean isDrainerNearCapacity(ConsumerRecord<KafkaKey, KafkaMessageEnvelope> consumerRecord, int subPartition) which would then be able to call SBS::getDrainerIndexForConsumerRecord and look up that specific drainer's capacity. Could also be achieved by passing just a String consumedTopic rather than a full record, given a minor refactoring within SBS.
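The per-drainer check suggested above could be sketched roughly as follows. Everything here is illustrative, not Venice internals: `StoreBufferServiceSketch`, the hash-based drainer selection, and the threshold constant are assumptions standing in for `SBS::getDrainerIndexForConsumerRecord` and the real memory accounting.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: instead of aggregating remaining memory across all
// drainers, look up the one drainer a record maps to and check only its counter.
public class StoreBufferServiceSketch {
  private static final long NEAR_CAPACITY_THRESHOLD = 1000; // illustrative
  private final AtomicLong[] remainingMemoryPerDrainer;

  public StoreBufferServiceSketch(int drainerCount, long memoryPerDrainer) {
    remainingMemoryPerDrainer = new AtomicLong[drainerCount];
    for (int i = 0; i < drainerCount; i++) {
      remainingMemoryPerDrainer[i] = new AtomicLong(memoryPerDrainer);
    }
  }

  // A given topic/sub-partition always maps to the same drainer, so ordering
  // per partition is preserved (mirrors the idea behind the real index lookup).
  private int getDrainerIndex(String consumedTopic, int subPartition) {
    return Math.floorMod(consumedTopic.hashCode() + subPartition,
        remainingMemoryPerDrainer.length);
  }

  /** Touches only the relevant drainer's counter instead of all of them. */
  public boolean isDrainerNearCapacity(String consumedTopic, int subPartition) {
    int index = getDrainerIndex(consumedTopic, subPartition);
    return remainingMemoryPerDrainer[index].get() < NEAR_CAPACITY_THRESHOLD;
  }

  /** Producer side charges the record's size against its drainer's budget. */
  public void consume(String consumedTopic, int subPartition, long bytes) {
    remainingMemoryPerDrainer[getDrainerIndex(consumedTopic, subPartition)]
        .addAndGet(-bytes);
  }
}
```

Passing just the consumed topic and sub-partition (rather than the full `ConsumerRecord`) keeps the signature small, as the comment suggests.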

sushantmane (Contributor), Sep 27, 2022

We're already examining and updating the capacity for every submission to the drainer

IIUC, this happens in the drainer threads which are limited in number; however, we have far more shared consumer threads than the drainer threads. Hence the impact of updating atomic variables millions of times per second might be even more pronounced.

Maybe I'm being paranoid here. Let's keep this code change as one of the reference points if we see a substantial impact on ingestion performance.

FelixGV (Contributor) left a comment

Comment about the unmodified code (which unfortunately GH does not allow me to comment on directly): I think we should consider renaming the other overload of PCS::setTransientRecord (the one where you do not call the new blocking function before) so that it's clearer that it sets the transient record to null. It would also make the intent clearer as to why we're not blocking before that one (presumably because a delete may relieve memory pressure? Although that is actually uncertain: a delete-heavy workload may in fact still cause a lot of garbage, even if the value part is null).

Comment on lines 566 to 567
maybeBlockOnDrainerCapacity();
partitionConsumptionState.setTransientRecord(
Contributor

Is it ideal to precede every call to PartitionConsumptionState::setTransientRecord with this new function? Would it make sense to include this functionality inside of the setTransientRecord function itself? This may require passing a closure to evaluate the buffer service's remaining capacity into the PCS, which is maybe not ideal from an encapsulation standpoint. On the other hand, having this convention that calls to setTransientRecord should always be preceded by maybeBlockOnDrainerCapacity seems a bit fragile as well. Maybe there should be a function in LFSIT to wrap both of these functions?

Contributor Author

Yeah, I think we should do the closure. Will add.
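The closure approach agreed on above could be wired in roughly like this. The class and field names are hypothetical (the real PartitionConsumptionState holds far more state), and the gate here is just a `Runnable` standing in for something like `maybeBlockOnDrainerCapacity`.

```java
// Hypothetical sketch: inject the capacity gate into the PCS as a closure, so
// callers of setTransientRecord cannot forget to block first.
public class PartitionConsumptionStateSketch {
  private final Runnable capacityGate; // e.g. ingestionTask's blocking check
  private Object transientRecord;

  public PartitionConsumptionStateSketch(Runnable capacityGate) {
    this.capacityGate = capacityGate;
  }

  public void setTransientRecord(Object record) {
    if (record != null) {
      // A null record (delete) frees memory, so only gate when caching a value;
      // this mirrors the overload discussion in the review above.
      capacityGate.run();
    }
    this.transientRecord = record;
  }

  public Object getTransientRecord() {
    return transientRecord;
  }
}
```

The trade-off the reviewer notes still applies: the PCS now depends on a callback into the buffer service's capacity logic, which is weaker encapsulation, but the "always block before set" convention is enforced in one place instead of at every call site.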

}

public void maybeBlockOnDrainerCapacity() {
while (storeBufferService.getTotalRemainingMemory() < 1000) {
Contributor

Should this 1000 be configurable? Or even if left un-configurable, should it at least be made into a constant?

Contributor Author

I was torn on adding another tunable parameter. But maybe there's a happy middle: instead of using a strict measure, we could wait if it's at, say, 90-95% of capacity? That way it's at least tunable relative to the size of the configured drainer, but doesn't add extra noise.

Contributor

Why block at some margin below the actual limit? Can we instead block if (remaining_capacity - incoming_payload_size < 0) or something like that? This is not fully precise since the memory overhead of the transient record is more than just the payload size, but probably close enough to not matter?

Contributor

If the issue is that drainer records are uncompressed while producer buffers are not, just checking drainer capacity could still lead to very high memory usage. We should probably make this tunable and add a metric to track transient cache usage.
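The two thresholding proposals in this thread can be contrasted in a small sketch. Both method names and values are illustrative, not Venice APIs: a relative free-fraction threshold (tunable relative to the configured drainer size) versus blocking only when the incoming payload would overshoot the remaining budget.

```java
// Hypothetical sketch contrasting the two capacity-check proposals above.
public final class CapacityChecks {
  private CapacityChecks() {}

  // Proposal 1: block once remaining memory drops below a fraction of the
  // total budget (e.g. freeFraction = 0.05 blocks at ~95% usage).
  public static boolean nearCapacity(long remaining, long total, double freeFraction) {
    return remaining < total * freeFraction;
  }

  // Proposal 2: block only if this specific payload would not fit. Imprecise,
  // since the transient record's overhead exceeds the raw payload size, but
  // arguably close enough.
  public static boolean wouldOvershoot(long remaining, long incomingPayloadSize) {
    return remaining - incomingPayloadSize < 0;
  }
}
```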

ZacAttack closed this Sep 27, 2022
ZacAttack (Contributor Author)

This method won't actually solve our problem. We'll need to do a more in depth fix.

misyel added a commit to misyel/venice that referenced this pull request Apr 22, 2026
…opilot comments

Adds `checkRollbackOriginVersionCapacityForNewPush` which rejects a new push
while any ROLLED_BACK (or parent-side rollback-origin PARTIALLY_ONLINE)
version
is still within its retention window. The check runs on both the parent
(`VeniceParentHelixAdmin.incrementVersionIdempotent`) and the child
(`VeniceHelixAdmin.addVersion`), so the rejection surfaces synchronously to
the VPJ instead of failing only in async admin-message consumption on the
child.

Integration tests cover both the block (within retention) and the release
(after retention expires) paths.

Also addresses Copilot comments on PR linkedin#2688:
- linkedin#11 `assumeRolledBackIfUnreachable` is now empty for full-cluster rollbacks
  so unreachable regions don't inflate the ROLLED_BACK count.
- linkedin#14 If the region filter contains only unknown regions, skip the parent
  status update instead of falling through into full-cluster behavior.
- linkedin#15 If zero regions confirm ROLLED_BACK, leave parent status unchanged
  rather than downgrading to PARTIALLY_ONLINE on no evidence.

5 participants