Example of streaming data from Twitter to ElasticSearch via Kafka.
Based on Stephane Maarek's Learn Apache Kafka for Beginners course.
Differences from the course:

- `gradle` instead of `mvn`
- Reading Twitter credentials from `gradle.properties`
- `slf4j-simple` logger and a configuration file for it in `resources/`
- Instructions for running ElasticSearch via Docker Compose
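The logger configuration in `resources/` might look like the following (these are standard `slf4j-simple` property names; the chosen levels and formats are assumptions, not necessarily this repository's settings):

```properties
# src/main/resources/simplelogger.properties (sketch)
org.slf4j.simpleLogger.defaultLogLevel=info
org.slf4j.simpleLogger.showDateTime=true
org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd HH:mm:ss
```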
## Starting Kafka and Zookeeper

To start both Kafka and Zookeeper, modify KAFKA_DIR in start-zookeeper-kafka.sh and run:

```
$ ./start-zookeeper-kafka.sh
```

To stop:

```
$ ./stop-zookeeper-kafka.sh
```

Logs are written to kafka.log and zookeeper.log.
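A start script along these lines would do the job (a sketch assuming a standard Kafka tarball layout; the KAFKA_DIR default below is hypothetical, adjust it to your installation):

```shell
#!/usr/bin/env bash
# Sketch: launch Zookeeper, then the Kafka broker, logging to the files
# mentioned above. KAFKA_DIR must point at a Kafka installation.
KAFKA_DIR="${KAFKA_DIR:-$HOME/kafka_2.12-2.4.0}"   # hypothetical default path

if [ -d "$KAFKA_DIR" ]; then
  nohup "$KAFKA_DIR/bin/zookeeper-server-start.sh" \
        "$KAFKA_DIR/config/zookeeper.properties" > zookeeper.log 2>&1 &
  sleep 5   # give Zookeeper a moment before starting the broker
  nohup "$KAFKA_DIR/bin/kafka-server-start.sh" \
        "$KAFKA_DIR/config/server.properties" > kafka.log 2>&1 &
  echo "Zookeeper and Kafka starting; see zookeeper.log and kafka.log"
else
  echo "Kafka not found at $KAFKA_DIR" >&2
fi
```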
## Starting Twitter producer (twitter-producer)

- Add Twitter credentials in `twitter-producer/gradle.properties`, using `twitter-producer/gradle.properties.example` as a template.
- Start the Twitter producer:

```
$ ./gradlew twitter-producer:run
```
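The credentials file follows the usual four-value Twitter API shape; the key names below are assumptions, so check `twitter-producer/gradle.properties.example` for the exact ones:

```properties
# twitter-producer/gradle.properties (sketch; key names are assumptions)
twitterConsumerKey=your-consumer-key
twitterConsumerSecret=your-consumer-secret
twitterAccessToken=your-access-token
twitterAccessTokenSecret=your-access-token-secret
```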
## Starting ElasticSearch and Kibana

Use docker-compose.yml:

```
$ docker-compose up -d   # Start ES and Kibana
$ docker-compose down    # Stop cluster
```

Now you can explore http://localhost:5601.
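If you are recreating docker-compose.yml rather than using the one in the repository, a minimal file for the 6.8.6 images used below could look like this (service names and compose version are assumptions):

```yaml
version: '2.2'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:6.8.6
    environment:
      - discovery.type=single-node
    ports:
      - "127.0.0.1:9200:9200"
  kibana:
    image: docker.elastic.co/kibana/kibana:6.8.6
    ports:
      - "127.0.0.1:5601:5601"
    depends_on:
      - elasticsearch
```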
Alternatively, use the bash scripts included in the repository:

```
$ ./start-es.sh
$ ./stop-es.sh
```

More explicitly:

```
$ docker run --rm --name elasticsearch -p 127.0.0.1:9200:9200 -p 127.0.0.1:9300:9300 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:6.8.6
```

If you want to add a custom configuration, add a bind mount:

```
$ docker run ... -v `pwd`/custom_elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml:ro
```

Set up Kibana:

```
$ docker run --rm --name kibana --link elasticsearch -p 127.0.0.1:5601:5601 -v `pwd`/kibana.yml:/usr/share/kibana/config/kibana.yml docker.elastic.co/kibana/kibana:6.8.6
```
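A custom_elasticsearch.yml for that bind mount might contain, for example (the settings chosen here are illustrative assumptions, not the repository's file):

```yaml
# custom_elasticsearch.yml (sketch)
cluster.name: "tweets-cluster"
network.host: 0.0.0.0
discovery.type: single-node
```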
List indices:

```
$ curl "localhost:9200/_cat/indices?v"
```

Create an index twitter:

```
$ curl -X PUT "localhost:9200/twitter"
```

Add a tweet:

```
$ curl -X PUT "localhost:9200/twitter/tweets/1" -H 'Content-Type: application/json' -d '{ "course": "Kafka" }'
```

## Starting ElasticSearch consumer (elasticsearch-consumer)
```
$ ./gradlew elasticsearch-consumer:run
```

Consume tweets from twitter_tweets topic to stdout with kafkacat:
```
$ kafkacat -b localhost:9092 -t twitter_tweets
```

Check consumer group offsets:
```
$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group es-consumer-1 --describe
```

## Using the Kafka Stream (stream-tweets)
Create topic twitter_tweets_important:

```
$ kafka-topics.sh --bootstrap-server localhost:9092 --create --topic twitter_tweets_important --partitions 3 --replication-factor 1
```

Start the stream:
```
$ ./gradlew stream-tweets:run
```

Consume filtered tweets to console:

```
$ kafkacat -b localhost:9092 -t twitter_tweets_important -C
```
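In the course, "important" tweets are those whose author has more than 10,000 followers; treat both the field and the threshold as assumptions about this repository's stream-tweets filter. The same predicate can be spot-checked from the shell with python3 on a sample tweet payload:

```shell
# Apply the assumed follower-count predicate to one JSON tweet.
# (The real filter lives in stream-tweets; 10000 is an assumed threshold.)
echo '{"user": {"followers_count": 20000}}' |
  python3 -c 'import sys, json; t = json.load(sys.stdin); print("important" if t["user"]["followers_count"] > 10000 else "skip")'
# prints "important"
```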