Skip to content

Commit

Permalink
KAFKA-7658: Follow up to original PR (apache#8027)
Browse files Browse the repository at this point in the history
Follow up to original PR apache#7985 for KIP-523 (adding `KStream#toTable()` operator)
  - improve JavaDocs
  - add more unit tests
  - fix bug for auto-repartitioning
  - some code cleanup

Reviewers: High Lee <yello1109@daum.net>, John Roesler <john@confluent.io>
  • Loading branch information
mjsax authored and stanislavkozlovski committed Feb 18, 2020
1 parent 2be04dd commit cb1aa5a
Show file tree
Hide file tree
Showing 16 changed files with 288 additions and 244 deletions.
Expand Up @@ -289,8 +289,9 @@ public synchronized <K, V> KTable<K, V> table(final String topic,

final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(
Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()),
internalStreamsBuilder, topic + "-");
Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()),
internalStreamsBuilder,
topic + "-");

return internalStreamsBuilder.table(topic, consumedInternal, materializedInternal);
}
Expand Down
Expand Up @@ -879,11 +879,23 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
/**
* Convert this stream to a {@link KTable}.
* <p>
* an internal repartitioning topic may need to be created in Kafka if a key changed
* If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
* {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)}, or
* {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via
* {@link #through(String)}) an internal repartitioning topic will be created in Kafka.
* This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
* {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
* "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>
* For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
* records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned
* correctly on its key.
* Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION} config for this case, because
* repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of
* a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
* <p>
* Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of
* it was a "fact/event" and is re-interpreted as update now (cf. {@link KStream} vs {@code KTable}).
*
Expand All @@ -894,11 +906,23 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
/**
* Convert this stream to a {@link KTable}.
* <p>
* an internal repartitioning topic may need to be created in Kafka if a key changed
* If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
* {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)}, or
* {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via
* {@link #through(String)}) an internal repartitioning topic will be created in Kafka.
* This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
* {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
* "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>
* For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
* records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned
* correctly on its key.
* Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION} config for this case, because
* repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of
* a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
* <p>
* Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of
* it was a "fact/event" and is re-interpreted as update now (cf. {@link KStream} vs {@code KTable}).
*
Expand All @@ -910,11 +934,23 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
/**
* Convert this stream to a {@link KTable}.
* <p>
* an internal repartitioning topic may need to be created in Kafka if a key changed
* If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
* {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)}, or
* {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via
* {@link #through(String)}) an internal repartitioning topic will be created in Kafka.
* This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
* {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
* "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>
* For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
* records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned
* correctly on its key.
* Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION} config for this case, because
* repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of
* a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
* <p>
* Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of
* it was a "fact/event" and is re-interpreted as update now (cf. {@link KStream} vs {@code KTable}).
*
Expand All @@ -927,11 +963,23 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
/**
* Convert this stream to a {@link KTable}.
* <p>
* an internal repartitioning topic may need to be created in Kafka if a key changed
* If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
* {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)}, or
* {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via
* {@link #through(String)}) an internal repartitioning topic will be created in Kafka.
* This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
* {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
* "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>
* For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
* records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned
* correctly on its key.
* Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION} config for this case, because
* repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of
* a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
* <p>
* Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of
* it was a "fact/event" and is re-interpreted as update now (cf. {@link KStream} vs {@code KTable}).
*
Expand Down
Expand Up @@ -46,7 +46,7 @@ public abstract class AbstractStream<K, V> {
protected final String name;
protected final Serde<K> keySerde;
protected final Serde<V> valSerde;
protected final Set<String> sourceNodes;
protected final Set<String> subTopologySourceNodes;
protected final StreamsGraphNode streamsGraphNode;
protected final InternalStreamsBuilder builder;

Expand All @@ -57,25 +57,25 @@ public AbstractStream(final AbstractStream<K, V> stream) {
this.builder = stream.builder;
this.keySerde = stream.keySerde;
this.valSerde = stream.valSerde;
this.sourceNodes = stream.sourceNodes;
this.subTopologySourceNodes = stream.subTopologySourceNodes;
this.streamsGraphNode = stream.streamsGraphNode;
}

AbstractStream(final String name,
final Serde<K> keySerde,
final Serde<V> valSerde,
final Set<String> sourceNodes,
final Set<String> subTopologySourceNodes,
final StreamsGraphNode streamsGraphNode,
final InternalStreamsBuilder builder) {
if (sourceNodes == null || sourceNodes.isEmpty()) {
if (subTopologySourceNodes == null || subTopologySourceNodes.isEmpty()) {
throw new IllegalArgumentException("parameter <sourceNodes> must not be null or empty");
}

this.name = name;
this.builder = builder;
this.keySerde = keySerde;
this.valSerde = valSerde;
this.sourceNodes = sourceNodes;
this.subTopologySourceNodes = subTopologySourceNodes;
this.streamsGraphNode = streamsGraphNode;
}

Expand All @@ -86,9 +86,9 @@ protected InternalTopologyBuilder internalTopologyBuilder() {
}

Set<String> ensureCopartitionWith(final Collection<? extends AbstractStream<K, ?>> otherStreams) {
final Set<String> allSourceNodes = new HashSet<>(sourceNodes);
final Set<String> allSourceNodes = new HashSet<>(subTopologySourceNodes);
for (final AbstractStream<K, ?> other: otherStreams) {
allSourceNodes.addAll(other.sourceNodes);
allSourceNodes.addAll(other.subTopologySourceNodes);
}
builder.internalTopologyBuilder.copartitionSources(allSourceNodes);

Expand Down
Expand Up @@ -16,10 +16,6 @@
*/
package org.apache.kafka.streams.kstream.internals;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.CogroupedKStream;
Expand All @@ -36,6 +32,11 @@
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class CogroupedKStreamImpl<K, VOut> extends AbstractStream<K, VOut> implements CogroupedKStream<K, VOut> {

static final String AGGREGATE_NAME = "COGROUPKSTREAM-AGGREGATE-";
Expand All @@ -45,10 +46,10 @@ public class CogroupedKStreamImpl<K, VOut> extends AbstractStream<K, VOut> imple
final private CogroupedStreamAggregateBuilder<K, VOut> aggregateBuilder;

CogroupedKStreamImpl(final String name,
final Set<String> sourceNodes,
final Set<String> subTopologySourceNodes,
final StreamsGraphNode streamsGraphNode,
final InternalStreamsBuilder builder) {
super(name, null, null, sourceNodes, streamsGraphNode, builder);
super(name, null, null, subTopologySourceNodes, streamsGraphNode, builder);
groupPatterns = new LinkedHashMap<>();
aggregateBuilder = new CogroupedStreamAggregateBuilder<>(builder);
}
Expand Down Expand Up @@ -97,21 +98,21 @@ public KTable<K, VOut> aggregate(final Initializer<VOut> initializer) {
public <W extends Window> TimeWindowedCogroupedKStream<K, VOut> windowedBy(final Windows<W> windows) {
Objects.requireNonNull(windows, "windows can't be null");
return new TimeWindowedCogroupedKStreamImpl<>(
windows,
builder,
sourceNodes,
name,
aggregateBuilder,
streamsGraphNode,
groupPatterns);
windows,
builder,
subTopologySourceNodes,
name,
aggregateBuilder,
streamsGraphNode,
groupPatterns);
}

@Override
public SessionWindowedCogroupedKStream<K, VOut> windowedBy(final SessionWindows sessionWindows) {
Objects.requireNonNull(sessionWindows, "sessionWindows can't be null");
return new SessionWindowedCogroupedKStreamImpl<>(sessionWindows,
builder,
sourceNodes,
subTopologySourceNodes,
name,
aggregateBuilder,
streamsGraphNode,
Expand Down
Expand Up @@ -26,21 +26,20 @@
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.StoreBuilder;

import static org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder;
import static org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.optimizableRepartitionNodeBuilder;


import java.util.Collections;
import java.util.Set;

import static org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder;
import static org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.optimizableRepartitionNodeBuilder;

class GroupedStreamAggregateBuilder<K, V> {

private final InternalStreamsBuilder builder;
private final Serde<K> keySerde;
private final Serde<V> valueSerde;
private final boolean repartitionRequired;
private final String userProvidedRepartitionTopicName;
private final Set<String> sourceNodes;
private final Set<String> subTopologySourceNodes;
private final String name;
private final StreamsGraphNode streamsGraphNode;
private StreamsGraphNode repartitionNode;
Expand All @@ -54,15 +53,15 @@ class GroupedStreamAggregateBuilder<K, V> {
GroupedStreamAggregateBuilder(final InternalStreamsBuilder builder,
final GroupedInternal<K, V> groupedInternal,
final boolean repartitionRequired,
final Set<String> sourceNodes,
final Set<String> subTopologySourceNodes,
final String name,
final StreamsGraphNode streamsGraphNode) {

this.builder = builder;
this.keySerde = groupedInternal.keySerde();
this.valueSerde = groupedInternal.valueSerde();
this.repartitionRequired = repartitionRequired;
this.sourceNodes = sourceNodes;
this.subTopologySourceNodes = subTopologySourceNodes;
this.name = name;
this.streamsGraphNode = streamsGraphNode;
this.userProvidedRepartitionTopicName = groupedInternal.name();
Expand Down Expand Up @@ -110,7 +109,7 @@ <KR, VR> KTable<KR, VR> build(final NamedInternal functionName,
return new KTableImpl<>(aggFunctionName,
keySerde,
valSerde,
sourceName.equals(this.name) ? sourceNodes : Collections.singleton(sourceName),
sourceName.equals(this.name) ? subTopologySourceNodes : Collections.singleton(sourceName),
queryableStoreName,
aggregateSupplier,
statefulProcessorNode,
Expand Down
Expand Up @@ -16,8 +16,6 @@
*/
package org.apache.kafka.streams.kstream.internals;

import java.util.Objects;
import java.util.Set;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator;
Expand All @@ -36,6 +34,9 @@
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Objects;
import java.util.Set;

class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedStream<K, V> {

static final String REDUCE_NAME = "KSTREAM-REDUCE-";
Expand All @@ -46,19 +47,19 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
final String userProvidedRepartitionTopicName;

KGroupedStreamImpl(final String name,
final Set<String> sourceNodes,
final Set<String> subTopologySourceNodes,
final GroupedInternal<K, V> groupedInternal,
final boolean repartitionRequired,
final StreamsGraphNode streamsGraphNode,
final InternalStreamsBuilder builder) {
super(name, groupedInternal.keySerde(), groupedInternal.valueSerde(), sourceNodes, streamsGraphNode, builder);
super(name, groupedInternal.keySerde(), groupedInternal.valueSerde(), subTopologySourceNodes, streamsGraphNode, builder);
this.repartitionRequired = repartitionRequired;
this.userProvidedRepartitionTopicName = groupedInternal.name();
this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(
builder,
groupedInternal,
repartitionRequired,
sourceNodes,
subTopologySourceNodes,
name,
streamsGraphNode
);
Expand Down Expand Up @@ -192,7 +193,7 @@ public <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W>
return new TimeWindowedKStreamImpl<>(
windows,
builder,
sourceNodes,
subTopologySourceNodes,
name,
keySerde,
valSerde,
Expand All @@ -207,7 +208,7 @@ public SessionWindowedKStream<K, V> windowedBy(final SessionWindows windows) {
return new SessionWindowedKStreamImpl<>(
windows,
builder,
sourceNodes,
subTopologySourceNodes,
name,
keySerde,
valSerde,
Expand All @@ -231,7 +232,7 @@ private <T> KTable<K, T> doAggregate(final KStreamAggProcessorSupplier<K, K, V,
@Override
public <Vout> CogroupedKStream<K, Vout> cogroup(final Aggregator<? super K, ? super V, Vout> aggregator) {
Objects.requireNonNull(aggregator, "aggregator can't be null");
return new CogroupedKStreamImpl<K, Vout>(name, sourceNodes, streamsGraphNode, builder)
return new CogroupedKStreamImpl<K, Vout>(name, subTopologySourceNodes, streamsGraphNode, builder)
.cogroup(this, aggregator);
}
}
Expand Up @@ -60,10 +60,10 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr

KGroupedTableImpl(final InternalStreamsBuilder builder,
final String name,
final Set<String> sourceNodes,
final Set<String> subTopologySourceNodes,
final GroupedInternal<K, V> groupedInternal,
final StreamsGraphNode streamsGraphNode) {
super(name, groupedInternal.keySerde(), groupedInternal.valueSerde(), sourceNodes, streamsGraphNode, builder);
super(name, groupedInternal.keySerde(), groupedInternal.valueSerde(), subTopologySourceNodes, streamsGraphNode, builder);

this.userProvidedRepartitionTopicName = groupedInternal.name();
}
Expand Down

0 comments on commit cb1aa5a

Please sign in to comment.