kafka streams #111
StreamsDSL
KStream
KTable
GlobalKTable
|
streams application example
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class SimpleStreamApplication {
    private static String APPLICATION_NAME = "streams-application";
    private static String BOOTSTRAP_SERVER = "my-kafka:9092";
    private static String STREAM_LOG = "stream_log";
    private static String STREAM_LOG_COPY = "stream_log_copy";

    public static void main(String[] args) {
        // Application ID, broker address, and default String serdes for keys and values
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Topology: copy every record from stream_log to stream_log_copy unchanged
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> streamLog = builder.stream(STREAM_LOG);
        streamLog.to(STREAM_LOG_COPY);

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
|
streamsDSL filter
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> streamLog = builder.stream(STREAM_LOG);
// Keep only records whose value is longer than 5 characters
KStream<String, String> filteredStream = streamLog.filter((key, value) -> value.length() > 5);
filteredStream.to(STREAM_LOG_COPY);
|
streamsDSL KTable, KStream join
StreamsBuilder builder = new StreamsBuilder();
KTable<String, String> addressTable = builder.table(ADDRESS_TABLE);
KStream<String, String> orderStream = builder.stream(ORDER_STREAM);
// Inner join on the record key; the two topics must be co-partitioned
orderStream.join(addressTable,
        (order, address) -> order + " send to " + address)
    .to(ORDER_JOIN_STREAM);
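To make the join behaviour above concrete, here is a minimal plain-Java model of a KStream-KTable inner join. It does not use the Kafka Streams API: the class `StreamTableJoinSketch`, its methods, and the sample keys and values are all hypothetical names chosen for illustration. It assumes the table keeps only the latest value per key and that only records arriving on the stream side produce output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamTableJoinSketch {
    // Models the KTable: only the latest address per key is kept.
    private final Map<String, String> addressTable = new HashMap<>();
    // Models the output topic of the join.
    private final List<String> output = new ArrayList<>();

    // A new record on the table topic overwrites the previous value for that key.
    void onAddress(String key, String address) {
        addressTable.put(key, address);
    }

    // A new record on the stream topic looks up the table by key.
    // Inner join: if the key has no table entry, nothing is emitted.
    void onOrder(String key, String order) {
        String address = addressTable.get(key);
        if (address != null) {
            output.add(order + " send to " + address);
        }
    }

    List<String> output() {
        return output;
    }

    static List<String> demo() {
        StreamTableJoinSketch join = new StreamTableJoinSketch();
        join.onOrder("alice", "phone");    // no address yet, so no output
        join.onAddress("alice", "Seoul");
        join.onOrder("alice", "laptop");   // joined with Seoul
        join.onAddress("alice", "Busan");  // table update alone emits nothing
        join.onOrder("alice", "tablet");   // joined with the latest address, Busan
        join.onOrder("bob", "watch");      // unknown key, so no output
        return join.output();
    }

    public static void main(String[] args) {
        // prints [laptop send to Seoul, tablet send to Busan]
        System.out.println(demo());
    }
}
```

The model ignores timestamps and tombstones, which the real library does take into account.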
|
streamsDSL - GlobalKTable and KStream join
There are two ways to join topics that are not co-partitioned: repartition one of them so that the partitioning matches, or read one side as a GlobalKTable.
GlobalKTable join example
StreamsBuilder builder = new StreamsBuilder();
GlobalKTable<String, String> addressGlobalTable = builder.globalTable(ADDRESS_GLOBAL_TABLE);
KStream<String, String> orderStream = builder.stream(ORDER_STREAM);
// The second argument maps each stream record to the GlobalKTable key to look up
orderStream.join(addressGlobalTable,
        (orderKey, orderValue) -> orderKey,
        (order, address) -> order + " send to " + address)
    .to(ORDER_JOIN_STREAM);
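Why co-partitioning matters can be sketched in plain Java. The example below is only a model: `partitionFor` uses `String.hashCode()` as a stand-in for a key-hash partitioner (Kafka's default partitioner uses a different hash function), and the class name, method names, and sample keys are hypothetical. It shows that when two topics have different partition counts, the same key can land on different partition numbers, so a task reading partition N of both topics would not see matching keys.

```java
import java.util.List;

public class CoPartitioningSketch {
    // Stand-in for a key-hash partitioner; not Kafka's actual hash function.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // True when every key lands on the same partition number in both topics.
    static boolean keysAligned(List<String> keys, int leftPartitions, int rightPartitions) {
        for (String key : keys) {
            if (partitionFor(key, leftPartitions) != partitionFor(key, rightPartitions)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("a", "b", "c");
        System.out.println(keysAligned(keys, 3, 3)); // prints true
        System.out.println(keysAligned(keys, 3, 4)); // prints false: "c" maps to 0 vs 3
    }
}
```

A GlobalKTable sidesteps this because every application instance holds the full table contents, so the lookup does not depend on which partition the stream record came from.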
|
ProcessorAPI
Topology topology = new Topology();
topology.addSource("Source", STREAM_LOG)
        .addProcessor("Process", () -> new FilterProcessor(), "Source")
        .addSink("Sink", STREAM_LOG_FILTER, "Process");
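The ProcessorAPI topology refers to a `FilterProcessor` that is not shown in this issue. The sketch below is a plain-Java model of what such a node might do, assuming it applies the same rule as the streamsDSL filter example (`value.length() > 5`). In an actual Kafka Streams application the class would implement the library's `Processor` interface and forward records through its `ProcessorContext`; the `downstream` callback and the `FilterProcessorSketch` wrapper here are hypothetical stand-ins so that the example runs with the JDK alone.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

public class FilterProcessorSketch {
    // Hypothetical stand-in for a processor node: receives one record at a time
    // and decides whether to pass it to the next node.
    static class FilterProcessor {
        private final BiConsumer<String, String> downstream;

        FilterProcessor(BiConsumer<String, String> downstream) {
            this.downstream = downstream;
        }

        void process(String key, String value) {
            // Assumed rule, borrowed from the DSL filter example: keep values longer than 5 characters.
            if (value != null && value.length() > 5) {
                downstream.accept(key, value);
            }
        }
    }

    // Feeds values through the processor and collects what reaches the "sink".
    static List<String> run(List<String> values) {
        List<String> sink = new ArrayList<>();
        FilterProcessor processor = new FilterProcessor((key, value) -> sink.add(value));
        for (String value : values) {
            processor.process(null, value);
        }
        return sink;
    }

    public static void main(String[] args) {
        // prints [long enough]
        System.out.println(run(List.of("short", "long enough")));
    }
}
```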
kafka streams
Kafka Streams application
topology
Streams DSL data processing examples
Data processing examples implemented with the Processor API