Skip to content

Commit

Permalink
Merge pull request #1 from lucasbru/KSE-1228
Browse files Browse the repository at this point in the history
KAFKA-12689: Remove exactly_once / exactly_once_beta
  • Loading branch information
lucasbru committed Sep 19, 2022
2 parents b09cadc + 71bf74a commit a0310fa
Show file tree
Hide file tree
Showing 39 changed files with 304 additions and 2,740 deletions.
8 changes: 4 additions & 4 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -218,19 +218,19 @@
files="(RecordCollectorTest|StreamsPartitionAssignorTest|StreamThreadTest|StreamTaskTest|TaskManagerTest|TopologyTestDriverTest).java"/>

<suppress checks="MethodLength"
files="(EosIntegrationTest|EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|RocksDBWindowStoreTest).java"/>
files="(EosIntegrationTest|KStreamKStreamJoinTest|RocksDBWindowStoreTest).java"/>

<suppress checks="ClassDataAbstractionCoupling"
files=".*[/\\]streams[/\\].*test[/\\].*.java"/>

<suppress checks="CyclomaticComplexity"
files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/>
files="(KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/>

<suppress checks="JavaNCSS"
files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|StreamThreadTest|TaskManagerTest).java"/>
files="(KStreamKStreamJoinTest|StreamThreadTest|TaskManagerTest).java"/>

<suppress checks="NPathComplexity"
files="(EosV2UpgradeIntegrationTest|EosTestDriver|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/>
files="(EosTestDriver|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/>

<suppress checks="(FinalLocalVariable|WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
files="Murmur3Test.java"/>
Expand Down
4 changes: 2 additions & 2 deletions docs/streams/core-concepts.html
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,8 @@ <h2 class="anchor-heading"><a id="streams_processing_guarantee" class="anchor-li
which requires broker version 2.5.0 or newer.
This implementation is more efficient, because it reduces client and broker resource utilization, like client threads and used network connections,
and it enables higher throughput and improved scalability.
As of the 3.0.0 release, the first version of exactly-once has been deprecated. Users are encouraged to use exactly-once v2 for
exactly-once processing from now on, and prepare by upgrading their brokers if necessary.
As of the 3.0.0 release, the first version of exactly-once has been deprecated, and with the 4.0.0 release,
it has been removed from Kafka Streams.
For more information on how this is done inside the brokers and Kafka Streams, see
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics">KIP-447</a>.<br />

Expand Down
4 changes: 2 additions & 2 deletions docs/streams/developer-guide/config-streams.html
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,8 @@ <h4><a class="toc-backref" href="#id23">num.standby.replicas</a><a class="header
<tr class="row-even"><td>processing.guarantee</td>
<td>Medium</td>
<td colspan="2">The processing mode. Can be either <code class="docutils literal"><span class="pre">"at_least_once"</span></code> (default)
or <code class="docutils literal"><span class="pre">"exactly_once_v2"</span></code> (for EOS version 2, requires broker version 2.5+). Deprecated config options are
<code class="docutils literal"><span class="pre">"exactly_once"</span></code> (for EOS version 1) and <code class="docutils literal"><span class="pre">"exactly_once_beta"</span></code> (for EOS version 2, requires broker version 2.5+)</td>.
or <code class="docutils literal"><span class="pre">"exactly_once_v2"</span></code> (for EOS version 2, requires broker version 2.5+). Config options
<code class="docutils literal"><span class="pre">"exactly_once"</span></code> and <code class="docutils literal"><span class="pre">"exactly_once_beta"</span></code> are not supported anymore since 4.0.</td>.
<td>See <a class="reference internal" href="#streams-developer-guide-processing-guarantee"><span class="std std-ref">Processing Guarantee</span></a></td>
</tr>
<tr class="row-odd"><td>poll.ms</td>
Expand Down
26 changes: 19 additions & 7 deletions docs/streams/upgrade-guide.html
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,13 @@ <h3 class="anchor-heading"><a id="streams_notable_changes" class="anchor-link"><
</p>

