Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KafkaIO] Fix average record size data race and backlog estimation #34165

Open
wants to merge 7 commits into
base: master
Choose a base branch
from

Conversation

sjvanrossum
Copy link
Contributor

@sjvanrossum sjvanrossum commented Mar 4, 2025

The offset gap ratio may artificially shrink the backlog if consumers can't catch up to the tail of an expiring topic. This may cause runners to trigger a downscaling event which worsens the issue.

MovingAvg has been modified to atomically write the accumulated state since concurrent normal loads/stores of longs/doubles may tear. The numUpdates field is only used by the writer and can be kept as non-volatile, but the update method ensures that normal loads/stores on numUpdates are ordered in relation to acquiring loads and releasing stores on avg. To prevent false sharing I've padded the class since there may be tens to hundreds of instances of the accumulator and updates happen per consumed record.

The JMH benchmark I've added shows a slight uplift in average time per op for both reads and writes compared to the current implementation.

Results of task :sdks:java:io:kafka:jmh:jmh on a t2d-standard-60 Cloud Workstation:

Benchmark                                 Mode  Cnt       Score       Error  Units
KafkaIOUtilsBenchmark.Atomic              avgt   15   50693.751 ±  4937.770  ns/op
KafkaIOUtilsBenchmark.Atomic:atomicRead   avgt   15    5577.357 ±  1135.962  ns/op
KafkaIOUtilsBenchmark.Atomic:atomicWrite  avgt   15  140926.539 ± 14847.095  ns/op
KafkaIOUtilsBenchmark.Plain               avgt   15   65018.754 ±  9814.457  ns/op
KafkaIOUtilsBenchmark.Plain:plainRead     avgt   15    6658.736 ±   288.912  ns/op
KafkaIOUtilsBenchmark.Plain:plainWrite    avgt   15  181738.789 ± 29403.883  ns/op

Note that this test likely does not highlight the effect of padding since it doesn't construct a large pool of accumulators.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

Copy link
Contributor

github-actions bot commented Mar 4, 2025

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@sjvanrossum
Copy link
Contributor Author

Run Spotless PreCommit

Copy link
Contributor

github-actions bot commented Mar 5, 2025

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @kennknowles for label java.
R: @Abacn for label build.
R: @Abacn for label io.
R: @johnjcasey for label kafka.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

Copy link
Member

@kennknowles kennknowles left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy for the improvement. I may be misunderstanding what is strictly necessary to make your changes work as intended but TL;DR the inheritance all seems extraneous - one of them seems like inlining is equivalent and clearer, while the other seems like it is more clearly expressed as a field.

* limitations under the License.
*/

plugins { id 'org.apache.beam.module' }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use build.gradle.kts for new files

*/
@SuppressFBWarnings("UUF_UNUSED_FIELD")
private static class MovingAvgPadding {
byte p000, p001, p002, p003, p004, p005, p006, p007;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this a reaction to a measured slowdown? I'm not like super opposed to having something like this, but I expect it's relevance and effectiveness to wane as the codebase evolves, unless we have some fairly deep and clear understanding of exactly what needs to be maintained.


// The accumulator's fields should be padded to at least 128 bytes (at least 1 or 2
// cache lines).
private static class MovingAvgFields extends MovingAvgPadding {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the padding not just inlined here? It would be clearer what is going on and saves a layer of inheritance.

* Sanity of visibility is only useful when the writer thread changes since avg is the only field that can be shared between multiple concurrent threads.
*/
@SuppressFBWarnings("UUF_UNUSED_FIELD")
public static final class MovingAvg extends MovingAvgFields {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class should simply have a private MovingAvgFields member.

return Double.longBitsToDouble(avg);
}

protected void setAvg(final double value) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just make these package-private and call them from the MovingAvg class. (see below: use a field instead of inheritance)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants