Query Kafka with SQL
What is ksqlDB?
ksqlDB is built on top of Kafka Streams, a lightweight, powerful Java library for enriching, transforming, and processing real-time streams of data. Having Kafka Streams at its core means ksqlDB is built on well-designed and easily understood layers of abstractions.
In this tutorial, we will:
1) Set up ksqlDB (most of the configuration, like the docker-compose.yaml file, can also be taken from the website mentioned below)
2) Insert data into a Kafka topic from
a) the ksqlDB CLI
b) the Kafka console producer
3) Query data from ksqldb-cli (this can also be done with the ksql Java/Python clients)
Nothing revolutionary is being done here, but it gives an idea of how simple it is to use ksqlDB to query a pre-existing Kafka broker for data. Doing this without ksqlDB is cumbersome in real life: you would have to run a console consumer (or another tool like kafkacat), read all the messages, and then filter the data yourself.
- Please use the provided docker-compose.yaml file to bring up the complete setup.
- If you already have an existing Zookeeper/Kafka setup, you can remove those services from the docker-compose.yaml file and provide your Kafka broker URL inside ksqldb-server's environment configuration (change
"KSQL_BOOTSTRAP_SERVERS: kafka:9092"
to point to your Kafka broker).
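For example, the relevant part of the ksqldb-server service in docker-compose.yaml might then look like the sketch below (the broker address my-kafka-host:9092 is purely illustrative; substitute your own):

```yaml
ksqldb-server:
  image: confluentinc/ksqldb-server:0.26.0
  environment:
    # Point ksqlDB at your existing broker instead of the bundled one
    KSQL_BOOTSTRAP_SERVERS: "my-kafka-host:9092"
```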
Use the command
docker-compose up -d
to bring up the setup, and
docker-compose down
to bring it down. (If you don't already have docker-compose installed, please refer to: https://docs.docker.com/compose/install/)
(Please run the
docker ps
command to check that all containers are up and running)
docker ps
CONTAINER ID   IMAGE                               COMMAND                  CREATED       STATUS       PORTS                                                  NAMES
539a0c5cc251   confluentinc/ksqldb-cli:0.26.0      "/bin/sh"                3 hours ago   Up 3 hours                                                          ksqldb-cli
2c5318504dc1   confluentinc/ksqldb-server:0.26.0   "/usr/bin/docker/run"    3 hours ago   Up 3 hours   0.0.0.0:8088->8088/tcp                                 ksqldb-server
fd12a10c6bb6   bitnami/kafka:latest                "/opt/bitnami/script…"   3 hours ago   Up 3 hours   0.0.0.0:9092->9092/tcp                                 kafka
3f56f8110d9e   bitnami/zookeeper:latest            "/opt/bitnami/script…"   3 hours ago   Up 3 hours   2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp   zookeeper
I have used Bitnami images for Kafka/Zookeeper to show that ksqlDB works with images outside the confluentinc namespace.
Pre-requisite: Please create the "order" kafka topic using the following command:
docker exec -it kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic order --bootstrap-server localhost:9092
1. Connect to the ksqldb cli (if all containers are up); to open ksqldb-cli, run the following:
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
CREATE STREAM orderstr (orderId VARCHAR, orderType VARCHAR, orderLines INT) WITH (kafka_topic='order', value_format='json', partitions=1);
CREATE TABLE myorder AS SELECT orderId, LATEST_BY_OFFSET(orderType) AS orderType, LATEST_BY_OFFSET(orderLines) AS orderLines FROM orderstr GROUP BY orderId EMIT CHANGES;
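The CREATE TABLE statement above materializes one row per orderId, where LATEST_BY_OFFSET keeps the values from the most recent record on the stream. A rough sketch of those semantics in plain Python (this is an illustration, not how ksqlDB is implemented):

```python
def materialize(events):
    # One row per orderId; a later event for the same key
    # overwrites the earlier one, akin to LATEST_BY_OFFSET.
    table = {}
    for event in events:  # events arrive in offset order
        table[event["orderId"]] = {
            "orderType": event["orderType"],
            "orderLines": event["orderLines"],
        }
    return table

events = [
    {"orderId": "ORDER1", "orderType": "SHIP", "orderLines": 29},
    {"orderId": "ORDER1", "orderType": "PICK", "orderLines": 30},
]
# The later record for ORDER1 wins in the materialized table.
```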
Please use the following data to insert multiple records from the ksqldb-cli command prompt (enter these at the ksql> prompt):
INSERT INTO orderstr (orderId, orderType, orderLines) VALUES ('ORDER1', 'SHIP', 29);
INSERT INTO orderstr (orderId, orderType, orderLines) VALUES ('ORDER2', 'PICK', 100);
INSERT INTO orderstr (orderId, orderType, orderLines) VALUES ('ORDER3', 'SHIP', 40);
INSERT INTO orderstr (orderId, orderType, orderLines) VALUES ('ORDER4', 'SHIP', 120);
INSERT INTO orderstr (orderId, orderType, orderLines) VALUES ('ORDER5', 'PICK', 50);
INSERT INTO orderstr (orderId, orderType, orderLines) VALUES ('ORDER6', 'SHIP', 90);
INSERT INTO orderstr (orderId, orderType, orderLines) VALUES ('ORDER7', 'PICK', 10);
Query the data using a simple select query on ksqldb-cli:
ksql> select * from myOrder where orderType='SHIP';
+---------+-----------+------------+
|ORDERID  |ORDERTYPE  |ORDERLINES  |
+---------+-----------+------------+
|ORDER1   |SHIP       |29          |
|ORDER3   |SHIP       |40          |
|ORDER4   |SHIP       |120         |
|ORDER6   |SHIP       |90          |
Query the data using kafka console consumer:
docker exec -it kafka /opt/bitnami/kafka/bin/kafka-console-consumer.sh --topic order --bootstrap-server localhost:9092 --from-beginning
{"ORDERID":"ORDER1","ORDERTYPE":"SHIP","ORDERLINES":29}
{"ORDERID":"ORDER2","ORDERTYPE":"PICK","ORDERLINES":100}
{"ORDERID":"ORDER3","ORDERTYPE":"SHIP","ORDERLINES":40}
{"ORDERID":"ORDER4","ORDERTYPE":"SHIP","ORDERLINES":120}
{"ORDERID":"ORDER5","ORDERTYPE":"PICK","ORDERLINES":50}
{"ORDERID":"ORDER6","ORDERTYPE":"SHIP","ORDERLINES":90}
{"ORDERID":"ORDER7","ORDERTYPE":"PICK","ORDERLINES":10}
STEP 4: Insert data through the Kafka console producer and view it through a select query on the materialized view
Run the following command, and when you get a > prompt, enter the provided data:
docker exec -it kafka /opt/bitnami/kafka/bin/kafka-console-producer.sh --topic order --bootstrap-server localhost:9092
>{"ORDERID":"ORDER8","ORDERTYPE":"SHIP","ORDERLINES":55}
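The value typed at the producer's > prompt is just a JSON string whose field names match the stream's columns (upper-cased, as seen in the console-consumer output elsewhere in this tutorial). A small illustrative Python helper for building such a record:

```python
import json

def order_record(order_id, order_type, order_lines):
    # Build the JSON value in the shape seen on the "order" topic;
    # this mirrors the hand-typed producer input, nothing more.
    return json.dumps({
        "ORDERID": order_id,
        "ORDERTYPE": order_type,
        "ORDERLINES": order_lines,
    })

print(order_record("ORDER8", "SHIP", 55))
```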
Query the data using a simple select query on ksqldb-cli:
ksql> select * from myOrder where orderType='SHIP';
+---------+-----------+------------+
|ORDERID  |ORDERTYPE  |ORDERLINES  |
+---------+-----------+------------+
|ORDER1   |SHIP       |29          |
|ORDER3   |SHIP       |40          |
|ORDER4   |SHIP       |120         |
|ORDER6   |SHIP       |90          |
|ORDER8   |SHIP       |55          |
Query the data using kafka console consumer:
docker exec -it kafka /opt/bitnami/kafka/bin/kafka-console-consumer.sh --topic order --bootstrap-server localhost:9092 --from-beginning
{"ORDERID":"ORDER1","ORDERTYPE":"SHIP","ORDERLINES":29}
{"ORDERID":"ORDER2","ORDERTYPE":"PICK","ORDERLINES":100}
{"ORDERID":"ORDER3","ORDERTYPE":"SHIP","ORDERLINES":40}
{"ORDERID":"ORDER4","ORDERTYPE":"SHIP","ORDERLINES":120}
{"ORDERID":"ORDER5","ORDERTYPE":"PICK","ORDERLINES":50}
{"ORDERID":"ORDER6","ORDERTYPE":"SHIP","ORDERLINES":90}
{"ORDERID":"ORDER7","ORDERTYPE":"PICK","ORDERLINES":10}
{"ORDERID":"ORDER8","ORDERTYPE":"SHIP","ORDERLINES":55}
So you can clearly see above how ksqlDB lets us both insert records into and select records from a Kafka topic, much as we would with any relational database.
Kafka Setup:
https://hub.docker.com/r/bitnami/kafka/
KSQLDB Setup:
https://ksqldb.io/quickstart.html
Unofficial KSQL Python Client:
https://github.com/bryanyang0528/ksql-python
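The Java/Python clients mentioned above ultimately talk to ksqldb-server's REST API. As a minimal sketch using only the Python standard library (assuming the server from this tutorial at http://localhost:8088 and its /query endpoint; endpoint and header details may vary by ksqlDB version):

```python
import json
import urllib.request

KSQL_URL = "http://localhost:8088/query"  # ksqldb-server from docker-compose

def build_query_request(sql, url=KSQL_URL):
    # The /query endpoint takes a JSON body with the statement
    # and optional streams properties.
    payload = json.dumps({"ksql": sql, "streamsProperties": {}}).encode()
    return urllib.request.Request(
        url,
        data=payload,
        headers={"Content-Type": "application/vnd.ksql.v1+json"},
        method="POST",
    )

if __name__ == "__main__":
    req = build_query_request("SELECT * FROM myorder WHERE orderType='SHIP';")
    with urllib.request.urlopen(req) as resp:  # requires the server to be up
        print(resp.read().decode())
```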