This application was written for research and learning purposes. It provides functionality that covers various aspects of working with Flink, such as:
- Flink stream processing
- Kafka messaging and delivery semantics (exactly once, at most once)
- Flink's state
- Checkpointing
- State restoring
Current Flink version: 1.18.0
To run the application, you need to deploy Kafka first. The broker should have a listener set up at kafka:9093.
See the Docker Compose manifest under the flink-cluster directory for more details.
Custom components used:
- Kafka Gateway s7i/kafka-gateway
- Flink Standalone Cluster s7i/flink
Application interface:
- Endpoint: http://localhost:8180/
- HTTP method: POST
- Payload: JSON
- Query a Todo list:
{"id": "{todo-list-id}"}
- Create a Todo list and add the first task:
{"add": "My Task #1"}
- Add the next task to the list:
{"id": "{todo-list-id}","add": "My Task #2"}
- Remove a task from the list:
{"id": "{todo-list-id}","remove": "My Task #1"}
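The four request bodies above can be built and sent from a small script. The following is a minimal Python sketch using only the standard library. The helper names `build_payload` and `post` are hypothetical, the `Content-Type` header is an assumption, and the response is returned as raw text because its format is not described here.

```python
import json
import urllib.request
from typing import Optional

ENDPOINT = "http://localhost:8180/"  # endpoint from the interface description


def build_payload(list_id: Optional[str] = None,
                  add: Optional[str] = None,
                  remove: Optional[str] = None) -> dict:
    """Build one of the request bodies: query, create, add or remove."""
    payload = {}
    if list_id is not None:
        payload["id"] = list_id
    if add is not None:
        payload["add"] = add
    if remove is not None:
        payload["remove"] = remove
    if not payload:
        raise ValueError("at least one of list_id, add or remove is required")
    return payload


def post(payload: dict, url: str = ENDPOINT, timeout: float = 10.0) -> str:
    """POST the payload as JSON and return the raw response body as text."""
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        # Assumption: the gateway accepts a JSON content type.
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.read().decode("utf-8")
```

For example, `post(build_payload(add="My Task #1"))` would send the "create" request, and `post(build_payload(list_id="{todo-list-id}"))` would query an existing list once the placeholder is replaced with a real id.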
The configuration can be provided via:
- Environment variable CONFIG
- Flink's job parameter --config
kafka-io:
  - name: action
    type: source
    topic: TodoAction
    properties:
      bootstrap.servers: localhost:9092
      group.id: todo-action
  - name: reaction
    type: sink
    topic: TodoReaction
    properties:
      bootstrap.servers: localhost:9092
  - name: txlog
    type: sink
    topic: ActionTxLog
    semantic: EXACTLY_ONCE
    properties:
      bootstrap.servers: localhost:9092
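To make the shape of the kafka-io section easier to see, here is a minimal Python sketch that groups the entries into sources and sinks. The configuration is written as the structure a YAML parser would return. The function name `split_kafka_io` and the set of required keys are assumptions inferred from the example, not the application's own validation rules.

```python
from typing import Dict, List, Tuple

# The kafka-io entries from the example configuration, as parsed data.
KAFKA_IO: List[dict] = [
    {"name": "action", "type": "source", "topic": "TodoAction",
     "properties": {"bootstrap.servers": "localhost:9092", "group.id": "todo-action"}},
    {"name": "reaction", "type": "sink", "topic": "TodoReaction",
     "properties": {"bootstrap.servers": "localhost:9092"}},
    {"name": "txlog", "type": "sink", "topic": "ActionTxLog", "semantic": "EXACTLY_ONCE",
     "properties": {"bootstrap.servers": "localhost:9092"}},
]


def split_kafka_io(entries: List[dict]) -> Tuple[Dict[str, dict], Dict[str, dict]]:
    """Return (sources, sinks), each keyed by the entry name."""
    sources: Dict[str, dict] = {}
    sinks: Dict[str, dict] = {}
    for entry in entries:
        # Assumption: every entry needs these four keys.
        for key in ("name", "type", "topic", "properties"):
            if key not in entry:
                raise ValueError(f"kafka-io entry is missing '{key}': {entry}")
        if "bootstrap.servers" not in entry["properties"]:
            raise ValueError(f"entry '{entry['name']}' has no bootstrap.servers")
        if entry["type"] == "source":
            sources[entry["name"]] = entry
        elif entry["type"] == "sink":
            sinks[entry["name"]] = entry
        else:
            raise ValueError(f"unknown type '{entry['type']}' in '{entry['name']}'")
    return sources, sinks
```

With the example configuration this yields one source (action) and two sinks (reaction and txlog), where only txlog sets the semantic key.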
- Find the last completed checkpoint:
curl http://localhost:8081/jobs/${JOB_ID}/checkpoints --silent | jq -r '.latest.completed.external_path'
- Enable the RocksDB state backend:
state.backend: rocksdb
state.checkpoints.dir: file:/opt/flink/appdata/checkpointing
state.savepoints.dir: file:/opt/flink/appdata/savepointing
state.backend.incremental: true
state.backend.rocksdb.timer-service.factory: rocksdb
state.backend.rocksdb.localdir: /opt/flink/appdata/state-rocksdb
Flink CLI
- Run the application:
flink run -d todo-app.jar
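The checkpoint lookup and the run command can be combined to restore state when resubmitting the job. The following Python sketch reads the same `latest.completed.external_path` field as the curl and jq command, then builds a `flink run` command with the `-s` option, which accepts a savepoint or retained checkpoint path. The function names are hypothetical, and the sketch assumes checkpoints are retained after the job stops.

```python
import json
import urllib.request
from typing import List, Optional

FLINK_REST = "http://localhost:8081"  # REST address used by the curl command


def fetch_checkpoints(job_id: str, base_url: str = FLINK_REST) -> dict:
    """GET /jobs/<job_id>/checkpoints and return the parsed JSON document."""
    url = f"{base_url}/jobs/{job_id}/checkpoints"
    with urllib.request.urlopen(url, timeout=10) as response:
        return json.load(response)


def latest_checkpoint_path(checkpoints: dict) -> Optional[str]:
    """Same lookup as the jq filter; None if no checkpoint has completed."""
    completed = (checkpoints.get("latest") or {}).get("completed") or {}
    return completed.get("external_path")


def flink_run_command(jar: str, restore_path: Optional[str] = None) -> List[str]:
    """Build a detached 'flink run', optionally restoring from restore_path."""
    command = ["flink", "run", "-d"]
    if restore_path:
        command += ["-s", restore_path]
    command.append(jar)
    return command
```

A restore could then look like `flink_run_command("todo-app.jar", latest_checkpoint_path(fetch_checkpoints(job_id)))`, with the resulting list passed to `subprocess.run`.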
How to produce a message on the TodoAction topic:
kafka-console-producer.sh --topic TodoAction --bootstrap-server localhost:9092
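The console producer reads one record per line from standard input, so it can also be driven from a script. This is a minimal Python sketch that pipes JSON lines into the same command. It assumes the TodoAction topic carries the same JSON bodies as the HTTP interface, which is not stated here, and the helper names are hypothetical.

```python
import json
import subprocess
from typing import Iterable, List


def producer_command(topic: str = "TodoAction",
                     bootstrap_server: str = "localhost:9092") -> List[str]:
    """Build the kafka-console-producer.sh command line."""
    return ["kafka-console-producer.sh",
            "--topic", topic,
            "--bootstrap-server", bootstrap_server]


def encode_messages(payloads: Iterable[dict]) -> str:
    """Serialize each payload as one JSON line (one line becomes one record)."""
    return "".join(json.dumps(payload) + "\n" for payload in payloads)


def produce(payloads: Iterable[dict], topic: str = "TodoAction",
            bootstrap_server: str = "localhost:9092") -> None:
    """Pipe the encoded payloads into the console producer's stdin."""
    subprocess.run(producer_command(topic, bootstrap_server),
                   input=encode_messages(payloads),
                   text=True,
                   check=True)
```

Calling `produce([{"add": "My Task #1"}])` would send a single record, provided `kafka-console-producer.sh` is on the PATH.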
How to create a compacted topic:
kafka-topics.sh --create \
  --zookeeper zookeeper:2181 \
  --topic TodoTxLog \
  --replication-factor 1 \
  --partitions 1 \
  --config "cleanup.policy=compact" \
  --config "delete.retention.ms=100" \
  --config "segment.ms=100" \
  --config "min.cleanable.dirty.ratio=0.01"