<p>
Starting in Kafka Streams 2.6.x, a new processing mode is available, named EOS version 2. This can be configured
by setting <code>"processing.guarantee"</code> to <code>"exactly_once_v2"</code> for
application versions 3.0+, or setting it to <code>"exactly_once_beta"</code> for versions between 2.6 and 2.8.
To use this new feature, your brokers must be on version 2.5.x or newer.
If you want to upgrade your EOS application from an older version and enable this feature in version 3.0+,
you first need to upgrade your application to version 3.0.x, staying on <code>"exactly_once"</code>,
and then do second round of rolling bounces to switch to <code>"exactly_once_v2"</code>. If you
Starting in Kafka Streams 4.0.0, the only processing modes available are <code>at_least_once</code> and
<code>"exactly_once_v2"</code>. To use the latter, your brokers must be on version 2.5.x or newer.
Previous versions of Kafka Streams allowed <code>"exactly_once"</code> and <code>"exactly_once_beta"</code>
settings, which were deprecated in 3.0.0 and removed in 4.0.0. If you want to upgrade your application that uses
<code>"exactly_once"</code> in a version &lt;3.0 and enable this feature in version 3.0+, you first need to
upgrade your application to version 3.0.x, staying on <code>"exactly_once"</code>,
and then do second round of rolling bounces to switch to <code>exactly_once_v2</code>. If you
are upgrading an EOS application from an older (pre-2.6) version to a version between 2.6 and 2.8, follow these
same steps but with the config <code>"exactly_once_beta"</code> instead. No special steps are required
to upgrade an application using <code>"exactly_once_beta"</code> from version 2.6+ to 3.0 or higher: you can
Expand Down Expand Up @@ -105,6 +105,18 @@ <h3 class="anchor-heading"><a id="streams_notable_changes" class="anchor-link"><
More details about the new config <code>StreamsConfig#TOPOLOGY_OPTIMIZATION</code> can be found in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-295%3A+Add+Streams+Configuration+Allowing+for+Optional+Topology+Optimization">KIP-295</a>.
</p>

<h3><a id="streams_api_changes_400" href="#streams_api_changes_400">Streams API changes in 4.0.0</a></h3>
<p>
We removed the StreamsConfig <code>processing.guarantee</code> configuration values <code>"exactly_once"</code> and <code>"exactly_once_beta"</code>,
which were deprecated in 3.0. Now, there is only one implementation of exactly-once semantics available, which is enabled by setting
<code>processing.guarantee</code> to <code>"exactly_once_v2"</code> (which is equivalent to the mode formally enabled by <code>exactly_once_beta</code>).
Note that eos-v2 requires broker version 2.5 or higher, so users need to upgrade their kafka cluster if necessary.
Users that still use <code>"exactly_once"</code> in a version 3.0+ or <code>"exactly_once_beta"</code> can use a rolling bounce to upgrade to the
<code>"exactly_once_v2"</code> setting.
For users using still using <code>"exactly_once"</code> in a version &lt;3.0, please see <a href="#streams_notable_changes">Notable compatibility changes in past releases</a>
for detailed upgrade instructions.
</p>

