feat(risedev): use docker for kafka service #16536

Merged · 12 commits · May 24, 2024
24 changes: 0 additions & 24 deletions Makefile.toml
@@ -4,7 +4,6 @@ extend = [
{ path = "src/risedevtool/minio.toml" },
{ path = "src/risedevtool/etcd.toml" },
{ path = "src/risedevtool/tempo.toml" },
{ path = "src/risedevtool/kafka.toml" },
{ path = "src/risedevtool/gcloud-pubsub.toml" },
{ path = "src/risedevtool/redis.toml" },
{ path = "src/risedevtool/connector.toml" },
@@ -498,7 +497,6 @@ dependencies = [
"download-etcd",
"download-grafana",
"download-tempo",
"download-kafka",
"download-mcli",
"download-minio",
"download-prometheus",
@@ -556,7 +554,6 @@ dependencies = [
"download-grafana",
"download-prometheus",
"download-tempo",
"download-kafka",
"download-redis",
]

@@ -707,27 +704,13 @@ script = '''

set -euo pipefail

wait_kafka_exit() {
# Follow kafka-server-stop.sh
while [[ -n "$(ps ax | grep ' kafka\.Kafka ' | grep java | grep -v grep | awk '{print $1}')" ]]; do
echo "Waiting for kafka to exit"
sleep 1
done
}

kill_kafka() {
${PREFIX_BIN}/kafka/bin/kafka-server-stop.sh
wait_kafka_exit
}

if ! ${TMUX} ls &>/dev/null ; then
echo "No risedev cluster to kill. Exiting..."
exit 0
fi

# Kill other components with Ctrl+C/Ctrl+D
${TMUX} list-windows -F "#{window_name} #{pane_id}" \
| grep --invert-match --extended-regexp '(kafka)' \
| awk '{ print $2 }' \
| xargs -I {} ${TMUX} send-keys -t {} C-c C-d

@@ -738,13 +721,6 @@ if [[ -n ${containers} ]]; then
docker stop ${containers}
fi

# Kill kafka cleanly. Ctrl+C will lose data.
if [[ -n $(${TMUX} list-windows | grep kafka) ]];
then
echo "kill kafka"
kill_kafka || true
fi

${TMUX} kill-server
test $? -eq 0 || { echo "Failed to stop all RiseDev components."; exit 1; }
'''
1 change: 0 additions & 1 deletion ci/risedev-components.ci.benchmark.env
@@ -1,7 +1,6 @@
RISEDEV_CONFIGURED=true
ENABLE_MINIO=true
ENABLE_ETCD=true
ENABLE_KAFKA=true
ENABLE_BUILD_RUST=true
ENABLE_RELEASE_PROFILE=true
ENABLE_PROMETHEUS_GRAFANA=true
1 change: 0 additions & 1 deletion ci/risedev-components.ci.source.env
@@ -1,5 +1,4 @@
RISEDEV_CONFIGURED=true
ENABLE_MINIO=true
ENABLE_ETCD=true
ENABLE_KAFKA=true
ENABLE_PUBSUB=true
78 changes: 40 additions & 38 deletions ci/scripts/e2e-kafka-sink-test.sh
@@ -3,46 +3,48 @@
# Exits as soon as any line fails.
set -euo pipefail

./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-without-snapshot --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-text-key-id --create > /dev/null 2>&1
export RPK_BROKERS="message_queue:29092"

rpk topic create test-rw-sink-append-only
rpk topic create test-rw-sink-upsert
rpk topic create test-rw-sink-upsert-schema
rpk topic create test-rw-sink-debezium
rpk topic create test-rw-sink-without-snapshot
rpk topic create test-rw-sink-text-key-id

sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/create_sink.slt'
sleep 2

# test append-only kafka sink
echo "testing append-only kafka sink"
diff ./e2e_test/sink/kafka/append_only1.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only --from-beginning --max-messages 10 | sort) 2> /dev/null)
diff -b ./e2e_test/sink/kafka/append_only1.result \
<((rpk topic consume test-rw-sink-append-only --offset start --format '%v\n' --num 10 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for append-only sink is not as expected."
exit 1
fi

# test upsert kafka sink
echo "testing upsert kafka sink"
diff ./e2e_test/sink/kafka/upsert1.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 10 | sort) 2> /dev/null)
diff -b ./e2e_test/sink/kafka/upsert1.result \
<((rpk topic consume test-rw-sink-upsert --offset start --format '%k\t%v\n' --num 10 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink is not as expected."
exit 1
fi

# test upsert kafka sink with schema
echo "testing upsert kafka sink with schema"
diff ./e2e_test/sink/kafka/upsert_schema1.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 10 | sort) 2> /dev/null)
diff -b ./e2e_test/sink/kafka/upsert_schema1.result \
<((rpk topic consume test-rw-sink-upsert-schema --offset start --format '%k\t%v\n' --num 10 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink with schema is not as expected."
exit 1
fi

# test debezium kafka sink
echo "testing debezium kafka sink"
(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 10 | sort) > ./e2e_test/sink/kafka/debezium1.tmp.result 2> /dev/null
(rpk topic consume test-rw-sink-debezium --offset start --format '%k\t%v\n' --num 10 | sort) > ./e2e_test/sink/kafka/debezium1.tmp.result 2> /dev/null
python3 e2e_test/sink/kafka/debezium.py e2e_test/sink/kafka/debezium1.result e2e_test/sink/kafka/debezium1.tmp.result
if [ $? -ne 0 ]; then
echo "The output for debezium sink is not as expected."
@@ -58,34 +60,34 @@ psql -h localhost -p 4566 -d dev -U root -c "update t_kafka set v_varchar = '',

# test append-only kafka sink after update
echo "testing append-only kafka sink after updating data"
diff ./e2e_test/sink/kafka/append_only2.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only --from-beginning --max-messages 11 | sort) 2> /dev/null)
diff -b ./e2e_test/sink/kafka/append_only2.result \
<((rpk topic consume test-rw-sink-append-only --offset start --format '%v\n' --num 11 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for append-only sink after update is not as expected."
exit 1
fi

# test upsert kafka sink after update
echo "testing upsert kafka sink after updating data"
diff ./e2e_test/sink/kafka/upsert2.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 11 | sort) 2> /dev/null)
diff -b ./e2e_test/sink/kafka/upsert2.result \
<((rpk topic consume test-rw-sink-upsert --offset start --format '%k\t%v\n' --num 11 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink after update is not as expected."
exit 1
fi

# test upsert kafka sink with schema after update
echo "testing upsert kafka sink with schema after updating data"
diff ./e2e_test/sink/kafka/upsert_schema2.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 11 | sort) 2> /dev/null)
diff -b ./e2e_test/sink/kafka/upsert_schema2.result \
<((rpk topic consume test-rw-sink-upsert-schema --offset start --format '%k\t%v\n' --num 11 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink with schema is not as expected."
exit 1
fi

# test debezium kafka sink after update
echo "testing debezium kafka sink after updating data"
(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 11 | sort) > ./e2e_test/sink/kafka/debezium2.tmp.result 2> /dev/null
(rpk topic consume test-rw-sink-debezium --offset start --format '%k\t%v\n' --num 11 | sort) > ./e2e_test/sink/kafka/debezium2.tmp.result 2> /dev/null
python3 e2e_test/sink/kafka/debezium.py e2e_test/sink/kafka/debezium2.result e2e_test/sink/kafka/debezium2.tmp.result
if [ $? -ne 0 ]; then
echo "The output for debezium sink after update is not as expected."
@@ -97,8 +99,8 @@ fi

# test without-snapshot kafka sink
echo "testing without-snapshot kafka sink"
diff ./e2e_test/sink/kafka/without_snapshot.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-without-snapshot --from-beginning --max-messages 3 | sort) 2> /dev/null)
diff -b ./e2e_test/sink/kafka/without_snapshot.result \
<((rpk topic consume test-rw-sink-without-snapshot --offset start --format '%v\n' --num 3 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for append-only sink is not as expected."
exit 1
@@ -110,25 +112,25 @@ psql -h localhost -p 4566 -d dev -U root -c "delete from t_kafka where id = 1;"

# test upsert kafka sink after delete
echo "testing upsert kafka sink after deleting data"
diff ./e2e_test/sink/kafka/upsert3.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --from-beginning --property print.key=true --max-messages 12 | sort) 2> /dev/null)
diff -b ./e2e_test/sink/kafka/upsert3.result \
<((rpk topic consume test-rw-sink-upsert --offset start --format '%k\t%v\n' --num 12 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink after update is not as expected."
exit 1
fi

# test upsert kafka sink with schema after delete
echo "testing upsert kafka sink with schema after deleting data"
diff ./e2e_test/sink/kafka/upsert_schema3.result \
<((./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-schema --from-beginning --property print.key=true --max-messages 12 | sort) 2> /dev/null)
diff -b ./e2e_test/sink/kafka/upsert_schema3.result \
<((rpk topic consume test-rw-sink-upsert-schema --offset start --format '%k\t%v\n' --num 12 | sort) 2> /dev/null)
if [ $? -ne 0 ]; then
echo "The output for upsert sink with schema is not as expected."
exit 1
fi

# test debezium kafka sink after delete
echo "testing debezium kafka sink after deleting data"
(./.risingwave/bin/kafka/bin/kafka-console-consumer.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --property print.key=true --from-beginning --max-messages 13 | sort) > ./e2e_test/sink/kafka/debezium3.tmp.result 2> /dev/null
(rpk topic consume test-rw-sink-debezium --offset start --format '%k\t%v\n' --num 13 | sort) > ./e2e_test/sink/kafka/debezium3.tmp.result 2> /dev/null
python3 e2e_test/sink/kafka/debezium.py e2e_test/sink/kafka/debezium3.result e2e_test/sink/kafka/debezium3.tmp.result
if [ $? -ne 0 ]; then
echo "The output for debezium sink after delete is not as expected."
@@ -139,29 +141,29 @@ else
fi

sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/drop_sink.slt'
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only --delete > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert --delete > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --delete > /dev/null 2>&1
rpk topic delete test-rw-sink-append-only
rpk topic delete test-rw-sink-upsert
rpk topic delete test-rw-sink-debezium

# test different encoding
echo "preparing confluent schema registry"
python3 -m pip install --break-system-packages requests confluent-kafka

echo "testing protobuf"
cp src/connector/src/test_data/proto_recursive/recursive.pb ./proto-recursive
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-a --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-hi --create > /dev/null 2>&1
rpk topic create test-rw-sink-append-only-protobuf
rpk topic create test-rw-sink-append-only-protobuf-csr-a
rpk topic create test-rw-sink-append-only-protobuf-csr-hi
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-append-only-protobuf-csr-a-value' src/connector/src/test_data/test-index-array.proto
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-append-only-protobuf-csr-hi-value' src/connector/src/test_data/test-index-array.proto
sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/protobuf.slt'
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-hi --delete > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-a --delete > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --delete > /dev/null 2>&1
rpk topic delete test-rw-sink-append-only-protobuf
rpk topic delete test-rw-sink-append-only-protobuf-csr-a
rpk topic delete test-rw-sink-append-only-protobuf-csr-hi

echo "testing avro"
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-value' src/connector/src/test_data/all-types.avsc
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-key' src/connector/src/test_data/all-types.avsc 'string_field,int32_field'
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-avro --create > /dev/null 2>&1
rpk topic create test-rw-sink-upsert-avro
sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/avro.slt'
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-avro --delete > /dev/null 2>&1
rpk topic delete test-rw-sink-upsert-avro
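
Note: the tests above drive Kafka entirely through rpk. RPK_BROKERS points rpk at the broker, and the consume format string selects what gets printed per record ('%k' for the key, '%v' for the value). Below is a minimal sketch of that round trip, assuming a reachable broker at message_queue:29092 and using a scratch topic name that is not part of this PR:

# Sketch only: broker address and topic name are illustrative.
export RPK_BROKERS="message_queue:29092"
rpk topic create demo-topic
# rpk topic produce reads newline-delimited records from stdin.
echo 'hello' | rpk topic produce demo-topic
# Replay from the beginning and print one value per line.
rpk topic consume demo-topic --offset start --num 1 --format '%v\n'
rpk topic delete demo-topic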
14 changes: 6 additions & 8 deletions e2e_test/sink/kafka/debezium.py
@@ -11,9 +11,9 @@
# debezium sink sends k/v pair
kv = line.split()
key = json.loads(kv[0])
# kafka consumer outputs string "null" for null payload
if kv[1] == "null":
value = kv[1]
# rpk outputs nothing for a null payload
if len(kv) == 1:
value = None
else:
value = json.loads(kv[1])
# The `ts_ms` field may vary, so we delete it from the json object
@@ -26,12 +26,10 @@
with open(test_output_file) as file:
for line in file:
kv = line.split()
if len(kv) != 2:
print(line)
assert(len(kv) == 2)
key = json.loads(kv[0])
if kv[1] == "null":
value = kv[1]
# rpk outputs nothing for a null payload
if len(kv) == 1:
value = None
else:
value = json.loads(kv[1])
# Assert `ts_ms` is an integer here.
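
Note: the debezium.py change above adapts the parser to rpk's output. With the '%k\t%v\n' format, a tombstone record (null payload) prints only its key, so a line that splits into a single field now maps to a None value instead of the literal string "null" that the old console consumer produced. A hedged Python sketch of that rule; the helper name and sample lines are illustrative, not part of the PR:

import json

def parse_rpk_line(line: str):
    # Split on whitespace: key and value are tab-separated by the '%k\t%v\n' format.
    kv = line.split()
    key = json.loads(kv[0])
    # rpk prints nothing for a null payload, so a one-field line is a tombstone.
    value = None if len(kv) == 1 else json.loads(kv[1])
    return key, value

# A regular record and a tombstone (keys/values here are made up for illustration).
print(parse_rpk_line('{"id":1}\t{"after":{"id":1}}'))  # -> ({'id': 1}, {'after': {'id': 1}})
print(parse_rpk_line('{"id":1}'))                      # -> ({'id': 1}, None)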