Skip to content

MINOR: Migrate CoordinatorLoaderImplTest from Scala to Java #20089

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: trunk
Choose a base branch
from

Conversation

LoganZhuZzz
Copy link
Contributor

Summary of Changes

  • Rewrote both CoordinatorLoaderImpl and CoordinatorLoaderImplTest in Java, replacing their original Scala implementations.
  • Removed the direct dependency on ReplicaManager and replaced it with functional interfaces for partitionLogSupplier and partitionLogEndOffsetSupplier
  • Preserved original logic and test coverage during migration.

@github-actions github-actions bot added triage PRs from the community core Kafka Broker build Gradle build or GitHub Actions group-coordinator labels Jul 2, 2025
Copy link
Collaborator

@m1a2st m1a2st left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @LoganZhuZzz for this patch, left some comments

build.gradle Outdated
@@ -1660,9 +1660,11 @@ project(':coordinator-common') {
implementation libs.slf4jApi
implementation libs.metrics
implementation libs.hdrHistogram
implementation libs.scalaLibrary
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should add scalaLibrary to the :coordinator-common module.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the feedback.

<allow pkg="org.apache.kafka.server.util" />
<allow pkg="org.apache.kafka.storage.internals.log" />
<allow pkg="org.apache.kafka.test" />
<allow pkg="org.apache.kafka.timeline" />
<allow pkg="org.HdrHistogram" />
<allow pkg="scala" />
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the feedback. I’m keeping coordinator-common as a pure Java module, but since the tests depend on the Scala-based ReplicaManager, we still need scala, like OptionConverters in test scope.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you prefer that we wait until ReplicaManager is refactored into Java before removing all Scala dependencies from this module, or would it make more sense to move these tests elsewhere, or wrap the Scala APIs in Java-friendly facade?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that ReplicaManager is still required for the tests. cc @TaiJuWu @chia7712

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, ReplicaManager is needed here so we need to wait @chia7712 feedback.

*/
public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {

private static final Logger LOG = LoggerFactory.getLogger(CoordinatorLoader.class);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the logger class be CoordinatorLoaderImpl?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the feedback.

Comment on lines -30 to -31
<!-- no one depends on the server -->
<disallow pkg="kafka" />
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we do this change?
From my understanding, this prevents developer to depend on core module again in migrating process.

@github-actions github-actions bot removed the triage PRs from the community label Jul 3, 2025
Comment on lines 164 to 165
if (records instanceof MemoryRecords) {
return (MemoryRecords) records;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (records instanceof MemoryRecords) {
return (MemoryRecords) records;
if (records instanceof MemoryRecords memoryRecords) {
return memoryRecords;

@@ -610,7 +611,8 @@ class BrokerServer(
)
val loader = new CoordinatorLoaderImpl[CoordinatorRecord](
time,
replicaManager,
tp => toJava(replicaManager.getLog(tp)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
tp => toJava(replicaManager.getLog(tp)),
tp => replicaManager.getLog(tp).toJava,

@@ -640,7 +642,8 @@ class BrokerServer(
val serde = new ShareCoordinatorRecordSerde
val loader = new CoordinatorLoaderImpl[CoordinatorRecord](
time,
replicaManager,
tp => toJava(replicaManager.getLog(tp)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
tp => toJava(replicaManager.getLog(tp)),
tp => replicaManager.getLog(tp).toJava,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback. I plan to submit all the fixes together in a single update.

Copy link
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@LoganZhuZzz thanks for this patch!


stats.readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes() > 0;

MemoryRecords memoryRecords = toReadableMemoryRecords(tp, fetchDataInfo.records, buffer);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the buffer should be reusable, right? in the scala code, we keep reusing the "larger" buffer. By contrast, in this version, the buffer is recreated many times.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing that out.

I’m currently considering two ways to restore this behavior:

  1. Introduce a simple wrapper class for the buffer, allowing it to be updated directly within the method.
  2. Modify toReadableMemoryRecords to return a tuple containing both the MemoryRecords and the potentially updated buffer.

Would you have a preference between the two approaches?

throw new RuntimeException(msg, ex);
}

if (coordinatorRecordOpt.isPresent()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please use ifPresent instead

return new ReplayResult(currentOffset, previousHighWatermark);
}

private void completeLoadFuture(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the simple implementation takes 5 arguments, and it has only one usage. Perhaps it should be inlined instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback. I’ve fixed these issues. PTAL


stats.readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes() > 0;

MemoryRecords memoryRecords = toReadableMemoryRecords(tp, fetchDataInfo.records, bufferHolder);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need BufferHolder? Could you please try to take the buffer from MemoryRecords?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
build Gradle build or GitHub Actions ci-approved core Kafka Broker group-coordinator
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants