Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Debezium Server — LogMiner adapter → Kafka sink (LOB tables only)
# Complements OLR by handling LOB tables that OLR skips (skip-lob-tables=1).
# Events route to olr-lob-events topic and merge into OLR's "actual" stream.
quarkus.http.port=8083
debezium.sink.type=kafka
debezium.sink.kafka.producer.bootstrap.servers=localhost:9092
debezium.sink.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
debezium.sink.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer

debezium.format.value=json
debezium.format.value.schemas.enable=false
debezium.format.key=json
debezium.format.key.schemas.enable=false

debezium.source.connector.class=io.debezium.connector.oracle.OracleConnector
debezium.source.database.connection.adapter=logminer
debezium.source.database.hostname=${VM_HOST}
debezium.source.database.port=1521
debezium.source.database.user=c##dbzuser
debezium.source.database.password=dbz
debezium.source.database.dbname=ORCLCDB
debezium.source.database.pdb.name=ORCLPDB
debezium.source.topic.prefix=olr-lob
debezium.source.schema.include.list=OLR_TEST
debezium.source.table.include.list=OLR_TEST.FUZZ_LOB
debezium.source.snapshot.mode=recovery
debezium.source.log.mining.strategy=online_catalog
debezium.source.lob.enabled=true

# Route all tables to a single topic for ordered delivery
debezium.transforms=route
debezium.transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
debezium.transforms.route.regex=.*
debezium.transforms.route.replacement=olr-lob-events

debezium.source.offset.storage=org.apache.kafka.connect.storage.KafkaOffsetBackingStore
debezium.source.offset.storage.topic=dbz-olr-lob-offsets
debezium.source.offset.storage.partitions=1
debezium.source.offset.storage.replication.factor=1
debezium.source.bootstrap.servers=localhost:9092
debezium.source.offset.flush.interval.ms=0
debezium.source.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
debezium.source.schema.history.internal.file.filename=/debezium/data/schema-history.dat
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ debezium.format.key.schemas.enable=false

debezium.source.connector.class=io.debezium.connector.oracle.OracleConnector
debezium.source.database.connection.adapter=logminer
debezium.source.database.hostname=192.168.122.130
debezium.source.database.hostname=${VM_HOST}
debezium.source.database.port=1521
debezium.source.database.user=c##dbzuser
debezium.source.database.password=dbz
Expand Down
2 changes: 1 addition & 1 deletion tests/dbz-twin/rac/config/application-logminer.properties
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ debezium.format.key.schemas.enable=false

