KAFKA-12689: Remove exactly_once / exactly_once_beta #1

Merged
merged 8 commits into from Sep 19, 2022

Conversation

lucasbru
Owner

Removes all code for handling the processing_guarantee settings exactly_once and exactly_once_beta, which have been deprecated since 3.0.0 and can be removed with the release of 4.0.0.

Some tests were adapted to use exactly_once_v2 instead of exactly_once_beta or exactly_once, to retain the same coverage of the code base.

exactly_once / exactly_once_beta are rejected with an error message to upgrade to exactly_once_v2.

EosV2UpgradeIntegrationTest is removed, since it cannot be implemented anymore with only one implementation available.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@lucasbru
Owner Author

Some notes:

  • I kept StreamsConfig.EXACTLY_ONCE and StreamsConfig.EXACTLY_ONCE_BETA around with default visibility so that we can print a decent error message when they are used, but I don't have a strong opinion in case you want them removed.
  • I did not make changes to the Kafka Client API, in particular the removal of the deprecated version of sendOffsetsToTransaction. Not sure if it belongs in this change.
  • Some return values in ActiveTaskCreator are now too general. The plan is to move the threadProducer out of the ActiveTaskCreator and remove all the wrapping methods ("closeThreadProducerIfNeeded", "producerMetrics", "producerClientIds", etc.), so I didn't clean this up yet in this PR.
  • EosV2UpgradeIntegrationTest is removed completely, because it cannot be written as a unit test anymore. We will have to rewrite a similar system test.
  • In some places, EOS_beta is still used in names. I will replace these with EOS_v2 later.
  • I have not yet reordered some tests in StreamsProducerTest.java, as it would make reviewing harder.
  • I suppose we need to create a KAFKA ticket for this?

@guozhangwang guozhangwang left a comment

EosV2UpgradeIntegrationTest is removed completely, because it cannot be written as a unit test anymore. We will have to rewrite a similar system test.

Yes, I think that's very important to make sure rolling upgrades still work; we can do that in a separate PR.

I kept StreamsConfig.EXACTLY_ONCE and StreamsConfig.EXACTLY_ONCE_BETA around with default visibility to print a decent error message when they are used

I think it's okay to keep these (potentially forever..) for decent error messages

I did not make changes to the Kafka Client API, in particular removal of the deprecated version of sendOffsetsToTransaction.

Yup, do not need to worry about it.

I suppose we need to create a KAFKA ticket for this?

Yes!

@@ -295,8 +295,9 @@ <h2 class="anchor-heading"><a id="streams_processing_guarantee" class="anchor-li
which requires broker version 2.5.0 or newer.
This implementation is more efficient, because it reduces client and broker resource utilization, like client threads and used network connections,
and it enables higher throughput and improved scalability.
As of the 3.0.0 release, the first version of exactly-once has been deprecated. Users are encouraged to use exactly-once v2 for

This is not for this file but for a different file: we should also edit the upgrade-guide.html under the API changes in 4.0 section, noting those configs are now removed; also mention how users still with those configs can upgrade with rolling bounces to the V2 config values.

Owner Author

Done
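
As a rough illustration of the config change such an upgrade note would describe, the sketch below switches an application's `processing.guarantee` to `exactly_once_v2`. It uses plain `java.util.Properties` with literal keys so that it compiles without Kafka on the classpath. The class name, application id, and bootstrap address are hypothetical placeholders, and the rolling-bounce procedure itself is left to the upgrade guide.

```java
import java.util.Properties;

public class EosV2ConfigExample {

    // Builds the properties an application would pass to its Streams configuration.
    static Properties streamsProperties() {
        final Properties props = new Properties();
        props.put("application.id", "my-streams-app");    // hypothetical placeholder
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical placeholder
        // Previously "exactly_once" or "exactly_once_beta"; both are rejected as of 4.0.0.
        props.put("processing.guarantee", "exactly_once_v2");
        return props;
    }

    public static void main(final String[] args) {
        System.out.println(streamsProperties().getProperty("processing.guarantee"));
    }
}
```

In a real application the same key and value are available as the constants `StreamsConfig.PROCESSING_GUARANTEE_CONFIG` and `StreamsConfig.EXACTLY_ONCE_V2`.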

}
if (processingModeConfig.equals(EXACTLY_ONCE_BETA)) {
throw new ConfigException(String.format("Configuration parameter `%s` was removed in the 4.0.0 release. " +
"Please use `%s` instead. Note that this requires broker version 2.5+ so you should prepare "

If users are already using BETA then their brokers should be on 2.5 already, since the semantics of BETA and V2 are exactly the same. I think we should just say "... Please use .. instead which is the new name for the same processing semantics".

Owner Author

Done

if (processingModeConfig.equals(EXACTLY_ONCE)) {
throw new ConfigException(String.format("Configuration parameter `%s` was removed in the 4.0.0 release. " +
"Please use `%s` instead. Note that this requires broker version 2.5+ so you should prepare "
+ "to upgrade your brokers if necessary.", EXACTLY_ONCE, EXACTLY_ONCE_V2));

We also need to mention that, when changing the flag, users should follow the upgrade guide and do rolling bounces.

Owner Author

Done
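
The check discussed in these threads can be sketched roughly as follows. This is a minimal, self-contained approximation, not the code merged in the PR: `ConfigException` here is a local stand-in for `org.apache.kafka.common.config.ConfigException`, the class name `ProcessingGuaranteeCheck` is hypothetical, and the message wording only paraphrases the review suggestions.

```java
public class ProcessingGuaranteeCheck {

    // Local stand-in (assumption) for org.apache.kafka.common.config.ConfigException,
    // so that the sketch compiles without Kafka on the classpath.
    static class ConfigException extends RuntimeException {
        ConfigException(final String message) {
            super(message);
        }
    }

    // The removed values are hard-coded; only the v2 value remains a usable setting.
    private static final String EXACTLY_ONCE = "exactly_once";
    private static final String EXACTLY_ONCE_BETA = "exactly_once_beta";
    static final String EXACTLY_ONCE_V2 = "exactly_once_v2";

    // Rejects the two removed settings with a message pointing at exactly_once_v2.
    static void validateProcessingConfiguration(final String processingModeConfig) {
        if (EXACTLY_ONCE.equals(processingModeConfig)) {
            throw new ConfigException(String.format(
                "Configuration parameter `%s` was removed in the 4.0.0 release. "
                    + "Please use `%s` instead. Note that this requires broker version 2.5+, "
                    + "and that the switch should follow the upgrade guide (rolling bounces).",
                EXACTLY_ONCE, EXACTLY_ONCE_V2));
        }
        if (EXACTLY_ONCE_BETA.equals(processingModeConfig)) {
            throw new ConfigException(String.format(
                "Configuration parameter `%s` was removed in the 4.0.0 release. "
                    + "Please use `%s` instead, which is the new name for the same processing semantics.",
                EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2));
        }
    }

    public static void main(final String[] args) {
        validateProcessingConfiguration(EXACTLY_ONCE_V2); // accepted, no exception
        try {
            validateProcessingConfiguration("exactly_once_beta");
        } catch (final ConfigException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Comparing with the literal on the left-hand side (`EXACTLY_ONCE.equals(...)`) also keeps the check safe if the configured value is ever null.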

} catch (final RuntimeException e) {
throw new StreamsException("[" + id + "] task producer encounter error trying to close.", e, id);
}
try {

We can change the function name removing IfNeeded as well, ditto for similar function names.

Owner Author

You are right. I didn't do this yet, because I was thinking about moving StreamsProducer out of ActiveTaskCreator, at which point all of these wrapping functions would go.

@@ -43,7 +43,6 @@
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
import org.apache.kafka.streams.processor.TaskId;

The major purpose of this StreamsProducer is to wrap both the task and thread producers for the EOS-v1/v2 usages. I think we can consider removing and inlining this class completely in a follow-up PR.

Owner Author

Isn't StreamsProducer still useful to have the translation between KafkaProducer exceptions and StreamsExceptions all in one place?

@@ -152,10 +152,6 @@ Consumer<byte[], byte[]> mainConsumer() {
return mainConsumer;
}

StreamsProducer streamsProducerForTask(final TaskId taskId) {

This is not for this PR itself, but we can do some further simplifications around TaskManager. For example, the logic around shouldCommitAdditionalTasks was mainly for EOS-v1, since only in that case we may not want to commit all tasks; for ALOS and EOS-v2, whenever we want to commit, we should always commit all tasks anyway. With V1 removed we should consider reducing the complexity for such cases as well, i.e. when we commit, we always try to commit all active tasks except in the error handling case.

Owner Author

Sounds good. I will create a separate PR for this.

@guozhangwang

Also cc @cadonna @lihaosky for another look.

Collaborator

@cadonna cadonna left a comment

@lucasbru Thanks for the PR!

I left a couple of comments.

Note that a PR that addresses Kafka should have a title that starts with a ticket number of the Apache Kafka Jira. This PR's title starts with a ticket number of the Confluent Jira.
If you haven't please read the following pages:

I understand that your development branch in your fork is named 4.0. Note that the 4.0 release branch in apache/kafka will also be named 4.0. You might run into naming conflicts.

@Deprecated
public static final String EXACTLY_ONCE = "exactly_once";
static final String EXACTLY_ONCE = "exactly_once";
Collaborator

Couldn't you even use private here and below?

Owner Author

@lucasbru lucasbru Sep 14, 2022

I couldn't because I am using it in StreamsConfigTest.java.

However, since you seem to be concerned about polluting the codebase forever with these outdated values, I removed these and hardcoded the values in StreamsConfigTest and the check in StreamsConfig.

Comment on lines 298 to 300
As of the 3.0.0 release, the first version of exactly-once has been deprecated, and with the 4.0.0 release,
it has been removed from Kafka Streams. Users who want to use exactly-once processing with Kafka streams 4.0.0
or newer, upgrade to broker version 2.5.0 or newer.
Collaborator

Suggested change
- As of the 3.0.0 release, the first version of exactly-once has been deprecated, and with the 4.0.0 release,
- it has been removed from Kafka Streams. Users who want to use exactly-once processing with Kafka streams 4.0.0
- or newer, upgrade to broker version 2.5.0 or newer.
+ As of the 3.0.0 release, the first version of exactly-once has been deprecated, and with the 4.0.0 release,
+ it has been removed from Kafka Streams.

It is already mentioned in the first sentence of the paragraph that users need a broker version 2.5.0 or newer.

Owner Author

Done

Comment on lines 588 to 589
consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG),
equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT))
Collaborator

nit: 4 spaces are enough for indentation.

Owner Author

Done

Comment on lines 630 to 631
consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG),
equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT))
Collaborator

Also here 4 spaces are enough:

Suggested change
consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG),
equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT))
consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG),
equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT))

Owner Author

Done

@@ -1331,6 +1307,20 @@ private void validateRackAwarenessConfiguration() {
});
}

private static void validateProcessingConfiguration(final String processingModeConfig) {
if (processingModeConfig.equals(EXACTLY_ONCE)) {
Collaborator

@cadonna cadonna Sep 14, 2022

I am wondering if we should just remove StreamsConfig.EXACTLY_ONCE and StreamsConfig.EXACTLY_ONCE_BETA and not throw a specific ConfigException for those values, but just a generic one. When would we remove the specific ConfigException in the future?
We had a deprecation period, we have the removal documented in the docs, I think that should be enough.

Owner Author

Both solutions are fine with me. I suppose it's a trade-off between a courtesy to the users and a cleaner codebase. @guozhangwang wdyt?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My personal preference: even if we remove the deprecated strings, I would still slightly prefer to log a meaningful message to users indicating that those config values are not used anymore, at the cost of keeping those hard-coded values in the codebase forever.

But in either way, I think we can still throw a ConfigException but a generic exception, what's the motivation for not doing so? @cadonna

Collaborator

@guozhangwang My proposal was to throw a generic ConfigException instead of a specific one. I have never intended not to throw a ConfigException.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see, thanks @cadonna .


@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
public class EosV2UpgradeIntegrationTest {
Collaborator

We should verify if we have a system test that might replace this test. If not we might consider creating such a system test.

Owner Author

Agreed. As noted above, this will be done in a follow-up PR.


@Parameterized.Parameter
public String eosConfig;
public String eosConfig = StreamsConfig.EXACTLY_ONCE_V2;
Collaborator

I think it would make sense to replace eosConfig with StreamsConfig.EXACTLY_ONCE_V2 in these tests. I already imagine future me wondering about this useless indirection.

Owner Author

Good point, Done

Comment on lines 118 to 119
final StreamsProducer eosBetaStreamsProducerWithMock = new StreamsProducer(
eosBetaConfig,
Collaborator

Suggested change
- final StreamsProducer eosBetaStreamsProducerWithMock = new StreamsProducer(
- eosBetaConfig,
+ final StreamsProducer eosV2StreamsProducerWithMock = new StreamsProducer(
+ eosV2Config,

We want to get rid of the "beta".

Owner Author

I wanted to keep such renamings out of the first PR to keep it reviewable (and added a note above), but maybe it was ill-conceived in this case. Done.

Comment on lines -330 to -761
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
expectLastCall().once();
Collaborator

Just a heads up: We are currently migrating from EasyMock to Mockito. There is already a PR, apache#12607, up to migrate this test. That means all these mock setups will change in the near future.

Owner Author

Thanks. Yes, I guess there will be lots of conflicts anyways, once we are ready to push 4.0 to trunk. So I'll just have to deal with it I guess

@lucasbru lucasbru changed the title KSE-1228 - Remove exactly_once / exactly_once_beta KAFKA-12689 - Remove exactly_once / exactly_once_beta Sep 14, 2022
@lucasbru lucasbru closed this Sep 14, 2022
@lucasbru lucasbru deleted the KSE-1228 branch September 14, 2022 14:25
@lucasbru lucasbru restored the KSE-1228 branch September 14, 2022 14:26
@lucasbru lucasbru reopened this Sep 14, 2022
@lucasbru lucasbru changed the title KAFKA-12689 - Remove exactly_once / exactly_once_beta KAFKA-12689: Remove exactly_once / exactly_once_beta Sep 14, 2022
@guozhangwang

@lucasbru I think Jenkins jobs would not auto-trigger for these PRs (unfortunately..) so we'd have to manually run the tests to make sure nothing breaks.

@lihaosky lihaosky left a comment

Thanks @lucasbru ! Only have a minor comment

release.py Outdated
Comment on lines 214 to 215
if jdk_java_home.strip(): jdk_env['JAVA_HOME'] = jdk_java_home
else: jdk_java_home = jdk_env['JAVA_HOME']

This logic here is different from original logic, is it intended? jdk_java_home in original logic is not set here.

Owner Author

@lucasbru lucasbru Sep 15, 2022

Yes, I think the original logic was broken when user input was empty. It was supposed to use JAVA_HOME system property to find java, but it wouldn't set jdk_java_home, so the following version check

java_version = cmd_output("%s/bin/java -version" % jdk_java_home, env=jdk_env)

would access /bin/java which does not exist on any system I know.

Not sure why nobody else had this problem before, maybe nobody used the JAVA_HOME branch here.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we carve out this change as a separate PR (you can file a PR directly against apache kafka trunk) since it's orthogonal to the ticket itself?

Owner Author

Yes, you are right. I removed it.

@lihaosky

For Jenkins jobs, can you create these PRs in some branch of base Kafka to run?

@lucasbru lucasbru changed the base branch from KAFKA-12689 to trunk September 15, 2022 15:12
@lucasbru lucasbru changed the base branch from trunk to KAFKA-12689 September 15, 2022 15:12
@lucasbru
Owner Author

For Jenkins jobs, can you create these PRs in some branch of base Kafka to run?

I created a draft PR in apache/kafka, here: apache#12646

It did run, but it failed with various unrelated errors (like all recently merged PRs)...

@lucasbru
Owner Author

So to me, the failures in the unit tests look unrelated / flaky. For JDK 11 / Scala 2.13, all tests passed. I also ran the system tests for streams, which passed.

https://jenkins.confluent.io/job/system-test-kafka-branch-builder/5130/

Anything else or can we merge this part? @guozhangwang @cadonna

@guozhangwang

Thanks @lucasbru , I do not have further comments, and I think we can merge this PR now and continue on follow-up PRs.

@lucasbru lucasbru merged commit a0310fa into KAFKA-12689 Sep 19, 2022
@lucasbru lucasbru deleted the KSE-1228 branch September 23, 2022 07:55
lucasbru pushed a commit that referenced this pull request Sep 8, 2023
This change introduces some basic clean up and refactoring for forthcoming commits related to the revised fetch code for the consumer threading refactor project.

Reviewers: Christo Lolov <lolovc@amazon.com>, Jun Rao <junrao@gmail.com>