<h3><a id="streams_api_changes_300" href="#streams_api_changes_300">Streams API changes in 3.0.0</a></h3>
<p>
We improved the semantics of
Expand Down
66 changes: 18 additions & 48 deletions streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,7 @@
* </ul>
*
* If {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} is set to {@link #EXACTLY_ONCE_V2 "exactly_once_v2"},
* {@link #EXACTLY_ONCE "exactly_once"} (deprecated), or {@link #EXACTLY_ONCE_BETA "exactly_once_beta"} (deprecated), Kafka Streams does not
* allow users to overwrite the following properties (Streams setting shown in parentheses):
* Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses):
* <ul>
* <li>{@link ConsumerConfig#ISOLATION_LEVEL_CONFIG "isolation.level"} (read_committed) - Consumers will always read committed data only</li>
* <li>{@link ProducerConfig#ENABLE_IDEMPOTENCE_CONFIG "enable.idempotence"} (true) - Producer will always have idempotency enabled</li>
Expand Down Expand Up @@ -357,32 +356,6 @@ public class StreamsConfig extends AbstractConfig {
@SuppressWarnings("WeakerAccess")
public static final String AT_LEAST_ONCE = "at_least_once";

/**
* Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for exactly-once processing guarantees.
* <p>
* Enabling exactly-once processing semantics requires broker version 0.11.0 or higher.
* If you enable this feature Kafka Streams will use more resources (like broker connections)
* compared to {@link #AT_LEAST_ONCE "at_least_once"} and {@link #EXACTLY_ONCE_V2 "exactly_once_v2"}.
*
* @deprecated Since 3.0.0, will be removed in 4.0. Use {@link #EXACTLY_ONCE_V2 "exactly_once_v2"} instead.
*/
@SuppressWarnings("WeakerAccess")
@Deprecated
public static final String EXACTLY_ONCE = "exactly_once";

/**
* Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for exactly-once processing guarantees.
* <p>
* Enabling exactly-once (beta) requires broker version 2.5 or higher.
* If you enable this feature Kafka Streams will use fewer resources (like broker connections)
* compared to the {@link #EXACTLY_ONCE} (deprecated) case.
*
* @deprecated Since 3.0.0, will be removed in 4.0. Use {@link #EXACTLY_ONCE_V2 "exactly_once_v2"} instead.
*/
@SuppressWarnings("WeakerAccess")
@Deprecated
public static final String EXACTLY_ONCE_BETA = "exactly_once_beta";

/**
* Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for exactly-once processing guarantees.
* <p>
Expand Down Expand Up @@ -443,7 +416,7 @@ public class StreamsConfig extends AbstractConfig {
private static final String COMMIT_INTERVAL_MS_DOC = "The frequency in milliseconds with which to commit processing progress." +
" For at-least-once processing, committing means to save the position (ie, offsets) of the processor." +
" For exactly-once processing, it means to commit the transaction which includes to save the position and to make the committed data in the output topic visible to consumers with isolation level read_committed." +
" (Note, if <code>processing.guarantee</code> is set to <code>" + EXACTLY_ONCE_V2 + "</code>, <code>" + EXACTLY_ONCE + "</code>,the default value is <code>" + EOS_DEFAULT_COMMIT_INTERVAL_MS + "</code>," +
" (Note, if <code>processing.guarantee</code> is set to <code>" + EXACTLY_ONCE_V2 + "</code>, the default value is <code>" + EOS_DEFAULT_COMMIT_INTERVAL_MS + "</code>," +
" otherwise the default value is <code>" + DEFAULT_COMMIT_INTERVAL_MS + "</code>.";

/** {@code repartition.purge.interval.ms} */
Expand Down Expand Up @@ -584,8 +557,6 @@ public class StreamsConfig extends AbstractConfig {
private static final String PROCESSING_GUARANTEE_DOC = "The processing guarantee that should be used. " +
"Possible values are <code>" + AT_LEAST_ONCE + "</code> (default) " +
"and <code>" + EXACTLY_ONCE_V2 + "</code> (requires brokers version 2.5 or higher). " +
"Deprecated options are <code>" + EXACTLY_ONCE + "</code> (requires brokers version 0.11.0 or higher) " +
"and <code>" + EXACTLY_ONCE_BETA + "</code> (requires brokers version 2.5 or higher). " +
"Note that exactly-once processing requires a cluster of at least three brokers by default what is the " +
"recommended setting for production; for development you can change this, by adjusting broker setting " +
"<code>transaction.state.log.replication.factor</code> and <code>transaction.state.log.min.isr</code>.";
Expand Down Expand Up @@ -816,7 +787,7 @@ public class StreamsConfig extends AbstractConfig {
.define(PROCESSING_GUARANTEE_CONFIG,
Type.STRING,
AT_LEAST_ONCE,
in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2),
(name, value) -> validateProcessingConfiguration((String) value),
Importance.MEDIUM,
PROCESSING_GUARANTEE_DOC)
.define(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
Expand Down Expand Up @@ -1247,17 +1218,6 @@ protected StreamsConfig(final Map<?, ?> props,
super(CONFIG, props, doLog);
eosEnabled = StreamsConfigUtils.eosEnabled(this);

final String processingModeConfig = getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG);
if (processingModeConfig.equals(EXACTLY_ONCE)) {
log.warn("Configuration parameter `{}` is deprecated and will be removed in the 4.0.0 release. " +
"Please use `{}` instead. Note that this requires broker version 2.5+ so you should prepare "
+ "to upgrade your brokers if necessary.", EXACTLY_ONCE, EXACTLY_ONCE_V2);
}
if (processingModeConfig.equals(EXACTLY_ONCE_BETA)) {
log.warn("Configuration parameter `{}` is deprecated and will be removed in the 4.0.0 release. " +
"Please use `{}` instead.", EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2);
}

if (props.containsKey(RETRIES_CONFIG)) {
log.warn("Configuration parameter `{}` is deprecated and will be removed in the 4.0.0 release.", RETRIES_CONFIG);
}
Expand Down Expand Up @@ -1331,6 +1291,21 @@ private void validateRackAwarenessConfiguration() {
});
}

private static void validateProcessingConfiguration(final String processingModeConfig) {
if (processingModeConfig.equals("exactly_once")) {
throw new ConfigException(String.format("Configuration parameter `exactly_once` was removed in the 4.0.0 release. " +
"Please use `%s` instead. Refer to the Kafka Streams upgrade guide on how to upgrade your application " +
"to use the new parameter. Note that this requires broker version 2.5+ so you should prepare "
+ "to upgrade your brokers if necessary.", EXACTLY_ONCE_V2));
}
if (processingModeConfig.equals("exactly_once_beta")) {
throw new ConfigException(String.format("Configuration parameter `exactly_once_beta` was removed in the 4.0.0 release. " +
"Please use `%s` instead, which is the new name for the same processing semantics.",
EXACTLY_ONCE_V2));
}
in(AT_LEAST_ONCE, EXACTLY_ONCE_V2).ensureValid(PROCESSING_GUARANTEE_CONFIG, processingModeConfig);
}

private Map<String, Object> getCommonConsumerConfigs() {
final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());

Expand Down Expand Up @@ -1580,11 +1555,6 @@ public Map<String, Object> getProducerConfigs(final String clientId) {
props.putAll(getClientCustomProps());
props.putAll(clientProvidedProps);

// When using EOS alpha, stream should auto-downgrade the transactional commit protocol to be compatible with older brokers.
if (StreamsConfigUtils.processingMode(this) == StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA) {
props.put("internal.auto.downgrade.txn.commit", true);
}

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, originals().get(BOOTSTRAP_SERVERS_CONFIG));
// add client id with stream client id prefix
props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ public class StreamsConfigUtils {
public enum ProcessingMode {
AT_LEAST_ONCE("AT_LEAST_ONCE"),

EXACTLY_ONCE_ALPHA("EXACTLY_ONCE_ALPHA"),

EXACTLY_ONCE_V2("EXACTLY_ONCE_V2");

public final String name;
Expand All @@ -34,25 +32,17 @@ public enum ProcessingMode {
}
}

@SuppressWarnings("deprecation")
public static ProcessingMode processingMode(final StreamsConfig config) {
if (StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))) {
return ProcessingMode.EXACTLY_ONCE_ALPHA;
} else if (StreamsConfig.EXACTLY_ONCE_BETA.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))) {
return ProcessingMode.EXACTLY_ONCE_V2;
} else if (StreamsConfig.EXACTLY_ONCE_V2.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))) {
if (StreamsConfig.EXACTLY_ONCE_V2.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))) {
return ProcessingMode.EXACTLY_ONCE_V2;
} else {
return ProcessingMode.AT_LEAST_ONCE;
}
}

@SuppressWarnings("deprecation")
public static String processingModeString(final ProcessingMode processingMode) {
if (processingMode == ProcessingMode.EXACTLY_ONCE_V2) {
return StreamsConfig.EXACTLY_ONCE_V2;
} else if (processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA) {
return StreamsConfig.EXACTLY_ONCE;
} else {
return StreamsConfig.AT_LEAST_ONCE;
}
Expand All @@ -63,7 +53,6 @@ public static boolean eosEnabled(final StreamsConfig config) {
}

public static boolean eosEnabled(final ProcessingMode processingMode) {
return processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA ||
processingMode == ProcessingMode.EXACTLY_ONCE_V2;
return processingMode == ProcessingMode.EXACTLY_ONCE_V2;
}
}

0 comments on commit a0310fa

Please sign in to comment.