debezium.source.connector.class=io.debezium.connector.oracle.OracleConnector
debezium.source.database.connection.adapter=logminer
debezium.source.database.hostname=192.168.122.130
debezium.source.database.hostname=${VM_HOST}
debezium.source.database.port=1521
debezium.source.database.user=c##dbzuser
debezium.source.database.password=dbz
Expand Down
4 changes: 2 additions & 2 deletions tests/dbz-twin/rac/config/application-olr-kafka.properties
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ debezium.format.key.schemas.enable=false
debezium.source.connector.class=io.debezium.connector.oracle.OracleConnector
debezium.source.database.connection.adapter=olr
debezium.source.openlogreplicator.source=ORCLCDB
debezium.source.openlogreplicator.host=192.168.122.130
debezium.source.openlogreplicator.host=${VM_HOST}
debezium.source.openlogreplicator.port=5000
debezium.source.database.hostname=192.168.122.130
debezium.source.database.hostname=${VM_HOST}
debezium.source.database.port=1521
debezium.source.database.user=c##dbzuser
debezium.source.database.password=dbz
Expand Down
4 changes: 2 additions & 2 deletions tests/dbz-twin/rac/config/application-olr.properties
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ debezium.format.key.schemas.enable=false
debezium.source.connector.class=io.debezium.connector.oracle.OracleConnector
debezium.source.database.connection.adapter=olr
debezium.source.openlogreplicator.source=ORCLCDB
debezium.source.openlogreplicator.host=192.168.122.130
debezium.source.openlogreplicator.host=${VM_HOST}
debezium.source.openlogreplicator.port=5000
debezium.source.database.hostname=192.168.122.130
debezium.source.database.hostname=${VM_HOST}
debezium.source.database.port=1521
debezium.source.database.user=c##dbzuser
debezium.source.database.password=dbz
Expand Down
3 changes: 2 additions & 1 deletion tests/dbz-twin/rac/config/olr-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
"scn-type": 1,
"timestamp-type": 1,
"user-type": 0,
"redo-thread": 0
"redo-thread": 0,
"skip-lob-tables": 1
},
"filter": {
"table": [
Expand Down
7 changes: 5 additions & 2 deletions tests/dbz-twin/rac/db-check.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

Environment variables:
SQLITE_DB — SQLite database path (default: /app/data/fuzz.db)
ORACLE_HOST — Oracle host (default: 192.168.122.130)
ORACLE_HOST — Oracle host (required, set by fuzz-test.sh from VM_HOST)
ORACLE_DSN — Full Oracle DSN (overrides ORACLE_HOST)
"""

Expand Down Expand Up @@ -314,7 +314,10 @@ def print_results(name, matched, missing, extra, diffs):

def main():
sqlite_path = os.environ.get('SQLITE_DB', '/app/data/fuzz.db')
oracle_host = os.environ.get('ORACLE_HOST', '192.168.122.130')
oracle_host = os.environ.get('ORACLE_HOST')
if not oracle_host and not os.environ.get('ORACLE_DSN'):
print("ERROR: ORACLE_HOST or ORACLE_DSN must be set", file=sys.stderr)
sys.exit(1)
oracle_dsn = os.environ.get('ORACLE_DSN',
f"olr_test/olr_test@{oracle_host}:1521/ORCLPDB")

Expand Down
26 changes: 22 additions & 4 deletions tests/dbz-twin/rac/docker-compose-fuzz.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ services:
start_period: 15s

dbz-logminer:
image: quay.io/debezium/server:3.5.0.Beta1
image: rophy/debezium-server:3.5.0-2a7978c0af
container_name: fuzz-dbz-logminer
network_mode: host
environment:
VM_HOST: ${VM_HOST:?VM_HOST is required}
depends_on:
kafka:
condition: service_healthy
Expand All @@ -34,19 +36,33 @@ services:
- dbz-logminer-data:/debezium/data

dbz-olr:
image: quay.io/debezium/server:3.5.0.Beta1
image: rophy/debezium-server:3.5.0-2a7978c0af
container_name: fuzz-dbz-olr
network_mode: host
restart: unless-stopped
environment:
VM_HOST: ${VM_HOST:?VM_HOST is required}
depends_on:
kafka:
condition: service_healthy
volumes:
- ./config/application-olr-kafka.properties:/debezium/config/application.properties:ro
- ../lib/ojdbc8.jar:/debezium/lib/ojdbc8.jar:ro
- ../lib/debezium-connector-oracle-3.5.0.Beta1.jar:/debezium/lib/debezium-connector-oracle-3.5.0.Beta1.jar:ro
- dbz-olr-data:/debezium/data

dbz-lob-logminer:
image: rophy/debezium-server:3.5.0-2a7978c0af
container_name: fuzz-dbz-lob-logminer
network_mode: host
environment:
VM_HOST: ${VM_HOST:?VM_HOST is required}
depends_on:
kafka:
condition: service_healthy
volumes:
- ./config/application-lob-logminer-kafka.properties:/debezium/config/application.properties:ro
- ../lib/ojdbc8.jar:/debezium/lib/ojdbc8.jar:ro
- dbz-lob-logminer-data:/debezium/data

consumer:
build:
context: .
Expand All @@ -63,6 +79,7 @@ services:
SQLITE_DB: /app/data/fuzz.db
LM_TOPIC: lm-events
OLR_TOPIC: olr-events
OLR_LOB_TOPIC: olr-lob-events
volumes:
- ./kafka-consumer.py:/app/kafka-consumer.py:ro
- fuzz-data:/app/data
Expand Down Expand Up @@ -90,4 +107,5 @@ services:
volumes:
dbz-logminer-data:
dbz-olr-data:
dbz-lob-logminer-data:
fuzz-data:
41 changes: 22 additions & 19 deletions tests/dbz-twin/rac/fuzz-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
# run [duration-min] Deploy fuzz workload and run for N minutes (default: 30)
# status Show consumer/validator status and OLR memory
# validate Run validator (wait for idle timeout, report results)
# logs [component] Show logs (kafka, logminer, olr, consumer, validator, olr-vm)
# logs [component] Show logs (kafka, logminer, olr, lob-logminer, consumer, validator, olr-vm)
# down Stop and remove all containers + volumes
# help Show this help
#
# Typical workflow:
# ./fuzz-test.sh up # start infrastructure
# ./fuzz-test.sh run 60 # run 60-minute fuzz workload
# ./fuzz-test.sh down # clean up any previous run
# ./fuzz-test.sh up # start infrastructure
# ./fuzz-test.sh run 60 # run 60-minute fuzz workload
# ./fuzz-test.sh status # check progress
# ./fuzz-test.sh validate # wait for drain + validate
# ./fuzz-test.sh logs validator # investigate mismatches
Expand Down Expand Up @@ -84,7 +85,7 @@ _seed_debezium_offsets() {

# topic_prefix matches debezium.source.topic.prefix in each connector config
# offset_topic matches debezium.source.offset.storage.topic
local -A topics=( [logminer]=dbz-lm-offsets [olr]=dbz-olr-offsets )
local -A topics=( [logminer]=dbz-lm-offsets [olr]=dbz-olr-offsets [olr-lob]=dbz-olr-lob-offsets )
for topic_prefix in "${!topics[@]}"; do
local offset_topic="${topics[$topic_prefix]}"
local offset_key="[\"kafka\",{\"server\":\"${topic_prefix}\"}]"
Expand Down Expand Up @@ -212,15 +213,16 @@ action_up() {

# Wait for Debezium connectors
echo " Waiting for Debezium connectors..."
for i in $(seq 1 60); do
LM_OK=false; OLR_OK=false
docker logs fuzz-dbz-logminer 2>&1 | tail -10 | grep -q "Starting streaming" && LM_OK=true
docker logs fuzz-dbz-olr 2>&1 | tail -10 | grep -q "streaming client started\|Starting streaming" && OLR_OK=true
if $LM_OK && $OLR_OK; then
echo " Debezium: ready"
for i in $(seq 1 90); do
LM_OK=false; OLR_OK=false; LOB_LM_OK=false
docker logs fuzz-dbz-logminer 2>&1 | grep -q "Starting streaming" && LM_OK=true
docker logs fuzz-dbz-olr 2>&1 | grep -q "streaming client started\|Starting streaming" && OLR_OK=true
docker logs fuzz-dbz-lob-logminer 2>&1 | grep -q "Starting streaming" && LOB_LM_OK=true
if $LM_OK && $OLR_OK && $LOB_LM_OK; then
echo " Debezium: ready (3 connectors)"
break
fi
[[ $i -eq 60 ]] && { echo "ERROR: Debezium connectors did not start" >&2; exit 1; }
[[ $i -eq 90 ]] && { echo "ERROR: Debezium connectors did not start" >&2; exit 1; }
sleep 2
done

Expand Down Expand Up @@ -427,19 +429,20 @@ action_validate() {
action_logs() {
local component="${1:-}"
case "$component" in
kafka) docker logs fuzz-kafka 2>&1 ;;
logminer) docker logs fuzz-dbz-logminer 2>&1 ;;
olr) docker logs fuzz-dbz-olr 2>&1 ;;
consumer) docker logs fuzz-consumer 2>&1 ;;
validator) docker logs fuzz-validator 2>&1 ;;
olr-vm) ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" "podman logs $OLR_CONTAINER" 2>/dev/null ;;
kafka) docker logs fuzz-kafka 2>&1 ;;
logminer) docker logs fuzz-dbz-logminer 2>&1 ;;
olr) docker logs fuzz-dbz-olr 2>&1 ;;
lob-logminer) docker logs fuzz-dbz-lob-logminer 2>&1 ;;
consumer) docker logs fuzz-consumer 2>&1 ;;
validator) docker logs fuzz-validator 2>&1 ;;
olr-vm) ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" "podman logs $OLR_CONTAINER" 2>/dev/null ;;
"")
echo "Usage: $0 logs <component>"
echo "Components: kafka, logminer, olr, consumer, validator, olr-vm"
echo "Components: kafka, logminer, olr, lob-logminer, consumer, validator, olr-vm"
;;
*)
echo "Unknown component: $component" >&2
echo "Components: kafka, logminer, olr, consumer, validator, olr-vm"
echo "Components: kafka, logminer, olr, lob-logminer, consumer, validator, olr-vm"
exit 1
Comment thread
coderabbitai[bot] marked this conversation as resolved.
;;
esac
Expand Down
26 changes: 18 additions & 8 deletions tests/dbz-twin/rac/kafka-consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,18 @@ def extract_event_info(event):

LM_TOPIC = os.environ.get('LM_TOPIC', 'lm-events')
OLR_TOPIC = os.environ.get('OLR_TOPIC', 'olr-events')
OLR_LOB_TOPIC = os.environ.get('OLR_LOB_TOPIC', 'olr-lob-events')


def determine_adapter(topic):
"""Determine adapter (logminer or olr) from Kafka topic name."""
"""Determine adapter (logminer or olr) from Kafka topic name.

The OLR LOB topic (LogMiner for LOB tables) is treated as 'olr' because
it complements OLR on the "actual" side of the comparison.
"""
if topic == LM_TOPIC:
return 'logminer'
elif topic == OLR_TOPIC:
elif topic in (OLR_TOPIC, OLR_LOB_TOPIC):
return 'olr'
# Fallback for per-table topics
if topic.startswith('logminer'):
Expand Down Expand Up @@ -139,18 +144,23 @@ def main():
sys.exit(1)

# Wait for topics to appear, then subscribe
print(f"Waiting for topics: {LM_TOPIC}, {OLR_TOPIC}...", flush=True)
for attempt in range(60):
all_topics = [LM_TOPIC, OLR_TOPIC, OLR_LOB_TOPIC]
print(f"Waiting for topics: {', '.join(all_topics)}...", flush=True)
for _ in range(60):
topics = consumer.topics()
if LM_TOPIC in topics or OLR_TOPIC in topics:
print(f" Found topics: {[t for t in (LM_TOPIC, OLR_TOPIC) if t in topics]}", flush=True)
if all(t in topics for t in all_topics):
print(f" Found all topics: {all_topics}", flush=True)
break
time.sleep(5)
else:
missing = [t for t in all_topics if t not in topics]
print(f"ERROR: Missing Kafka topics after 5 min: {missing}", flush=True)
sys.exit(1)

consumer.subscribe([LM_TOPIC, OLR_TOPIC])
consumer.subscribe(all_topics)
# Force metadata refresh
consumer.poll(timeout_ms=1000)
print(f"Subscribed to {LM_TOPIC} and {OLR_TOPIC}", flush=True)
print(f"Subscribed to {', '.join(all_topics)}", flush=True)
Comment thread
coderabbitai[bot] marked this conversation as resolved.

# Track per-event_id sequence numbers for LOB split handling.
lm_seq = {} # event_id -> next seq
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ debezium.format.key.schemas.enable=false

debezium.source.connector.class=io.debezium.connector.oracle.OracleConnector
debezium.source.database.connection.adapter=logminer
debezium.source.database.hostname=192.168.122.130
debezium.source.database.hostname=${VM_HOST}
debezium.source.database.port=1521
debezium.source.database.user=c##dbzuser
debezium.source.database.password=dbz
Expand Down
4 changes: 2 additions & 2 deletions tests/dbz-twin/rac/perf/config/application-olr.properties
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ debezium.format.key.schemas.enable=false
debezium.source.connector.class=io.debezium.connector.oracle.OracleConnector
debezium.source.database.connection.adapter=olr
debezium.source.openlogreplicator.source=ORCLCDB
debezium.source.openlogreplicator.host=192.168.122.130
debezium.source.openlogreplicator.host=${VM_HOST}
debezium.source.openlogreplicator.port=5000
debezium.source.database.hostname=192.168.122.130
debezium.source.database.hostname=${VM_HOST}
debezium.source.database.port=1521
debezium.source.database.user=c##dbzuser
debezium.source.database.password=dbz
Expand Down
6 changes: 4 additions & 2 deletions tests/dbz-twin/rac/perf/config/prometheus.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
# Auto-generated by perf/run.sh — do not edit manually.
# VM_HOST is substituted at runtime from vm-env.sh.
global:
scrape_interval: 5s

scrape_configs:
- job_name: 'node-exporter'
static_configs:
- targets: ['192.168.122.130:9100']
- targets: ['${VM_HOST}:9100']
- job_name: 'cadvisor'
static_configs:
- targets: ['192.168.122.130:9101']
- targets: ['${VM_HOST}:9101']
2 changes: 1 addition & 1 deletion tests/dbz-twin/rac/perf/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ services:
# Override with: docker compose run swingbench -uc 8 -rt 01:00.00
command:
- "-cs"
- "//192.168.122.130:1521/ORCLPDB"
- "//${VM_HOST:?VM_HOST is required}:1521/ORCLPDB"
- "-u"
- "soe"
- "-p"
Expand Down
Loading
Loading