-
Notifications
You must be signed in to change notification settings - Fork 206
/
start-plaintext.sh
executable file
·122 lines (88 loc) · 6.63 KB
/
start-plaintext.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#!/bin/bash
set -e
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )"
source ${DIR}/../../scripts/utils.sh
if ! version_gt $TAG_BASE "6.9.9"; then
logwarn "WARN: Cluster Linking is GA since CP 7.0 only"
exit 111
fi
${DIR}/../../environment/mdc-plaintext/start.sh "${PWD}/docker-compose.mdc-plaintext.yml"
log "Create topic demo"
docker exec broker-europe kafka-topics --create --topic demo --bootstrap-server broker-us:9092 --replication-factor 1 --partitions 1
log "Sending 20 messages in US cluster"
seq -f "us_sale_%g ${RANDOM}" 20 | docker container exec -i connect-us bash -c "kafka-console-producer --broker-list broker-us:9092 --topic demo"
log "Verify we have received the data in source cluster using consumer group id my-consumer-group, we read only 5 messages"
docker container exec -i connect-us bash -c "kafka-console-consumer --bootstrap-server broker-us:9092 --topic demo --from-beginning --max-messages 5 --consumer-property group.id=my-consumer-group"
log "Create the cluster link on the destination cluster (with metadata.max.age.ms=5 seconds + consumer.offset.sync.enable=true + consumer.offset.sync.ms=3000 + consumer.offset.sync.json set to all consumer groups)"
docker cp consumer.offset.sync.json broker-europe:/tmp/consumer.offset.sync.json
docker exec broker-europe kafka-cluster-links --bootstrap-server broker-europe:9092 --create --link demo-link --config bootstrap.servers=broker-us:9092,metadata.max.age.ms=5000,consumer.offset.sync.enable=true,consumer.offset.sync.ms=3000 --consumer-group-filters-json-file /tmp/consumer.offset.sync.json
log "Initialize the topic mirror for topic demo"
docker exec broker-europe kafka-mirrors --create --mirror-topic demo --link demo-link --bootstrap-server broker-europe:9092
log "Check the replica status on the destination"
docker exec broker-europe kafka-replica-status --topics demo --include-linked --bootstrap-server broker-europe:9092
log "Wait 6 seconds for consumer.offset sync to happen (2 times consumer.offset.sync.ms=3000)"
sleep 6
log "Verify that current offset is consistent in source and destination"
log "Describe consumer group my-consumer-group at Source cluster"
docker exec broker-europe kafka-consumer-groups --bootstrap-server broker-us:9092 --describe --group my-consumer-group
log "Describe consumer group my-consumer-group at Destination cluster"
docker exec broker-europe kafka-consumer-groups --bootstrap-server broker-europe:9092 --describe --group my-consumer-group
log "Consume from the mirror topic on the destination cluster and verify consumer offset is working, it should start at 6"
docker container exec -i connect-us bash -c "kafka-console-consumer --bootstrap-server broker-europe:9092 --topic demo --max-messages 5 --consumer-property group.id=my-consumer-group"
log "Describe consumer group my-consumer-group at Destination cluster."
docker exec broker-europe kafka-consumer-groups --bootstrap-server broker-europe:9092 --describe --group my-consumer-group
log "sleep 6 seconds"
sleep 6
log "Describe consumer group my-consumer-group at Destination cluster. Note: the current-offset has been overwritten due to the consumer offset sync"
docker exec broker-europe kafka-consumer-groups --bootstrap-server broker-europe:9092 --describe --group my-consumer-group
log "Stop consumer offset sync for consumer group my-consumer-group"
echo "consumer.offset.group.filters={\"groupFilters\": [ \
{ \
\"name\": \"*\", \
\"patternType\": \"LITERAL\", \
\"filterType\": \"INCLUDE\" \
}, \
{ \
\"name\": \"my-consumer-group\", \
\"patternType\": \"LITERAL\", \
\"filterType\": \"EXCLUDE\" \
} \
]}" > newFilters.properties
docker cp newFilters.properties broker-europe:/tmp/newFilters.properties
docker exec broker-europe kafka-configs --bootstrap-server broker-europe:9092 --alter --cluster-link demo-link --add-config-file /tmp/newFilters.properties
sleep 6
log "Consume from the source cluster another 10 messages, up to 15"
docker container exec -i connect-us bash -c "kafka-console-consumer --bootstrap-server broker-us:9092 --topic demo --max-messages 10 --consumer-property group.id=my-consumer-group"
log "Consume from the destination cluster, it will continue from it's last offset 10"
docker container exec -i connect-us bash -c "kafka-console-consumer --bootstrap-server broker-europe:9092 --topic demo --max-messages 5 --consumer-property group.id=my-consumer-group"
log "Verify that the topic mirror is read-only"
seq -f "europe_sale_%g ${RANDOM}" 10 | docker container exec -i connect-us bash -c "kafka-console-producer --broker-list broker-europe:9092 --topic demo"
log "Modify the source topic config, set retention.ms"
docker container exec -i connect-us kafka-configs --alter --topic demo --add-config retention.ms=123456890 --bootstrap-server broker-us:9092
log "Check the Source Topic Configuration"
docker container exec -i connect-us kafka-configs --describe --topic demo --bootstrap-server broker-us:9092
log "Wait 6 seconds (default is 5 minutes metadata.max.age.ms, but we modified it to 5 seconds)"
sleep 6
log "Check the Destination Topic Configuration"
docker container exec -i connect-us kafka-configs --describe --topic demo --bootstrap-server broker-europe:9092
log "Alter the number of partitions on the source topic"
docker container exec -i connect-us kafka-topics --alter --topic demo --partitions 8 --bootstrap-server broker-us:9092
log "Verify the change on the source topic"
docker container exec -i connect-us kafka-topics --describe --topic demo --bootstrap-server broker-us:9092
log "Wait 6 seconds (default is 5 minutes metadata.max.age.ms, but we modified it to 5 seconds)"
sleep 6
log "Verify the change on the destination topic"
docker container exec -i connect-us kafka-topics --describe --topic demo --bootstrap-server broker-europe:9092
log "List mirror topics"
docker container exec -i connect-us kafka-cluster-links --list --link demo-link --include-topics --bootstrap-server broker-europe:9092
log "Cut over the mirror topic to make it writable"
docker container exec -i connect-us kafka-mirrors --promote --topics demo --bootstrap-server broker-europe:9092
log "Produce to both topics to verify divergence"
log "Sending data again in US cluster"
seq -f "us_sale_%g ${RANDOM}" 10 | docker container exec -i connect-us bash -c "kafka-console-producer --broker-list broker-us:9092 --topic demo"
log "Sending data in EUROPE cluster"
seq -f "europe_sale_%g ${RANDOM}" 10 | docker container exec -i connect-us bash -c "kafka-console-producer --broker-list broker-europe:9092 --topic demo"
set +e
log "Delete the cluster link"
docker container exec -i connect-us kafka-cluster-links --bootstrap-server broker-europe:9092 --delete --link demo-link
exit 0