Skip to content
This repository has been archived by the owner on Apr 11, 2021. It is now read-only.
/ poc-kafka Public archive

POC testing Kafka, Kafka Streams & Kafka Connect

License

Notifications You must be signed in to change notification settings

l-lin/poc-kafka

Repository files navigation

Example project that uses kafka streams

Java Go

heart-monitor

Getting started

Build

# this will build the maven projects and build the docker images.
mvn package

Usage

Production alike

# launch all services
docker-compose up -d --scale heart-beat-producer=3 --scale heart-rate-computor=3
# wait until all services are started then setup environment
./scripts/setup.sh
# go to the heart-rate-consumer admin interface
firefox http://localhost/ &

Local environment

If you want to run apps directly from your IDE, you can't use the kafka cluster from docker-compose.yml because the domain name configured in Kafka is only available for service in the docker network, which is not possible to hook from IDE launched app. Thus, a docker-compose-local.yml file is here to launch a single kafka instance.

# launch all services
docker-compose -f docker-compose-dep.yml up -d
# then use same commands as above

Useful commands

Avro Schema Registry

More information on the schema-registry project documentation.

The following examples use HTTPie as the HTTP client to perform the HTTP requests:

# list all subjects
http :8081/subjects

# list all schema versions registered under the subject "heart-beats-value"
http :8081/subjects/heart-beats-value/versions

# fetch version 1 of the schema registered under the subject "heart-beats-value"
http :8081/subjects/heart-beats-value/versions/1

# fetch the most recently registered schema registered under the subject "heart-beats-value"
http :8081/subjects/heart-beats-value/versions/latest

# create heart beat avro schema in the schema registry
echo "{\"schema\":\"$(jq -c . < heart-models/src/main/resources/avro/HeartRate.avsc | sed 's/"/\\"/g')\"}" | http :8081/subjects/heart-rates-value/versions "Content-Type: application/vnd.schemaregistry.v1+json"

# create heart rate avro schema in the schema registry
echo "{\"schema\":\"$(jq -c . < heart-models/src/main/resources/avro/HeartRate.avsc | sed 's/"/\\"/g')\"}" | http :8081/subjects/heart-rates-value/versions "Content-Type: application/vnd.schemaregistry.v1+json"

# consume Avro messages to check what was sent to Kafka:
docker exec -it schema-registry \
    /usr/bin/kafka-avro-console-consumer \
    --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 \
    --topic heart-beats \
    --from-beginning

If breaking change, then the schema registry will throw a HTTP 409 and the application will get an error like:

io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error
 code: 409

More information by Confluent on schema evolution and compatibility.

Kafka connect

# list kafka connectors
http :8082/connectors

# add a new connector
curl -X "POST" "http://localhost:8082/connectors/" \
     -H "Content-Type: application/json" \
     -d '{
          "name": "heart-rate-connector-sink",
          "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "connection.url": "jdbc:postgresql://db:5432/heart_monitor?applicationName=heart-rate-connector",
            "connection.user": "postgres",
            "connection.password": "postgres",
            "auto.create":"true",
            "auto.evolve":"true",
            "pk.mode": "kafka",
            "topics": "heart-rates",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "transforms": "ExtractTimestamp,RenameField",
            "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
            "transforms.ExtractTimestamp.timestamp.field" : "extract_ts",
            "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
            "transforms.RenameField.renames" : "userId:user_id,isReset:is_reset"
          }
     }'

Resources

Kafka

Avro

Spring

Reactor

KSQL

Front