Skip to content

Commit

Permalink
KAFKA-14734: Use CommandDefaultOptions in StreamsResetter (apache#13983)
Browse files Browse the repository at this point in the history
This PR adds CommandDefaultOptions usage like in the other joptsimple based tools. It also moves the associated unit test class from streams to tools module as discussed in apache#13127 (comment)

Reviewers:  Luke Chen <showuon@gmail.com>, Bruno Cadonna <cadonna@apache.org>, Sagar Rao <sagarmeansocean@gmail.com>
  • Loading branch information
fvaleri authored and jeqo committed Jul 21, 2023
1 parent 6d5c177 commit 7dff594
Show file tree
Hide file tree
Showing 5 changed files with 293 additions and 254 deletions.
4 changes: 0 additions & 4 deletions checkstyle/import-control.xml
Expand Up @@ -378,10 +378,6 @@
<allow pkg="kafka.admin" />
</subpackage>

<subpackage name="tools">
<allow pkg="org.apache.kafka.tools" />
</subpackage>

<subpackage name="state">
<allow pkg="org.rocksdb" />
</subpackage>
Expand Down
Expand Up @@ -427,7 +427,7 @@ protected boolean tryCleanGlobal(final boolean withIntermediateTopics,
cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(CLEANUP_CONSUMER_TIMEOUT));

return new StreamsResetter().run(parameters, cleanUpConfig) == 0;
return new StreamsResetter().execute(parameters, cleanUpConfig) == 0;
}

protected void cleanGlobal(final boolean withIntermediateTopics,
Expand Down
Expand Up @@ -117,7 +117,7 @@ public void shouldNotAllowToResetWhileStreamsIsRunning() {
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
streams.start();

final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
final int exitCode = new StreamsResetter().execute(parameters, cleanUpConfig);
Assert.assertEquals(1, exitCode);

streams.close();
Expand All @@ -135,7 +135,7 @@ public void shouldNotAllowToResetWhenInputTopicAbsent() {
cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(CLEANUP_CONSUMER_TIMEOUT));

final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
final int exitCode = new StreamsResetter().execute(parameters, cleanUpConfig);
Assert.assertEquals(1, exitCode);
}

Expand All @@ -151,7 +151,7 @@ public void shouldNotAllowToResetWhenIntermediateTopicAbsent() {
cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(CLEANUP_CONSUMER_TIMEOUT));

final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
final int exitCode = new StreamsResetter().execute(parameters, cleanUpConfig);
Assert.assertEquals(1, exitCode);
}

Expand All @@ -167,7 +167,7 @@ public void shouldNotAllowToResetWhenSpecifiedInternalTopicDoesNotExist() {
cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(CLEANUP_CONSUMER_TIMEOUT));

final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
final int exitCode = new StreamsResetter().execute(parameters, cleanUpConfig);
Assert.assertEquals(1, exitCode);
}

Expand All @@ -183,7 +183,7 @@ public void shouldNotAllowToResetWhenSpecifiedInternalTopicIsNotInternal() {
cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(CLEANUP_CONSUMER_TIMEOUT));

final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
final int exitCode = new StreamsResetter().execute(parameters, cleanUpConfig);
Assert.assertEquals(1, exitCode);
}

Expand Down

0 comments on commit 7dff594

Please sign in to comment.