Skip to content

Commit

Permalink
Refactoring and implementing a Kafka Streams example
Browse files Browse the repository at this point in the history
  • Loading branch information
jefersonm committed Jun 27, 2016
1 parent 4c2041a commit 2e11002
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 41 deletions.
Original file line number Diff line number Diff line change
@@ -1,33 +1,30 @@
package jefersonm.KafkaStreamsPlayground;
package jefersonm.kafka.stream;

import jefersonm.model.Tweet;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

import java.util.Properties;

/**
* Created by jefersonm on 6/15/16.
*/
//http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html#writing-a-kafka-streams-application
//http://docs.confluent.io/3.0.0/streams/developer-guide.html
public class Main {


public static void main(String[] args){

// When you want to override serdes explicitly/selectively
final Serde<String> stringSerde = Serdes.String();
final Serde<byte[]> byteArraySerde = Serdes.ByteArray();

KStreamBuilder builder = new KStreamBuilder();

// Method 1: Read the input Kafka topic into a KStream instance.
// Method 1: Read the input Kafka topic `TextLinesTopic` into a KStream instance.
KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "TextLinesTopic");

// Write (i.e. persist) the results to a new Kafka topic called "UppercasedTextLinesTopic".
Expand All @@ -40,7 +37,7 @@ public static void main(String[] args){
// Method 3: using `map`, modify both key and value
KStream<String, String> originalAndUppercased = textLines.map((key, value) -> KeyValue.pair(value, value.toUpperCase()));

// Write the results to a new Kafka topic "OriginalAndUppercased".
// Write the results to a new Kafka topic `OriginalAndUppercased`.
originalAndUppercased.to(stringSerde, stringSerde, "OriginalAndUppercased");

StreamsConfig config = new StreamsConfig(getProperties());
Expand All @@ -57,6 +54,11 @@ private static Properties getProperties() {
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

// Configure the default key/value serdes via StreamsConfig
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

return props;
}

Expand Down

This file was deleted.

This file was deleted.

0 comments on commit 2e11002

Please sign in to comment.