diff --git a/CLAUDE.md b/CLAUDE.md index 43c994c2..4a4d7c32 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -1 +1,7 @@ @AGENTS.md + +## GitHub Policy + +**NEVER** create PRs or issues against upstream (`bersler/OpenLogReplicator`). +All PRs and issues go to `rophy/OpenLogReplicator` only. +`gh repo set-default` is configured to `rophy/OpenLogReplicator`. diff --git a/tests/dbz-twin/compare-debezium.py b/tests/dbz-twin/compare-debezium.py index ed32d201..604ee123 100755 --- a/tests/dbz-twin/compare-debezium.py +++ b/tests/dbz-twin/compare-debezium.py @@ -1,19 +1,22 @@ #!/usr/bin/env python3 """Compare Debezium LogMiner vs OLR adapter outputs. -Usage: compare-debezium.py +Usage: compare-debezium.py [--exclude-tables T1,T2,...] Both inputs are JSONL files with Debezium envelope events: {"before":..., "after":..., "source":..., "op":..., "ts_ms":...} -Compares records positionally (both sides emit in SCN order) while -ignoring connector-specific metadata (source block, timestamps). +Uses content-based matching grouped by (txId, table, op) to handle +cross-node ordering differences in RAC. Falls back to positional +comparison within groups. Exits 0 on match, 1 on mismatch with diff report. """ +import argparse import json import sys +from collections import defaultdict OP_MAP = {'c': 'INSERT', 'u': 'UPDATE', 'd': 'DELETE'} @@ -24,6 +27,10 @@ 'X19kZWJleml1bV91bmF2YWlsYWJsZV92YWx1ZQ==', } +# Tables always excluded from comparison (stats/bookkeeping, not test data). +# Additional tables can be excluded via --exclude-tables CLI flag. 
+EXCLUDED_TABLES = {'FUZZ_STATS'} + def is_unavailable(v): """Check if a normalized value is Debezium's unavailable marker.""" @@ -44,8 +51,9 @@ def normalize_columns(d): return {k: normalize_value(v) for k, v in d.items()} -def parse_debezium_jsonl(path): +def parse_debezium_jsonl(path, excluded_tables=None): """Parse a Debezium JSONL file into normalized records.""" + skip_tables = EXCLUDED_TABLES | (excluded_tables or set()) records = [] with open(path) as f: for line in f: @@ -58,15 +66,21 @@ def parse_debezium_jsonl(path): table = source.get('table', '') schema = source.get('schema', '') op = event.get('op', '') + tx_id = source.get('txId', '') # Skip non-DML events if op not in OP_MAP: continue + # Skip excluded tables + if table in skip_tables: + continue + records.append({ 'op': OP_MAP[op], 'schema': schema, 'table': table, + 'txId': tx_id, 'before': normalize_columns(event.get('before')), 'after': normalize_columns(event.get('after')), }) @@ -139,6 +153,7 @@ def _do_merge(prev, curr): 'op': prev['op'], 'schema': prev['schema'], 'table': prev['table'], + 'txId': prev.get('txId', ''), 'after': _merge_columns(prev.get('after', {}), curr.get('after', {})), 'before': prev.get('before', {}), } @@ -190,46 +205,148 @@ def columns_match(cols_a, cols_b, section='after'): return diffs -def compare(lm_records, olr_records): - """Compare LogMiner vs OLR records positionally.""" +def record_match_score(lm, olr): + """Score how well two records match. Returns (matches, mismatches). 
+ Higher matches and lower mismatches = better match.""" + if lm['op'] != olr['op'] or lm['table'] != olr['table']: + return (0, 999) + + matches = 0 + mismatches = 0 + + for section in ('after', 'before'): + lm_cols = lm.get(section, {}) + olr_cols = olr.get(section, {}) + for key in set(lm_cols) & set(olr_cols): + va, vb = lm_cols.get(key), olr_cols.get(key) + if va is None or vb is None: + continue + if section == 'before' and (is_unavailable(va) or is_unavailable(vb)): + continue + if values_match(va, vb): + matches += 1 + else: + mismatches += 1 + + return (matches, mismatches) + + +def match_within_group(group_key, lm_group, olr_group): + """Content-based matching within a (txId, table, op) group. + Returns list of diff strings.""" diffs = [] - if len(lm_records) != len(olr_records): + if len(lm_group) != len(olr_group): diffs.append( - f"Record count mismatch: LogMiner={len(lm_records)}, OLR={len(olr_records)}" + f"Group {group_key}: count mismatch " + f"LogMiner={len(lm_group)}, OLR={len(olr_group)}" ) - for i, (lm, olr) in enumerate(zip(lm_records, olr_records)): - if lm['op'] != olr['op'] or lm['table'] != olr['table']: - diffs.append( - f"Record #{i+1}: op/table mismatch: " - f"LogMiner={lm['op']} {lm['table']}, " - f"OLR={olr['op']} {olr['table']}" - ) - continue + # Greedy best-match: for each LM record, find best OLR match + available = list(range(len(olr_group))) + matched_pairs = [] + + for li, lm in enumerate(lm_group): + best_idx = None + best_score = (0, 999) + for ai, oi in enumerate(available): + score = record_match_score(lm, olr_group[oi]) + # Better = more matches, fewer mismatches + if (score[1] < best_score[1]) or \ + (score[1] == best_score[1] and score[0] > best_score[0]): + best_score = score + best_idx = ai + if best_idx is not None: + matched_pairs.append((li, available.pop(best_idx))) + else: + diffs.append(f"Group {group_key}: LM record {li} has no OLR match") + + # Report diffs for matched pairs + for li, oi in matched_pairs: + lm 
= lm_group[li] + olr = olr_group[oi] if lm['op'] in ('INSERT', 'UPDATE'): - cd = columns_match(lm.get('after', {}), olr.get('after', {}), section='after') + cd = columns_match( + lm.get('after', {}), olr.get('after', {}), section='after') if cd: - diffs.append(f"Record #{i+1} ({lm['op']} {lm['table']}) 'after' diffs:") + diffs.append( + f"Group {group_key} ({lm['op']} {lm['table']}) 'after' diffs:") diffs.extend(cd) if lm['op'] in ('UPDATE', 'DELETE'): - cd = columns_match(lm.get('before', {}), olr.get('before', {}), section='before') + cd = columns_match( + lm.get('before', {}), olr.get('before', {}), section='before') if cd: - diffs.append(f"Record #{i+1} ({lm['op']} {lm['table']}) 'before' diffs:") + diffs.append( + f"Group {group_key} ({lm['op']} {lm['table']}) 'before' diffs:") diffs.extend(cd) + # Unmatched OLR records + for oi in available: + diffs.append( + f"Group {group_key}: extra OLR record " + f"({olr_group[oi]['op']} {olr_group[oi]['table']})") + return diffs -def main(): - if len(sys.argv) != 3: - print(f"Usage: {sys.argv[0]} ", file=sys.stderr) - sys.exit(2) +def compare(lm_records, olr_records): + """Compare LogMiner vs OLR records using content-based matching.""" + diffs = [] + + if len(lm_records) != len(olr_records): + diffs.append( + f"Record count mismatch: LogMiner={len(lm_records)}, " + f"OLR={len(olr_records)}" + ) + + # Group by (txId, table, op) + lm_groups = defaultdict(list) + olr_groups = defaultdict(list) + for r in lm_records: + key = (r.get('txId', ''), r['table'], r['op']) + lm_groups[key].append(r) + for r in olr_records: + key = (r.get('txId', ''), r['table'], r['op']) + olr_groups[key].append(r) + + all_keys = sorted(set(lm_groups.keys()) | set(olr_groups.keys())) - lm_records = parse_debezium_jsonl(sys.argv[1]) - olr_records = parse_debezium_jsonl(sys.argv[2]) + for key in all_keys: + lm_group = lm_groups.get(key, []) + olr_group = olr_groups.get(key, []) + + if not lm_group and olr_group: + diffs.append( + f"Group {key}: 
{len(olr_group)} extra OLR records " + f"(no LogMiner match)") + continue + if lm_group and not olr_group: + diffs.append( + f"Group {key}: {len(lm_group)} LogMiner records " + f"missing from OLR") + continue + + group_diffs = match_within_group(key, lm_group, olr_group) + diffs.extend(group_diffs) + + return diffs + + +def main(): + parser = argparse.ArgumentParser( + description='Compare Debezium LogMiner vs OLR adapter outputs.') + parser.add_argument('logminer_jsonl', help='LogMiner JSONL file') + parser.add_argument('olr_jsonl', help='OLR JSONL file') + parser.add_argument('--exclude-tables', default='', + help='Comma-separated list of additional tables to exclude') + args = parser.parse_args() + + extra_excluded = set(t.strip() for t in args.exclude_tables.split(',') if t.strip()) + + lm_records = parse_debezium_jsonl(args.logminer_jsonl, extra_excluded) + olr_records = parse_debezium_jsonl(args.olr_jsonl, extra_excluded) # Merge LogMiner's split LOB events (OLR already emits merged events) lm_merged = merge_lob_events(lm_records) diff --git a/tests/dbz-twin/rac/Dockerfile.consumer b/tests/dbz-twin/rac/Dockerfile.consumer new file mode 100644 index 00000000..9ae81e90 --- /dev/null +++ b/tests/dbz-twin/rac/Dockerfile.consumer @@ -0,0 +1,5 @@ +FROM python:3.12-slim +RUN pip install --no-cache-dir kafka-python-ng==2.2.3 \ + && groupadd -r app && useradd -r -g app app \ + && mkdir -p /app/data && chown app:app /app/data +USER app diff --git a/tests/dbz-twin/rac/FUZZ-TEST.md b/tests/dbz-twin/rac/FUZZ-TEST.md new file mode 100644 index 00000000..8079c153 --- /dev/null +++ b/tests/dbz-twin/rac/FUZZ-TEST.md @@ -0,0 +1,154 @@ +# Fuzz Test — OLR Accuracy Validation + +Validates OLR data accuracy under randomized workloads on Oracle RAC by +comparing OLR's CDC output against LogMiner event-by-event. 
+ +## Quick Start + +```bash +cd tests/dbz-twin/rac + +./fuzz-test.sh up # start infrastructure +./fuzz-test.sh run 60 # run 60-minute workload +./fuzz-test.sh validate # compare results +./fuzz-test.sh down # clean up +``` + +## Architecture + +``` +Oracle RAC (2 nodes) + └─ PL/SQL fuzz workload (random DML, event_id on every row) + ├─ Debezium LogMiner adapter ─→ Kafka topic: lm-events + └─ Debezium OLR adapter ─→ Kafka topic: olr-events + │ + Python Kafka consumer + │ + SQLite (lm_events + olr_events) + │ + Python validator + Compares by event_id +``` + +Both Debezium adapters read from the same Oracle redo logs. LogMiner is the +reference (Oracle's own CDC). OLR is the system under test. If both produce +the same events for the same DML, OLR is accurate. + +## Components + +### Load Generator (`perf/fuzz-workload.sql`) + +PL/SQL package that generates random DML across 7 table types: + +| Table | Tests | +|-------|-------| +| FUZZ_SCALAR | Core types: VARCHAR2, NUMBER, FLOAT, DOUBLE, DATE, TIMESTAMP, RAW | +| FUZZ_WIDE | 40+ columns — multi-block redo records | +| FUZZ_LOB | CLOB + BLOB — LOB redo opcodes, out-of-row storage | +| FUZZ_PART | List-partitioned — data-obj-id resolution | +| FUZZ_NOPK | No primary key — ROWID-based supplemental logging | +| FUZZ_MAXSTR | Two VARCHAR2(4000) — near block-boundary rows | +| FUZZ_INTERVAL | INTERVAL YEAR TO MONTH, DAY TO SECOND | + +Transaction patterns: +- 55% immediate commit +- 15% batched commit (2-5 operations) +- 10% full rollback +- 10% savepoint + partial rollback +- 10% large transaction (10-30 operations) + +Every row has a globally unique `event_id` column (`N{node}_{seq:08d}`). +This is the key for comparison — no ordering assumptions needed. + +### Kafka (single broker, KRaft) + +Single topic per adapter (`lm-events`, `olr-events`). All tables routed to +one topic via `RegexRouter` to preserve commit order within each adapter. + +### Consumer (`kafka-consumer.py`) + +Subscribes to both topics. 
For each event: +1. Extracts `event_id` from Debezium JSON (`after.EVENT_ID` or `before.EVENT_ID`) +2. Writes to SQLite: `(event_id, seq, table_name, op, raw_json, consumed_at)` +3. Skips `FUZZ_STATS` table and `event_id='SEED'` rows + +The `seq` column handles LogMiner's LOB splitting (same event_id, multiple +CDC events). SQLite uses WAL mode for concurrent reads. + +### Validator (`validator.py`) + +Walks both SQLite tables sorted by event_id using a per-node watermark: + +1. For each RAC node (N1, N2): `frontier = min(max_lm_event_id, max_olr_event_id)` +2. Fetch event_ids within each node's frontier +3. For each event_id: + - In both → compare table, op, column values (with LOB merge) + - In OLR only → extra (phantom transaction) + - In LM only → missing from OLR +4. LOB table mismatches classified as known issues (olr#26) +5. Non-LOB mismatches = FAIL + +Exit 0 = PASS (no non-LOB mismatches), exit 1 = FAIL. + +## Commands + +| Command | Description | +|---------|-------------| +| `./fuzz-test.sh up` | Start Kafka, Debezium, consumer, OLR. Deploy fuzz tables. | +| `./fuzz-test.sh run [min]` | Run fuzz workload for N minutes (default: 30) | +| `./fuzz-test.sh status` | Show container status, consumer counts, OLR memory | +| `./fuzz-test.sh validate` | Wait for consumer drain, run validator, report PASS/FAIL | +| `./fuzz-test.sh logs ` | Show logs: kafka, logminer, olr, consumer, validator, olr-vm | +| `./fuzz-test.sh down` | Stop all containers and remove volumes (including fuzz-data) | + +## Prerequisites + +- RAC VM running with Oracle containers started +- OLR dev image built (`make build`) +- OLR image loaded on RAC VM (`podman load`) +- One-time setup done (`./setup.sh` — creates `c##dbzuser` + grants) +- `CREATE PROCEDURE` grant for `olr_test` user (for PL/SQL package) + +## Known Issues + +- **LOB phantom transactions (olr#26)**: OLR emits entire phantom committed + transactions on FUZZ_LOB that LogMiner does not see. ~0.1% of LOB events. 
+ Classified as `lob_known` in validator output — does not fail the test. + +- **LOB UPDATE variant (olr#10)**: Occasional LOB UPDATE events present in + LogMiner but absent from OLR. Same phantom undo root cause. + +- Non-LOB tables are **100% accurate** in all testing so far. + +## SQLite Schema + +```sql +CREATE TABLE lm_events ( + event_id TEXT NOT NULL, + seq INTEGER NOT NULL, -- 0 normally, >0 for LOB splits + table_name TEXT NOT NULL, + op TEXT NOT NULL, -- INSERT, UPDATE, DELETE + raw_json TEXT NOT NULL, -- full Debezium envelope + consumed_at REAL NOT NULL, + PRIMARY KEY (event_id, seq) +); +-- olr_events: identical schema +``` + +The `fuzz-data` volume is deleted by `./fuzz-test.sh down` (which runs +`docker compose down -v`). To inspect the database **before** tearing down, +query it while containers are still running: + +```bash +docker run --rm -v rac_fuzz-data:/data python:3.12-slim python3 -c " +import sqlite3 +conn = sqlite3.connect('/data/fuzz.db') +# Example: find all phantom events +for r in conn.execute(''' + SELECT o.event_id, o.table_name, o.op + FROM olr_events o LEFT JOIN lm_events l ON o.event_id = l.event_id + WHERE l.event_id IS NULL ORDER BY o.event_id +''').fetchall(): + print(r) +" +``` diff --git a/tests/dbz-twin/rac/config/application-logminer-kafka.properties b/tests/dbz-twin/rac/config/application-logminer-kafka.properties new file mode 100644 index 00000000..732b64fc --- /dev/null +++ b/tests/dbz-twin/rac/config/application-logminer-kafka.properties @@ -0,0 +1,38 @@ +# Debezium Server — LogMiner adapter → Kafka sink (RAC fuzz test) +# All events routed to single topic (preserves commit order, no partition gaps) +quarkus.http.port=8081 +debezium.sink.type=kafka +debezium.sink.kafka.producer.bootstrap.servers=localhost:9092 +debezium.sink.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer +debezium.sink.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer + 
+debezium.format.value=json +debezium.format.value.schemas.enable=false +debezium.format.key=json +debezium.format.key.schemas.enable=false + +debezium.source.connector.class=io.debezium.connector.oracle.OracleConnector +debezium.source.database.connection.adapter=logminer +debezium.source.database.hostname=192.168.122.130 +debezium.source.database.port=1521 +debezium.source.database.user=c##dbzuser +debezium.source.database.password=dbz +debezium.source.database.dbname=ORCLCDB +debezium.source.database.pdb.name=ORCLPDB +debezium.source.topic.prefix=logminer +debezium.source.schema.include.list=OLR_TEST +debezium.source.snapshot.mode=no_data +debezium.source.log.mining.strategy=online_catalog +debezium.source.lob.enabled=true + +# Route all tables to a single topic for ordered delivery +debezium.transforms=route +debezium.transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter +debezium.transforms.route.regex=.* +debezium.transforms.route.replacement=lm-events + +debezium.source.offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore +debezium.source.offset.storage.file.filename=/debezium/data/offsets.dat +debezium.source.offset.flush.interval.ms=0 +debezium.source.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory +debezium.source.schema.history.internal.file.filename=/debezium/data/schema-history.dat diff --git a/tests/dbz-twin/rac/config/application-olr-kafka.properties b/tests/dbz-twin/rac/config/application-olr-kafka.properties new file mode 100644 index 00000000..7ad66628 --- /dev/null +++ b/tests/dbz-twin/rac/config/application-olr-kafka.properties @@ -0,0 +1,39 @@ +# Debezium Server — OLR adapter → Kafka sink (RAC fuzz test) +# All events routed to single topic (preserves commit order, no partition gaps) +quarkus.http.port=8082 +debezium.sink.type=kafka +debezium.sink.kafka.producer.bootstrap.servers=localhost:9092 
+debezium.sink.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer +debezium.sink.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer + +debezium.format.value=json +debezium.format.value.schemas.enable=false +debezium.format.key=json +debezium.format.key.schemas.enable=false + +debezium.source.connector.class=io.debezium.connector.oracle.OracleConnector +debezium.source.database.connection.adapter=olr +debezium.source.openlogreplicator.source=ORCLCDB +debezium.source.openlogreplicator.host=192.168.122.130 +debezium.source.openlogreplicator.port=5000 +debezium.source.database.hostname=192.168.122.130 +debezium.source.database.port=1521 +debezium.source.database.user=c##dbzuser +debezium.source.database.password=dbz +debezium.source.database.dbname=ORCLCDB +debezium.source.database.pdb.name=ORCLPDB +debezium.source.topic.prefix=olr +debezium.source.schema.include.list=OLR_TEST +debezium.source.snapshot.mode=no_data + +# Route all tables to a single topic for ordered delivery +debezium.transforms=route +debezium.transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter +debezium.transforms.route.regex=.* +debezium.transforms.route.replacement=olr-events + +debezium.source.offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore +debezium.source.offset.storage.file.filename=/debezium/data/offsets.dat +debezium.source.offset.flush.interval.ms=0 +debezium.source.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory +debezium.source.schema.history.internal.file.filename=/debezium/data/schema-history.dat diff --git a/tests/dbz-twin/rac/config/olr-config.json b/tests/dbz-twin/rac/config/olr-config.json index c664fa2e..d2f1a4dc 100644 --- a/tests/dbz-twin/rac/config/olr-config.json +++ b/tests/dbz-twin/rac/config/olr-config.json @@ -9,7 +9,7 @@ }, "memory": { "min-mb": 64, - "max-mb": 1024 + "max-mb": 2048 }, "source": [ { diff --git 
a/tests/dbz-twin/rac/docker-compose-fuzz.yaml b/tests/dbz-twin/rac/docker-compose-fuzz.yaml new file mode 100644 index 00000000..345318ae --- /dev/null +++ b/tests/dbz-twin/rac/docker-compose-fuzz.yaml @@ -0,0 +1,91 @@ +services: + kafka: + image: apache/kafka:3.9.0 + container_name: fuzz-kafka + ports: + - "9092:9092" + environment: + KAFKA_NODE_ID: 1 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092 + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_LOG_RETENTION_HOURS: 1 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" + healthcheck: + test: ["CMD", "/opt/kafka/bin/kafka-broker-api-versions.sh", "--bootstrap-server", "localhost:9092"] + interval: 10s + timeout: 10s + retries: 12 + start_period: 15s + + dbz-logminer: + image: quay.io/debezium/server:3.5.0.Beta1 + container_name: fuzz-dbz-logminer + network_mode: host + depends_on: + kafka: + condition: service_healthy + volumes: + - ./config/application-logminer-kafka.properties:/debezium/config/application.properties:ro + - ../lib/ojdbc8.jar:/debezium/lib/ojdbc8.jar:ro + - dbz-logminer-data:/debezium/data + + dbz-olr: + image: quay.io/debezium/server:3.5.0.Beta1 + container_name: fuzz-dbz-olr + network_mode: host + restart: unless-stopped + depends_on: + kafka: + condition: service_healthy + volumes: + - ./config/application-olr-kafka.properties:/debezium/config/application.properties:ro + - ../lib/ojdbc8.jar:/debezium/lib/ojdbc8.jar:ro + - ../lib/debezium-connector-oracle-3.5.0.Beta1.jar:/debezium/lib/debezium-connector-oracle-3.5.0.Beta1.jar:ro + - dbz-olr-data:/debezium/data + + consumer: + build: + context: . 
+ dockerfile: Dockerfile.consumer + image: fuzz-consumer:latest + container_name: fuzz-consumer + network_mode: host + depends_on: + kafka: + condition: service_healthy + command: ["python3", "/app/kafka-consumer.py"] + environment: + KAFKA_BOOTSTRAP: localhost:9092 + SQLITE_DB: /app/data/fuzz.db + LM_TOPIC: lm-events + OLR_TOPIC: olr-events + volumes: + - ./kafka-consumer.py:/app/kafka-consumer.py:ro + - fuzz-data:/app/data + + validator: + image: python:3.12-slim + container_name: fuzz-validator + network_mode: host + profiles: ["validate"] + depends_on: + consumer: + condition: service_started + command: ["python3", "/app/validator.py"] + environment: + SQLITE_DB: /app/data/fuzz.db + POLL_INTERVAL: "10" + IDLE_TIMEOUT: "120" + volumes: + - ./validator.py:/app/validator.py:ro + - fuzz-data:/app/data + +volumes: + dbz-logminer-data: + dbz-olr-data: + fuzz-data: diff --git a/tests/dbz-twin/rac/fuzz-test.sh b/tests/dbz-twin/rac/fuzz-test.sh new file mode 100755 index 00000000..73da4aa6 --- /dev/null +++ b/tests/dbz-twin/rac/fuzz-test.sh @@ -0,0 +1,390 @@ +#!/usr/bin/env bash +# fuzz-test.sh — Randomized OLR accuracy test with streaming validation. +# +# Runs a PL/SQL fuzz workload on both RAC nodes, streams CDC events through +# Kafka to a consumer that writes to SQLite, and a validator that continuously +# compares LogMiner vs OLR output by event_id. 
+# +# Usage: ./fuzz-test.sh [options] +# +# Actions: +# up Start infrastructure (Kafka, Debezium, consumer, validator, OLR) +# run [duration-min] Deploy fuzz workload and run for N minutes (default: 30) +# status Show consumer/validator status and OLR memory +# validate Run validator (wait for idle timeout, report results) +# logs [component] Show logs (kafka, logminer, olr, consumer, validator, olr-vm) +# down Stop and remove all containers + volumes +# help Show this help +# +# Typical workflow: +# ./fuzz-test.sh up # start infrastructure +# ./fuzz-test.sh run 60 # run 60-minute fuzz workload +# ./fuzz-test.sh status # check progress +# ./fuzz-test.sh validate # wait for drain + validate +# ./fuzz-test.sh logs validator # investigate mismatches +# ./fuzz-test.sh down # clean up +# +# Prerequisites: +# - RAC VM running with containers started +# - OLR image loaded on VM (podman load) +# - One-time setup done (./setup.sh) + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +DBZ_TWIN_DIR="$(cd "$SCRIPT_DIR/.." && pwd)" +TESTS_DIR="$(cd "$DBZ_TWIN_DIR/.." 
&& pwd)" +RAC_ENV_DIR="$TESTS_DIR/environments/rac" + +# ---- RAC configuration (auto-detect VM IP) ---- +source "$RAC_ENV_DIR/vm-env.sh" +OLR_IMAGE="${OLR_IMAGE:-docker.io/library/olr-dev:latest}" +RAC_NODE1="${RAC_NODE1:-racnodep1}" +RAC_NODE2="${RAC_NODE2:-racnodep2}" +ORACLE_SID1="${ORACLE_SID1:-ORCLCDB1}" +ORACLE_SID2="${ORACLE_SID2:-ORCLCDB2}" +DB_CONN1="${DB_CONN1:-olr_test/olr_test@//racnodep1:1521/ORCLPDB}" +DB_CONN2="${DB_CONN2:-olr_test/olr_test@//racnodep2:1521/ORCLPDB}" + +OLR_CONTAINER="olr-debezium" +COMPOSE_FILE="$SCRIPT_DIR/docker-compose-fuzz.yaml" + +# ---- SSH helpers ---- +_vm_sqlplus() { + local node="$1" sid="$2" conn="$3" sql_file="$4" + ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ + "podman exec $node su - oracle -c 'export ORACLE_SID=$sid; sqlplus -S \"$conn\" @$sql_file'" +} + +_vm_copy_in() { + local local_path="$1" container_path="$2" node="$3" + local staging="/tmp/_fuzz_staging_$$" + scp $_SSH_OPTS "$local_path" "${VM_USER}@${VM_HOST}:${staging}" + ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" "podman cp ${staging} ${node}:${container_path}; rm -f ${staging}" +} + +_exec_sysdba() { + local sql_file="$1" + local remote="/tmp/$(basename "$sql_file")" + _vm_copy_in "$sql_file" "$remote" "$RAC_NODE1" + _vm_sqlplus "$RAC_NODE1" "$ORACLE_SID1" "/ as sysdba" "$remote" +} + +_exec_user() { + local sql_file="$1" + local node="${2:-$RAC_NODE1}" sid="${3:-$ORACLE_SID1}" conn="${4:-$DB_CONN1}" + local remote="/tmp/$(basename "$sql_file")" + _vm_copy_in "$sql_file" "$remote" "$node" + _vm_sqlplus "$node" "$sid" "$conn" "$remote" +} + +_olr_memory_mb() { + ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ + "podman exec $OLR_CONTAINER sh -c 'cat /proc/\$(pgrep -f OpenLogReplicator | head -1)/status 2>/dev/null | grep VmRSS | awk \"{printf \\\"%.0f\\\", \\\$2/1024}\"'" 2>/dev/null || echo "N/A" +} + +# ---- Actions ---- + +action_help() { + sed -n '2,/^$/s/^# \?//p' "$0" +} + +action_up() { + echo "=== Starting fuzz test infrastructure ===" + + # Verify RAC + if ! 
ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ + "podman exec $RAC_NODE1 su - oracle -c 'export ORACLE_SID=$ORACLE_SID1; printf \"SELECT 1 FROM dual;\nEXIT;\n\" | sqlplus -S / as sysdba'" 2>/dev/null | grep -q "1"; then + echo "ERROR: RAC Oracle not reachable on $VM_HOST" >&2 + exit 1 + fi + echo " Oracle RAC: OK" + + # Deploy fuzz workload (creates tables + PL/SQL package) + echo " Deploying fuzz workload..." + _vm_copy_in "$SCRIPT_DIR/perf/fuzz-workload.sql" "/tmp/fuzz-workload.sql" "$RAC_NODE1" + _vm_sqlplus "$RAC_NODE1" "$ORACLE_SID1" "$DB_CONN1" "/tmp/fuzz-workload.sql" + + # Stop existing OLR + ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ + "podman stop -t5 $OLR_CONTAINER 2>/dev/null; podman rm $OLR_CONTAINER 2>/dev/null; true" + + # Start Kafka + consumer + validator + Debezium + docker compose -f "$COMPOSE_FILE" down -v 2>/dev/null + docker compose -f "$COMPOSE_FILE" up -d 2>&1 + echo " Kafka + Debezium + consumer + validator: starting" + + # Wait for Kafka + echo " Waiting for Kafka..." + for i in $(seq 1 30); do + if docker exec fuzz-kafka /opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092 > /dev/null 2>&1; then + echo " Kafka: ready" + break + fi + [[ $i -eq 30 ]] && { echo "ERROR: Kafka did not start" >&2; exit 1; } + sleep 2 + done + + # Deploy OLR config and start OLR + ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" "mkdir -p /root/olr-debezium/config /root/olr-debezium/checkpoint" + scp $_SSH_OPTS "$SCRIPT_DIR/config/olr-config.json" "${VM_USER}@${VM_HOST}:/root/olr-debezium/config/" + ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" "rm -rf /root/olr-debezium/checkpoint/* && chown -R 1000:54335 /root/olr-debezium/checkpoint" + + echo " Starting OLR on RAC VM..." 
+ SCAN_IP=$(ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ + "podman exec racnodep1 getent hosts racnodepc1-scan 2>/dev/null | head -1 | awk '{print \$1}'" 2>/dev/null) + if [[ -z "$SCAN_IP" ]]; then + echo "ERROR: Failed to resolve racnodepc1-scan IP from RAC VM" >&2 + exit 1 + fi + ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" "podman run -d --name $OLR_CONTAINER \ + --user 1000:54335 \ + --network rac_pub1_nw \ + --add-host racnodepc1-scan:${SCAN_IP} \ + -p 5000:5000 \ + -v /root/olr-debezium/config:/config:ro,Z \ + -v /root/olr-debezium/checkpoint:/olr-data/checkpoint:Z \ + -v /shared/redo:/shared/redo:ro \ + $OLR_IMAGE \ + -r -f /config/olr-config.json" > /dev/null + + # Wait for OLR + echo " Waiting for OLR..." + for i in $(seq 1 90); do + if ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ + "podman logs $OLR_CONTAINER 2>&1 | tail -5" 2>/dev/null | grep -q "processing redo log"; then + echo " OLR: ready" + break + fi + [[ $i -eq 90 ]] && { echo "ERROR: OLR did not start" >&2; exit 1; } + sleep 2 + done + + # Wait for Debezium connectors + echo " Waiting for Debezium connectors..." + for i in $(seq 1 60); do + LM_OK=false; OLR_OK=false + docker logs fuzz-dbz-logminer 2>&1 | tail -10 | grep -q "Starting streaming" && LM_OK=true + docker logs fuzz-dbz-olr 2>&1 | tail -10 | grep -q "streaming client started\|Starting streaming" && OLR_OK=true + if $LM_OK && $OLR_OK; then + echo " Debezium: ready" + break + fi + [[ $i -eq 60 ]] && { echo "ERROR: Debezium connectors did not start" >&2; exit 1; } + sleep 2 + done + + echo "" + echo " OLR memory: $(_olr_memory_mb) MB" + echo "" + echo "=== Infrastructure ready. 
Run: ./fuzz-test.sh run [minutes] ===" +} + +action_run() { + local duration_min="${1:-30}" + local duration_sec=$(( duration_min * 60 )) + + echo "=== Running fuzz workload for ${duration_min} minutes ===" + + local work_dir + work_dir=$(mktemp -d /tmp/fuzz_rac_XXXXXX) + + # Log switch helper + cat > "$work_dir/log_switch.sql" <<'SQL' +SET FEEDBACK OFF +ALTER SYSTEM SWITCH ALL LOGFILE; +BEGIN DBMS_SESSION.SLEEP(2); END; +/ +EXIT +SQL + _exec_sysdba "$work_dir/log_switch.sql" > /dev/null + + # Create runner scripts + cat > "$work_dir/fuzz_node1.sql" < ${duration_sec}, p_seed => 42, p_node_id => 1); +EXIT; +SQL + cat > "$work_dir/fuzz_node2.sql" < ${duration_sec}, p_seed => 137, p_node_id => 2); +EXIT; +SQL + + _vm_copy_in "$work_dir/fuzz_node1.sql" "/tmp/fuzz_node1.sql" "$RAC_NODE1" + _vm_copy_in "$work_dir/fuzz_node2.sql" "/tmp/fuzz_node2.sql" "$RAC_NODE2" + + ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ + "podman exec $RAC_NODE1 su - oracle -c 'export ORACLE_SID=$ORACLE_SID1; sqlplus -S $DB_CONN1 @/tmp/fuzz_node1.sql'" \ + > "$work_dir/fuzz_out1.log" 2>&1 & + local pid1=$! + + ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ + "podman exec $RAC_NODE2 su - oracle -c 'export ORACLE_SID=$ORACLE_SID2; sqlplus -S $DB_CONN2 @/tmp/fuzz_node2.sql'" \ + > "$work_dir/fuzz_out2.log" 2>&1 & + local pid2=$! + + echo " Fuzz running on both nodes (PIDs: $pid1, $pid2)" + echo " Monitor: ./fuzz-test.sh status" + + # Monitor until workload finishes + while kill -0 $pid1 2>/dev/null || kill -0 $pid2 2>/dev/null; do + local mem=$(_olr_memory_mb) + local vstatus + vstatus=$(docker logs --tail 1 fuzz-validator 2>/dev/null || echo "waiting...") + printf "\r [OLR: %s MB] %s " "$mem" "$vstatus" + sleep 15 + done + echo "" + + local rc1=0 rc2=0 + wait $pid1 || rc1=$? + wait $pid2 || rc2=$? 
+ + echo " Node 1: $(grep 'FUZZ_DONE:' "$work_dir/fuzz_out1.log" || echo 'no output')" + echo " Node 2: $(grep 'FUZZ_DONE:' "$work_dir/fuzz_out2.log" || echo 'no output')" + + if [[ $rc1 -ne 0 || $rc2 -ne 0 ]]; then + echo "ERROR: fuzz workload failed on one or more RAC nodes (rc1=$rc1, rc2=$rc2)" >&2 + echo " Check logs: $work_dir/fuzz_out1.log, $work_dir/fuzz_out2.log" >&2 + exit 1 + fi + + # Flush redo + _exec_sysdba "$work_dir/log_switch.sql" > /dev/null + sleep 3 + _exec_sysdba "$work_dir/log_switch.sql" > /dev/null + + rm -rf "$work_dir" + + echo "" + echo "=== Workload complete. Run: ./fuzz-test.sh validate ===" +} + +action_status() { + echo "=== Fuzz Test Status ===" + echo "" + + # Containers + echo "Containers:" + docker ps -a --filter "name=fuzz" --format " {{.Names}}: {{.Status}}" 2>/dev/null || echo " (none)" + echo "" + + # OLR + echo "OLR:" + echo " Memory: $(_olr_memory_mb) MB" + ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ + "podman logs --tail 3 $OLR_CONTAINER" 2>/dev/null | sed 's/^/ /' || echo " (not running)" + echo "" + + # Consumer + echo "Consumer:" + docker logs --tail 3 fuzz-consumer 2>/dev/null | sed 's/^/ /' || echo " (not running)" + echo "" + + # Validator + echo "Validator:" + docker logs --tail 3 fuzz-validator 2>/dev/null | sed 's/^/ /' || echo " (not running)" + echo "" + + # Kafka topics + echo "Kafka topics:" + docker exec fuzz-kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list 2>/dev/null | sed 's/^/ /' || echo " (kafka not running)" +} + +action_validate() { + echo "=== Running validation ===" + + # Wait for consumer to catch up (no new events for 30s) + echo " Waiting for consumer to drain..." 
+ local prev_count=0 idle_count=0 + while true; do + local cur_count + cur_count=$(docker logs --tail 1 fuzz-consumer 2>/dev/null | grep -oP 'LM=\K[0-9]+' || echo "0") + if [[ "$cur_count" == "$prev_count" ]]; then + idle_count=$(( idle_count + 1 )) + [[ $idle_count -ge 6 ]] && break # 30s idle + else + idle_count=0 + prev_count=$cur_count + fi + sleep 5 + done + echo " Consumer drained (LM events: $cur_count)" + + # Start validator (uses 'validate' profile) + local exit_code=0 + docker compose -f "$COMPOSE_FILE" run --rm validator || exit_code=$? + echo "" + echo " OLR memory: $(_olr_memory_mb) MB" + + # OLR errors + local olr_errors + olr_errors=$(ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ + "podman logs $OLR_CONTAINER 2>&1 | grep -c 'ERROR\|ASAN\|AddressSanitizer'" 2>/dev/null | tr -d '[:space:]' || echo "0") + [[ -z "$olr_errors" ]] && olr_errors=0 + if [[ "$olr_errors" -gt 0 ]]; then + echo " WARNING: $olr_errors OLR errors detected" + ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ + "podman logs $OLR_CONTAINER 2>&1 | grep 'ERROR\|ASAN' | tail -5" 2>/dev/null + fi + + echo "" + if [[ "$exit_code" -eq 0 ]]; then + echo "=== PASS: Fuzz test completed ===" + else + echo "=== FAIL: Fuzz test found mismatches ===" + fi + + return "$exit_code" +} + +action_logs() { + local component="${1:-}" + case "$component" in + kafka) docker logs fuzz-kafka 2>&1 ;; + logminer) docker logs fuzz-dbz-logminer 2>&1 ;; + olr) docker logs fuzz-dbz-olr 2>&1 ;; + consumer) docker logs fuzz-consumer 2>&1 ;; + validator) docker logs fuzz-validator 2>&1 ;; + olr-vm) ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" "podman logs $OLR_CONTAINER" 2>/dev/null ;; + "") + echo "Usage: $0 logs " + echo "Components: kafka, logminer, olr, consumer, validator, olr-vm" + ;; + *) + echo "Unknown component: $component" >&2 + echo "Components: kafka, logminer, olr, consumer, validator, olr-vm" + exit 1 + ;; + esac +} + +action_down() { + echo "=== Stopping fuzz test infrastructure ===" + docker compose -f "$COMPOSE_FILE" 
down -v 2>/dev/null
+    # Best-effort remote cleanup: trailing 'true' keeps ssh exit status 0
+    # even when the container is already gone.
+    ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \
+        "podman stop -t5 $OLR_CONTAINER 2>/dev/null; podman rm $OLR_CONTAINER 2>/dev/null; true"
+    echo " Done."
+}
+
+# ---- Main ----
+
+# First CLI word selects the action; remaining words are forwarded.
+# 'shift || true' tolerates being invoked with no arguments.
+ACTION="${1:-help}"
+shift || true
+
+case "$ACTION" in
+    up) action_up ;;
+    run) action_run "$@" ;;
+    status) action_status ;;
+    validate) action_validate ;;
+    logs) action_logs "$@" ;;
+    down) action_down ;;
+    help|--help|-h) action_help ;;
+    *)
+        echo "Unknown action: $ACTION" >&2
+        echo ""
+        action_help
+        exit 1
+        ;;
+esac
diff --git a/tests/dbz-twin/rac/kafka-consumer.py b/tests/dbz-twin/rac/kafka-consumer.py
new file mode 100644
index 00000000..0ee27d0a
--- /dev/null
+++ b/tests/dbz-twin/rac/kafka-consumer.py
@@ -0,0 +1,258 @@
+#!/usr/bin/env python3
+"""Kafka consumer for fuzz test — reads Debezium CDC events and writes to SQLite.
+
+Subscribes to both LogMiner and OLR Kafka topics, extracts event_id from each
+Debezium envelope event, and stores the raw JSON in SQLite for comparison.
+
+Environment variables:
+    KAFKA_BOOTSTRAP — Kafka bootstrap servers (default: localhost:9092)
+    SQLITE_DB — SQLite database path (default: /app/data/fuzz.db)
+"""
+
+import json
+import os
+import re  # NOTE(review): appears unused in this module — confirm before removing
+import sqlite3
+import sys
+import time
+
+from kafka import KafkaConsumer
+
+KAFKA_BOOTSTRAP = os.environ.get('KAFKA_BOOTSTRAP', 'localhost:9092')
+SQLITE_DB = os.environ.get('SQLITE_DB', '/app/data/fuzz.db')
+
+# Debezium single-char op code -> canonical op name used in SQLite rows.
+OP_MAP = {'c': 'INSERT', 'u': 'UPDATE', 'd': 'DELETE'}
+
+# Tables to skip (stats/bookkeeping)
+SKIP_TABLES = {'FUZZ_STATS'}
+
+
+def init_db(db_path):
+    """Create SQLite database and tables.
+
+    Returns an open sqlite3.Connection. WAL + synchronous=NORMAL trade
+    durability for write throughput, which is fine for test artifacts.
+    Both event tables are keyed on (event_id, seq) so multi-part events
+    (LOB splits) can store one row per part.
+    """
+    os.makedirs(os.path.dirname(db_path) or '.', exist_ok=True)
+    conn = sqlite3.connect(db_path)
+    conn.execute("PRAGMA journal_mode=WAL")
+    conn.execute("PRAGMA synchronous=NORMAL")
+    conn.execute("""
+        CREATE TABLE IF NOT EXISTS lm_events (
+            event_id TEXT NOT NULL,
+            seq INTEGER NOT NULL,
+            table_name TEXT NOT NULL,
+            op TEXT NOT NULL,
+            raw_json TEXT NOT NULL,
+            consumed_at REAL NOT
NULL,
+            PRIMARY KEY (event_id, seq)
+        )
+    """)
+    conn.execute("""
+        CREATE TABLE IF NOT EXISTS olr_events (
+            event_id TEXT NOT NULL,
+            seq INTEGER NOT NULL,
+            table_name TEXT NOT NULL,
+            op TEXT NOT NULL,
+            raw_json TEXT NOT NULL,
+            consumed_at REAL NOT NULL,
+            PRIMARY KEY (event_id, seq)
+        )
+    """)
+    conn.commit()
+    return conn
+
+
+def extract_event_info(event):
+    """Extract (event_id, table, op) from a Debezium envelope event.
+
+    Returns None if the event should be skipped: non-dict payloads,
+    non-DML ops, excluded/absent tables, missing event_id, or the
+    workload's 'SEED' rows (pre-stream seed data).
+    """
+    if not isinstance(event, dict):
+        return None
+
+    op_code = event.get('op', '')
+    if op_code not in OP_MAP:
+        return None
+
+    source = event.get('source', {})
+    table = source.get('table', '')
+    if not table or table in SKIP_TABLES:
+        return None
+
+    op = OP_MAP[op_code]
+
+    # Extract event_id from after (INSERT/UPDATE) or before (DELETE).
+    # NOTE: the 'or' chain treats a falsy EVENT_ID (e.g. empty string) as
+    # missing — acceptable here because the workload always generates
+    # non-empty ids ('N<node>_...' or 'SEED').
+    event_id = None
+    after = event.get('after')
+    if after and isinstance(after, dict):
+        event_id = after.get('EVENT_ID') or after.get('event_id')
+    if event_id is None:
+        before = event.get('before')
+        if before and isinstance(before, dict):
+            event_id = before.get('EVENT_ID') or before.get('event_id')
+
+    if event_id is None or str(event_id) == 'SEED':
+        return None
+
+    return (str(event_id), table, op)
+
+
+LM_TOPIC = os.environ.get('LM_TOPIC', 'lm-events')
+OLR_TOPIC = os.environ.get('OLR_TOPIC', 'olr-events')
+
+
+def determine_adapter(topic):
+    """Determine adapter ('logminer' or 'olr') from Kafka topic name.
+
+    Exact topic names are matched first; prefix matching is a fallback for
+    per-table topic layouts. Returns None for unrecognized topics.
+    """
+    if topic == LM_TOPIC:
+        return 'logminer'
+    elif topic == OLR_TOPIC:
+        return 'olr'
+    # Fallback for per-table topics
+    if topic.startswith('logminer'):
+        return 'logminer'
+    elif topic.startswith('olr'):
+        return 'olr'
+    return None
+
+
+def main():
+    """Consume both topics and persist each DML event to SQLite."""
+    print(f"Kafka consumer starting", flush=True)
+    print(f"  Bootstrap: {KAFKA_BOOTSTRAP}", flush=True)
+    print(f"  SQLite DB: {SQLITE_DB}", flush=True)
+
+    conn = init_db(SQLITE_DB)
+
+    # Wait for Kafka to be available: retry construction up to 30 times,
+    # 2s apart, then give up with a fatal error.
+    consumer = None
+    for attempt in range(30):
+        try:
+            consumer =
KafkaConsumer(
+                bootstrap_servers=KAFKA_BOOTSTRAP,
+                # Fresh group id per run => always re-reads topics from the
+                # beginning (auto_offset_reset='earliest').
+                group_id=f'fuzz-consumer-{int(time.time())}',
+                auto_offset_reset='earliest',
+                enable_auto_commit=True,
+                value_deserializer=lambda m: m.decode('utf-8') if m else None,
+                # NOTE(review): consumer_timeout_ms only affects iterator-style
+                # consumption; this script uses poll(), so it is effectively inert.
+                consumer_timeout_ms=5000,
+                max_poll_records=500,
+            )
+            break
+        except Exception as e:
+            if attempt < 29:
+                time.sleep(2)
+            else:
+                print(f"ERROR: Cannot connect to Kafka: {e}", flush=True)
+                sys.exit(1)
+
+    # Wait for topics to appear, then subscribe. If neither topic shows up
+    # within ~5 minutes we subscribe anyway and rely on poll() returning empty.
+    print(f"Waiting for topics: {LM_TOPIC}, {OLR_TOPIC}...", flush=True)
+    for attempt in range(60):
+        topics = consumer.topics()
+        if LM_TOPIC in topics or OLR_TOPIC in topics:
+            print(f"  Found topics: {[t for t in (LM_TOPIC, OLR_TOPIC) if t in topics]}", flush=True)
+            break
+        time.sleep(5)
+
+    consumer.subscribe([LM_TOPIC, OLR_TOPIC])
+    # Force metadata refresh
+    consumer.poll(timeout_ms=1000)
+    print(f"Subscribed to {LM_TOPIC} and {OLR_TOPIC}", flush=True)
+
+    # Track per-event_id sequence numbers for LOB split handling.
+    # Periodically trimmed to avoid unbounded growth during long runs.
+    lm_seq = {}   # event_id -> next seq
+    olr_seq = {}  # event_id -> next seq
+    SEQ_TRIM_THRESHOLD = 10000  # Trim when maps exceed this size
+
+    lm_count = 0
+    olr_count = 0
+    batch = []                  # pending rows: (dest table, event_id, seq, table, op, raw, ts)
+    batch_start = time.time()
+    last_report = time.time()
+
+    try:
+        while True:
+            messages = consumer.poll(timeout_ms=1000, max_records=500)
+
+            for tp, records in messages.items():
+                for msg in records:
+                    if msg.value is None:
+                        continue
+
+                    try:
+                        event = json.loads(msg.value)
+                    except json.JSONDecodeError:
+                        continue
+
+                    adapter = determine_adapter(msg.topic)
+                    if adapter is None:
+                        continue
+
+                    info = extract_event_info(event)
+                    if info is None:
+                        continue
+
+                    event_id, table, op = info
+                    now = time.time()
+
+                    # Determine sequence for LOB split handling: nth sighting
+                    # of the same event_id gets seq n (0-based).
+                    seq_map = lm_seq if adapter == 'logminer' else olr_seq
+                    seq = seq_map.get(event_id, 0)
+                    seq_map[event_id] = seq + 1
+
+                    table_name = 'lm_events' if adapter == 'logminer' else 'olr_events'
+                    batch.append((
+                        table_name, event_id, seq, table, op,
+                        msg.value, now
+                    ))
+
+                    if adapter == 'logminer':
+                        lm_count += 1
+                    else:
+                        olr_count += 1
+
+            # Flush batch every 100 records or 1 second.
+            # The f-string table name is safe: tbl is always one of the two
+            # hard-coded values 'lm_events'/'olr_events' (never user input).
+            # (The unpack shadows seq/table/op from the message loop above,
+            # which is harmless here since both loops have completed.)
+            if batch and (len(batch) >= 100 or time.time() - batch_start > 1.0):
+                for tbl, eid, seq, table, op, raw, ts in batch:
+                    conn.execute(
+                        f"INSERT OR REPLACE INTO {tbl} "
+                        "(event_id, seq, table_name, op, raw_json, consumed_at) "
+                        "VALUES (?, ?, ?, ?, ?, ?)",
+                        (eid, seq, table, op, raw, ts)
+                    )
+                conn.commit()
+                batch = []
+                batch_start = time.time()
+
+            # Trim seq maps to bound memory during long runs.
+            # Only seq > 0 matters (LOB splits); most events have seq=0 and
+            # can be safely evicted since they won't be seen again.
+            # Evict entries seen exactly once (stored value 1 == next seq);
+            # entries with value > 1 are in-flight multi-part events and kept.
+            for seq_map in (lm_seq, olr_seq):
+                if len(seq_map) > SEQ_TRIM_THRESHOLD:
+                    # Keep only entries with seq > 0 (active LOB splits)
+                    to_delete = [k for k, v in seq_map.items() if v <= 1]
+                    for k in to_delete:
+                        del seq_map[k]
+
+            # Report progress every 30 seconds
+            now = time.time()
+            if now - last_report >= 30:
+                print(f"[consumer] LM={lm_count} OLR={olr_count} "
+                      f"seq_keys_lm={len(lm_seq)} seq_keys_olr={len(olr_seq)}",
+                      flush=True)
+                last_report = now
+
+    except KeyboardInterrupt:
+        pass
+    finally:
+        # Flush remaining buffered rows, then release DB and Kafka resources.
+        if batch:
+            for tbl, eid, seq, table, op, raw, ts in batch:
+                conn.execute(
+                    f"INSERT OR REPLACE INTO {tbl} "
+                    "(event_id, seq, table_name, op, raw_json, consumed_at) "
+                    "VALUES (?, ?, ?, ?, ?, ?)",
+                    (eid, seq, table, op, raw, ts)
+                )
+            conn.commit()
+        conn.close()
+        consumer.close()
+        print(f"[consumer] Shutdown. Final: LM={lm_count} OLR={olr_count}", flush=True)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/tests/dbz-twin/rac/perf/fuzz-workload.sql b/tests/dbz-twin/rac/perf/fuzz-workload.sql
new file mode 100644
index 00000000..ae5770de
--- /dev/null
+++ b/tests/dbz-twin/rac/perf/fuzz-workload.sql
@@ -0,0 +1,852 @@
+-- fuzz-workload.sql — Randomized DML generator for OLR fuzz testing.
+--
+-- Creates diverse tables + a PL/SQL package that generates random DML
+-- exercising CDC edge cases: LOBs, wide rows, partitions, rollbacks,
+-- savepoints, bulk inserts, NULLs, and varied data types.
+--
+-- Every table has an EVENT_ID column (VARCHAR2(30)) that uniquely identifies
+-- each CDC event. Format: N{node}_{seq:08d}
+-- (matches FUZZ_WKL.next_event_id: 'N' || node || '_' || LPAD(seq, 8, '0')).
+-- This enables streaming comparison without ordering assumptions.
+--
+-- Usage:
+--   1. Run this file once to create schema + package:
+--      sqlplus olr_test/olr_test@... @fuzz-workload.sql
+--
+--   2. Run the workload (from shell, per node):
+--      sqlplus olr_test/olr_test@...
<<< "SET SERVEROUTPUT ON SIZE UNLIMITED +-- EXEC FUZZ_WKL.run(p_duration_secs => 300, p_seed => 42, p_node_id => 1); +-- EXIT;" + +SET FEEDBACK OFF +SET SERVEROUTPUT ON SIZE UNLIMITED + +-- ============================================================ +-- Section 1: Create tables (idempotent via DROP IF EXISTS) +-- ============================================================ + +-- T1: Core scalar types +BEGIN EXECUTE IMMEDIATE 'DROP TABLE olr_test.FUZZ_SCALAR PURGE'; EXCEPTION WHEN OTHERS THEN IF SQLCODE != -942 THEN RAISE; END IF; END; +/ +CREATE TABLE olr_test.FUZZ_SCALAR ( + id NUMBER PRIMARY KEY, + event_id VARCHAR2(30) NOT NULL, + col_varchar VARCHAR2(200), + col_char CHAR(20), + col_number NUMBER, + col_int NUMBER(10), + col_decimal NUMBER(20,10), + col_float BINARY_FLOAT, + col_double BINARY_DOUBLE, + col_date DATE, + col_ts TIMESTAMP(6), + col_raw RAW(200), + col_flag NUMBER(1) DEFAULT 0 +); +ALTER TABLE olr_test.FUZZ_SCALAR ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; + +-- T2: Wide row (40+ columns) +BEGIN EXECUTE IMMEDIATE 'DROP TABLE olr_test.FUZZ_WIDE PURGE'; EXCEPTION WHEN OTHERS THEN IF SQLCODE != -942 THEN RAISE; END IF; END; +/ +CREATE TABLE olr_test.FUZZ_WIDE ( + id NUMBER PRIMARY KEY, + event_id VARCHAR2(30) NOT NULL, + c01 VARCHAR2(100), c02 VARCHAR2(100), c03 VARCHAR2(100), c04 VARCHAR2(100), c05 VARCHAR2(100), + c06 VARCHAR2(100), c07 VARCHAR2(100), c08 VARCHAR2(100), c09 VARCHAR2(100), c10 VARCHAR2(100), + c11 VARCHAR2(100), c12 VARCHAR2(100), c13 VARCHAR2(100), c14 VARCHAR2(100), c15 VARCHAR2(100), + n01 NUMBER, n02 NUMBER, n03 NUMBER, n04 NUMBER, n05 NUMBER, + n06 NUMBER, n07 NUMBER, n08 NUMBER, n09 NUMBER, n10 NUMBER, + d01 DATE, d02 DATE, d03 DATE, + t01 TIMESTAMP, t02 TIMESTAMP, t03 TIMESTAMP, + r01 RAW(50), r02 RAW(50), r03 RAW(50) +); +ALTER TABLE olr_test.FUZZ_WIDE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; + +-- T3: LOB table (CLOB + BLOB) +BEGIN EXECUTE IMMEDIATE 'DROP TABLE olr_test.FUZZ_LOB PURGE'; EXCEPTION WHEN OTHERS THEN IF SQLCODE 
!= -942 THEN RAISE; END IF; END; +/ +CREATE TABLE olr_test.FUZZ_LOB ( + id NUMBER PRIMARY KEY, + event_id VARCHAR2(30) NOT NULL, + label VARCHAR2(50), + content CLOB, + bin_data BLOB +); +ALTER TABLE olr_test.FUZZ_LOB ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; + +-- T4: Partitioned table +BEGIN EXECUTE IMMEDIATE 'DROP TABLE olr_test.FUZZ_PART PURGE'; EXCEPTION WHEN OTHERS THEN IF SQLCODE != -942 THEN RAISE; END IF; END; +/ +CREATE TABLE olr_test.FUZZ_PART ( + id NUMBER PRIMARY KEY, + event_id VARCHAR2(30) NOT NULL, + region VARCHAR2(20), + val NUMBER, + payload VARCHAR2(500) +) PARTITION BY LIST (region) ( + PARTITION p_east VALUES ('EAST'), + PARTITION p_west VALUES ('WEST'), + PARTITION p_north VALUES ('NORTH'), + PARTITION p_south VALUES ('SOUTH'), + PARTITION p_other VALUES (DEFAULT) +); +ALTER TABLE olr_test.FUZZ_PART ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; + +-- T5: No primary key (forces ROWID-based supplemental logging) +BEGIN EXECUTE IMMEDIATE 'DROP TABLE olr_test.FUZZ_NOPK PURGE'; EXCEPTION WHEN OTHERS THEN IF SQLCODE != -942 THEN RAISE; END IF; END; +/ +CREATE TABLE olr_test.FUZZ_NOPK ( + event_id VARCHAR2(30) NOT NULL, + name VARCHAR2(100), + value NUMBER, + status VARCHAR2(20), + ts TIMESTAMP DEFAULT SYSTIMESTAMP +); +ALTER TABLE olr_test.FUZZ_NOPK ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; + +-- T6: Max-length strings (near block boundary) +BEGIN EXECUTE IMMEDIATE 'DROP TABLE olr_test.FUZZ_MAXSTR PURGE'; EXCEPTION WHEN OTHERS THEN IF SQLCODE != -942 THEN RAISE; END IF; END; +/ +CREATE TABLE olr_test.FUZZ_MAXSTR ( + id NUMBER PRIMARY KEY, + event_id VARCHAR2(30) NOT NULL, + col_long1 VARCHAR2(4000), + col_long2 VARCHAR2(4000), + col_short VARCHAR2(10) +); +ALTER TABLE olr_test.FUZZ_MAXSTR ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; + +-- T7: Interval types +BEGIN EXECUTE IMMEDIATE 'DROP TABLE olr_test.FUZZ_INTERVAL PURGE'; EXCEPTION WHEN OTHERS THEN IF SQLCODE != -942 THEN RAISE; END IF; END; +/ +CREATE TABLE olr_test.FUZZ_INTERVAL ( + id NUMBER PRIMARY KEY, 
+ event_id VARCHAR2(30) NOT NULL, + col_ym INTERVAL YEAR(4) TO MONTH, + col_ds INTERVAL DAY(4) TO SECOND(6), + col_num NUMBER +); +ALTER TABLE olr_test.FUZZ_INTERVAL ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; + +-- Stats table (autonomous transaction writes, polled by shell) +BEGIN EXECUTE IMMEDIATE 'DROP TABLE olr_test.FUZZ_STATS PURGE'; EXCEPTION WHEN OTHERS THEN IF SQLCODE != -942 THEN RAISE; END IF; END; +/ +CREATE TABLE olr_test.FUZZ_STATS ( + node_id NUMBER PRIMARY KEY, + total_ops NUMBER DEFAULT 0, + insert_cnt NUMBER DEFAULT 0, + update_cnt NUMBER DEFAULT 0, + delete_cnt NUMBER DEFAULT 0, + rollback_cnt NUMBER DEFAULT 0, + lob_cnt NUMBER DEFAULT 0, + last_update TIMESTAMP DEFAULT SYSTIMESTAMP +); + +-- ============================================================ +-- Section 2: Package specification +-- ============================================================ + +CREATE OR REPLACE PACKAGE olr_test.FUZZ_WKL AS + PROCEDURE run( + p_duration_secs IN NUMBER DEFAULT 1800, + p_seed IN NUMBER DEFAULT 1, + p_node_id IN NUMBER DEFAULT 1 + ); +END FUZZ_WKL; +/ + +-- ============================================================ +-- Section 3: Package body +-- ============================================================ + +CREATE OR REPLACE PACKAGE BODY olr_test.FUZZ_WKL AS + + -- Per-session state + g_node_id PLS_INTEGER; + g_next_id PLS_INTEGER; -- node 1: odd (1,3,5...), node 2: even (2,4,6...) + g_event_seq PLS_INTEGER := 0; + g_insert_cnt PLS_INTEGER := 0; + g_update_cnt PLS_INTEGER := 0; + g_delete_cnt PLS_INTEGER := 0; + g_rollback_cnt PLS_INTEGER := 0; + g_lob_cnt PLS_INTEGER := 0; + g_total_ops PLS_INTEGER := 0; + + -- Per-table ID tracking for UPDATE/DELETE targeting. + -- Stores the last inserted ID for each table so UPDATE/DELETE can + -- pick from the correct table's ID range instead of the global stream. 
+ TYPE id_list_t IS TABLE OF PLS_INTEGER INDEX BY PLS_INTEGER; + g_scalar_ids id_list_t; + g_scalar_id_cnt PLS_INTEGER := 0; + g_lob_ids id_list_t; + g_lob_id_cnt PLS_INTEGER := 0; + g_wide_ids id_list_t; + g_wide_id_cnt PLS_INTEGER := 0; + g_part_ids id_list_t; + g_part_id_cnt PLS_INTEGER := 0; + g_maxstr_ids id_list_t; + g_maxstr_id_cnt PLS_INTEGER := 0; + g_interval_ids id_list_t; + g_interval_id_cnt PLS_INTEGER := 0; + + REGIONS CONSTANT SYS.ODCIVARCHAR2LIST := SYS.ODCIVARCHAR2LIST( + 'EAST','WEST','NORTH','SOUTH','OTHER'); + + -- ---- Helpers ---- + + FUNCTION next_id RETURN PLS_INTEGER IS + v_id PLS_INTEGER; + BEGIN + v_id := g_next_id; + g_next_id := g_next_id + 2; -- skip by 2 for node interleaving + RETURN v_id; + END; + + FUNCTION next_event_id RETURN VARCHAR2 IS + BEGIN + g_event_seq := g_event_seq + 1; + RETURN 'N' || g_node_id || '_' || LPAD(g_event_seq, 8, '0'); + END; + + FUNCTION rand_int(p_lo PLS_INTEGER, p_hi PLS_INTEGER) RETURN PLS_INTEGER IS + BEGIN + RETURN TRUNC(DBMS_RANDOM.VALUE(p_lo, p_hi + 1)); + END; + + FUNCTION rand_varchar(p_max_len PLS_INTEGER) RETURN VARCHAR2 IS + BEGIN + RETURN DBMS_RANDOM.STRING('p', rand_int(1, p_max_len)); + END; + + FUNCTION rand_date RETURN DATE IS + BEGIN + RETURN SYSDATE - TRUNC(DBMS_RANDOM.VALUE(0, 9000)); + END; + + FUNCTION rand_raw(p_max_len PLS_INTEGER) RETURN RAW IS + BEGIN + RETURN UTL_RAW.CAST_TO_RAW(DBMS_RANDOM.STRING('x', rand_int(1, LEAST(p_max_len, 100)))); + END; + + FUNCTION rand_region RETURN VARCHAR2 IS + BEGIN + RETURN REGIONS(rand_int(1, 5)); + END; + + -- Track an inserted ID for a table + PROCEDURE track_id(p_ids IN OUT id_list_t, p_cnt IN OUT PLS_INTEGER, p_id PLS_INTEGER) IS + BEGIN + p_cnt := p_cnt + 1; + p_ids(p_cnt) := p_id; + END; + + -- Pick a random tracked ID for UPDATE/DELETE. Returns -1 if no IDs tracked. 
+ FUNCTION pick_tracked_id(p_ids IN id_list_t, p_cnt PLS_INTEGER) RETURN PLS_INTEGER IS + BEGIN + IF p_cnt = 0 THEN RETURN -1; END IF; + RETURN p_ids(rand_int(1, p_cnt)); + END; + + PROCEDURE update_stats IS + PRAGMA AUTONOMOUS_TRANSACTION; + BEGIN + MERGE INTO olr_test.FUZZ_STATS s USING (SELECT g_node_id AS nid FROM dual) d + ON (s.node_id = d.nid) + WHEN MATCHED THEN UPDATE SET + total_ops = g_total_ops, insert_cnt = g_insert_cnt, + update_cnt = g_update_cnt, delete_cnt = g_delete_cnt, + rollback_cnt = g_rollback_cnt, lob_cnt = g_lob_cnt, + last_update = SYSTIMESTAMP + WHEN NOT MATCHED THEN INSERT (node_id, total_ops, insert_cnt, update_cnt, + delete_cnt, rollback_cnt, lob_cnt, last_update) + VALUES (g_node_id, g_total_ops, g_insert_cnt, g_update_cnt, + g_delete_cnt, g_rollback_cnt, g_lob_cnt, SYSTIMESTAMP); + COMMIT; + END; + + -- ---- DML operations ---- + -- Note: PL/SQL package-private functions cannot be called directly in SQL + -- statements, so all random values are computed into local variables first. 
+ + PROCEDURE do_insert_scalar(p_count PLS_INTEGER) IS + v_id PLS_INTEGER; v_eid VARCHAR2(30); + v_vc VARCHAR2(200); v_ch CHAR(20); v_num NUMBER; + v_int NUMBER(10); v_dec NUMBER(20,10); v_fl BINARY_FLOAT; + v_dbl BINARY_DOUBLE; v_dt DATE; v_ts TIMESTAMP(6); + v_rw RAW(200); v_flag NUMBER(1); + BEGIN + FOR i IN 1..p_count LOOP + v_id := next_id; v_eid := next_event_id; + v_vc := rand_varchar(200); + v_ch := RPAD(DBMS_RANDOM.STRING('a', rand_int(1,10)), 20); + v_num := ROUND(DBMS_RANDOM.VALUE(-1e15, 1e15), rand_int(0,10)); + v_int := TRUNC(DBMS_RANDOM.VALUE(-2147483648, 2147483647)); + v_dec := ROUND(DBMS_RANDOM.VALUE(-99999, 99999), 10); + v_fl := CAST(DBMS_RANDOM.VALUE(-1e10, 1e10) AS BINARY_FLOAT); + v_dbl := CAST(DBMS_RANDOM.VALUE(-1e100, 1e100) AS BINARY_DOUBLE); + v_dt := rand_date; v_ts := SYSTIMESTAMP - DBMS_RANDOM.VALUE(0, 1000); + v_rw := rand_raw(100); v_flag := rand_int(0,1); + INSERT INTO olr_test.FUZZ_SCALAR (id, event_id, col_varchar, col_char, col_number, + col_int, col_decimal, col_float, col_double, col_date, col_ts, col_raw, col_flag) + VALUES (v_id, v_eid, v_vc, v_ch, v_num, v_int, v_dec, v_fl, v_dbl, v_dt, v_ts, v_rw, v_flag); + track_id(g_scalar_ids, g_scalar_id_cnt, v_id); + g_insert_cnt := g_insert_cnt + 1; + g_total_ops := g_total_ops + 1; + END LOOP; + END; + + PROCEDURE do_insert_wide(p_count PLS_INTEGER) IS + v_id PLS_INTEGER; v_eid VARCHAR2(30); + v_c01 VARCHAR2(100); v_c02 VARCHAR2(100); v_c03 VARCHAR2(100); + v_c04 VARCHAR2(100); v_c05 VARCHAR2(100); v_c06 VARCHAR2(100); + v_c07 VARCHAR2(100); v_c08 VARCHAR2(100); v_c09 VARCHAR2(100); + v_c10 VARCHAR2(100); v_c11 VARCHAR2(100); v_c12 VARCHAR2(100); + v_c13 VARCHAR2(100); v_c14 VARCHAR2(100); v_c15 VARCHAR2(100); + v_d1 DATE; v_d2 DATE; v_d3 DATE; + v_r1 RAW(50); v_r2 RAW(50); v_r3 RAW(50); + BEGIN + FOR i IN 1..p_count LOOP + v_id := next_id; v_eid := next_event_id; + v_c01:=rand_varchar(100); v_c02:=rand_varchar(100); v_c03:=rand_varchar(100); + v_c04:=rand_varchar(100); 
v_c05:=rand_varchar(100); v_c06:=rand_varchar(100); + v_c07:=rand_varchar(100); v_c08:=rand_varchar(100); v_c09:=rand_varchar(100); + v_c10:=rand_varchar(100); v_c11:=rand_varchar(100); v_c12:=rand_varchar(100); + v_c13:=rand_varchar(100); v_c14:=rand_varchar(100); v_c15:=rand_varchar(100); + v_d1:=rand_date; v_d2:=rand_date; v_d3:=rand_date; + v_r1:=rand_raw(50); v_r2:=rand_raw(50); v_r3:=rand_raw(50); + INSERT INTO olr_test.FUZZ_WIDE (id, event_id, + c01,c02,c03,c04,c05,c06,c07,c08,c09,c10,c11,c12,c13,c14,c15, + n01,n02,n03,n04,n05,n06,n07,n08,n09,n10, + d01,d02,d03, t01,t02,t03, r01,r02,r03) + VALUES (v_id, v_eid, + v_c01,v_c02,v_c03,v_c04,v_c05,v_c06,v_c07,v_c08,v_c09,v_c10, + v_c11,v_c12,v_c13,v_c14,v_c15, + DBMS_RANDOM.VALUE(-1e12,1e12),DBMS_RANDOM.VALUE(-1e12,1e12), + DBMS_RANDOM.VALUE(-1e12,1e12),DBMS_RANDOM.VALUE(-1e12,1e12), + DBMS_RANDOM.VALUE(-1e12,1e12),DBMS_RANDOM.VALUE(-1e12,1e12), + DBMS_RANDOM.VALUE(-1e12,1e12),DBMS_RANDOM.VALUE(-1e12,1e12), + DBMS_RANDOM.VALUE(-1e12,1e12),DBMS_RANDOM.VALUE(-1e12,1e12), + v_d1,v_d2,v_d3, + SYSTIMESTAMP-DBMS_RANDOM.VALUE(0,500), + SYSTIMESTAMP-DBMS_RANDOM.VALUE(0,500), + SYSTIMESTAMP-DBMS_RANDOM.VALUE(0,500), + v_r1,v_r2,v_r3); + track_id(g_wide_ids, g_wide_id_cnt, v_id); + g_insert_cnt := g_insert_cnt + 1; + g_total_ops := g_total_ops + 1; + END LOOP; + END; + + PROCEDURE do_insert_lob(p_count PLS_INTEGER) IS + v_id PLS_INTEGER; v_eid VARCHAR2(30); + v_clob_size PLS_INTEGER; v_blob_size PLS_INTEGER; + v_label VARCHAR2(50); v_clob CLOB; v_blob BLOB; + BEGIN + FOR i IN 1..p_count LOOP + v_id := next_id; v_eid := next_event_id; + v_clob_size := rand_int(50, 16000); + v_blob_size := rand_int(50, 8000); + v_label := 'lob_n' || g_node_id || '_' || g_total_ops; + v_clob := RPAD(DBMS_RANDOM.STRING('x', 50), v_clob_size, 'X'); + v_blob := UTL_RAW.COPIES(UTL_RAW.CAST_TO_RAW(DBMS_RANDOM.STRING('x', 10)), + LEAST(CEIL(v_blob_size / 10), 800)); + INSERT INTO olr_test.FUZZ_LOB (id, event_id, label, content, bin_data) + VALUES 
(v_id, v_eid, v_label, v_clob, v_blob); + track_id(g_lob_ids, g_lob_id_cnt, v_id); + g_insert_cnt := g_insert_cnt + 1; + g_lob_cnt := g_lob_cnt + 1; + g_total_ops := g_total_ops + 1; + END LOOP; + END; + + PROCEDURE do_insert_part(p_count PLS_INTEGER) IS + v_id PLS_INTEGER; v_eid VARCHAR2(30); + v_region VARCHAR2(20); v_payload VARCHAR2(500); + BEGIN + FOR i IN 1..p_count LOOP + v_id := next_id; v_eid := next_event_id; + v_region := rand_region; v_payload := rand_varchar(500); + INSERT INTO olr_test.FUZZ_PART (id, event_id, region, val, payload) + VALUES (v_id, v_eid, v_region, ROUND(DBMS_RANDOM.VALUE(-99999, 99999), 2), v_payload); + track_id(g_part_ids, g_part_id_cnt, v_id); + g_insert_cnt := g_insert_cnt + 1; + g_total_ops := g_total_ops + 1; + END LOOP; + END; + + PROCEDURE do_insert_nopk(p_count PLS_INTEGER) IS + v_eid VARCHAR2(30); v_name VARCHAR2(100); v_status VARCHAR2(20); + BEGIN + FOR i IN 1..p_count LOOP + v_eid := next_event_id; + v_name := rand_varchar(100); + v_status := CASE rand_int(1,4) WHEN 1 THEN 'ACTIVE' WHEN 2 THEN 'INACTIVE' + WHEN 3 THEN 'PENDING' ELSE NULL END; + INSERT INTO olr_test.FUZZ_NOPK (event_id, name, value, status) + VALUES (v_eid, v_name, ROUND(DBMS_RANDOM.VALUE(0, 100000), 2), v_status); + g_insert_cnt := g_insert_cnt + 1; + g_total_ops := g_total_ops + 1; + END LOOP; + END; + + PROCEDURE do_insert_maxstr(p_count PLS_INTEGER) IS + v_id PLS_INTEGER; v_eid VARCHAR2(30); + v_l1 VARCHAR2(4000); v_l2 VARCHAR2(4000); + v_sh VARCHAR2(10); v_len1 PLS_INTEGER; v_len2 PLS_INTEGER; + BEGIN + FOR i IN 1..p_count LOOP + v_id := next_id; v_eid := next_event_id; + v_len1 := rand_int(100, 4000); v_len2 := rand_int(100, 4000); + v_l1 := RPAD('A', v_len1, DBMS_RANDOM.STRING('x', 1)); + v_l2 := RPAD('B', v_len2, DBMS_RANDOM.STRING('x', 1)); + v_sh := DBMS_RANDOM.STRING('x', rand_int(1,10)); + INSERT INTO olr_test.FUZZ_MAXSTR (id, event_id, col_long1, col_long2, col_short) + VALUES (v_id, v_eid, v_l1, v_l2, v_sh); + track_id(g_maxstr_ids, 
g_maxstr_id_cnt, v_id); + g_insert_cnt := g_insert_cnt + 1; + g_total_ops := g_total_ops + 1; + END LOOP; + END; + + PROCEDURE do_insert_interval(p_count PLS_INTEGER) IS + v_id PLS_INTEGER; v_eid VARCHAR2(30); v_ym PLS_INTEGER; + BEGIN + FOR i IN 1..p_count LOOP + v_id := next_id; v_eid := next_event_id; + v_ym := rand_int(-100, 100); + INSERT INTO olr_test.FUZZ_INTERVAL (id, event_id, col_ym, col_ds, col_num) + VALUES (v_id, v_eid, + NUMTOYMINTERVAL(v_ym, 'MONTH'), + NUMTODSINTERVAL(DBMS_RANDOM.VALUE(-86400*100, 86400*100), 'SECOND'), + ROUND(DBMS_RANDOM.VALUE(-1e8, 1e8), 4)); + track_id(g_interval_ids, g_interval_id_cnt, v_id); + g_insert_cnt := g_insert_cnt + 1; + g_total_ops := g_total_ops + 1; + END LOOP; + END; + + -- Update random rows in FUZZ_SCALAR (uses tracked IDs for this table) + PROCEDURE do_update_scalar(p_count PLS_INTEGER) IS + v_target PLS_INTEGER; v_eid VARCHAR2(30); + v_vc VARCHAR2(200); v_dt DATE; + BEGIN + IF g_scalar_id_cnt = 0 THEN RETURN; END IF; + FOR i IN 1..p_count LOOP + v_target := pick_tracked_id(g_scalar_ids, g_scalar_id_cnt); + v_eid := next_event_id; + v_vc := rand_varchar(200); v_dt := rand_date; + UPDATE olr_test.FUZZ_SCALAR + SET event_id = v_eid, + col_varchar = v_vc, + col_number = ROUND(DBMS_RANDOM.VALUE(-1e12, 1e12), TRUNC(DBMS_RANDOM.VALUE(0,9))), + col_date = v_dt, + col_flag = 1 - col_flag + WHERE id = v_target; + g_update_cnt := g_update_cnt + 1; + g_total_ops := g_total_ops + 1; + END LOOP; + END; + + -- Delete random rows from FUZZ_SCALAR + -- The before-image carries the existing event_id from the last INSERT/UPDATE. + -- The consumer extracts event_id from 'before' for DELETE ops. 
+ PROCEDURE do_delete_scalar(p_count PLS_INTEGER) IS + v_target PLS_INTEGER; + BEGIN + IF g_scalar_id_cnt = 0 THEN RETURN; END IF; + FOR i IN 1..p_count LOOP + v_target := pick_tracked_id(g_scalar_ids, g_scalar_id_cnt); + DELETE FROM olr_test.FUZZ_SCALAR WHERE id = v_target; + g_delete_cnt := g_delete_cnt + 1; + g_total_ops := g_total_ops + 1; + END LOOP; + END; + + -- Update LOB content + PROCEDURE do_update_lob(p_count PLS_INTEGER) IS + v_target PLS_INTEGER; v_eid VARCHAR2(30); + v_content CLOB; v_label VARCHAR2(50); + BEGIN + IF g_lob_id_cnt = 0 THEN RETURN; END IF; + FOR i IN 1..p_count LOOP + v_target := pick_tracked_id(g_lob_ids, g_lob_id_cnt); + v_eid := next_event_id; + v_content := RPAD('UPD_', rand_int(100, 8000), 'Y'); + v_label := 'upd_n' || g_node_id || '_' || g_total_ops; + UPDATE olr_test.FUZZ_LOB + SET event_id = v_eid, content = v_content, label = v_label + WHERE id = v_target; + g_update_cnt := g_update_cnt + 1; + g_lob_cnt := g_lob_cnt + 1; + g_total_ops := g_total_ops + 1; + END LOOP; + END; + + -- Bulk insert via FORALL + PROCEDURE do_bulk_insert_scalar(p_count PLS_INTEGER) IS + TYPE id_tab IS TABLE OF NUMBER INDEX BY PLS_INTEGER; + TYPE str_tab IS TABLE OF VARCHAR2(200) INDEX BY PLS_INTEGER; + TYPE eid_tab IS TABLE OF VARCHAR2(30) INDEX BY PLS_INTEGER; + TYPE num_tab IS TABLE OF NUMBER INDEX BY PLS_INTEGER; + v_ids id_tab; + v_eids eid_tab; + v_vals str_tab; + v_nums num_tab; + BEGIN + FOR i IN 1..p_count LOOP + v_ids(i) := next_id; + v_eids(i) := next_event_id; + v_vals(i) := rand_varchar(100); + v_nums(i) := ROUND(DBMS_RANDOM.VALUE(-99999, 99999), 2); + END LOOP; + FORALL i IN 1..p_count + INSERT INTO olr_test.FUZZ_SCALAR (id, event_id, col_varchar, col_number, col_flag) + VALUES (v_ids(i), v_eids(i), v_vals(i), v_nums(i), 0); + FOR i IN 1..p_count LOOP + track_id(g_scalar_ids, g_scalar_id_cnt, v_ids(i)); + END LOOP; + g_insert_cnt := g_insert_cnt + p_count; + g_total_ops := g_total_ops + p_count; + END; + + -- Insert with many NULLs (tests 
NULL/absent column redo format) + PROCEDURE do_insert_nulls(p_count PLS_INTEGER) IS + v_id PLS_INTEGER; v_eid VARCHAR2(30); + v_vc VARCHAR2(50); v_ch CHAR(20); v_num NUMBER; + v_int PLS_INTEGER; v_dec NUMBER; v_fl BINARY_FLOAT; v_dbl BINARY_DOUBLE; + v_dt DATE; v_ts TIMESTAMP; v_rw RAW(50); v_flag NUMBER(1); + BEGIN + FOR i IN 1..p_count LOOP + v_id := next_id; v_eid := next_event_id; + v_vc := CASE WHEN DBMS_RANDOM.VALUE < 0.5 THEN NULL ELSE rand_varchar(50) END; + v_ch := CASE WHEN DBMS_RANDOM.VALUE < 0.5 THEN NULL ELSE RPAD('x', 20) END; + v_num := CASE WHEN DBMS_RANDOM.VALUE < 0.5 THEN NULL ELSE DBMS_RANDOM.VALUE(-100,100) END; + v_int := CASE WHEN DBMS_RANDOM.VALUE < 0.5 THEN NULL ELSE rand_int(-1000,1000) END; + v_dec := CASE WHEN DBMS_RANDOM.VALUE < 0.5 THEN NULL ELSE ROUND(DBMS_RANDOM.VALUE(-99,99),10) END; + v_fl := CASE WHEN DBMS_RANDOM.VALUE < 0.5 THEN NULL ELSE CAST(1.5 AS BINARY_FLOAT) END; + v_dbl := CASE WHEN DBMS_RANDOM.VALUE < 0.5 THEN NULL ELSE CAST(2.5 AS BINARY_DOUBLE) END; + v_dt := CASE WHEN DBMS_RANDOM.VALUE < 0.5 THEN NULL ELSE rand_date END; + v_ts := CASE WHEN DBMS_RANDOM.VALUE < 0.5 THEN NULL ELSE SYSTIMESTAMP END; + v_rw := CASE WHEN DBMS_RANDOM.VALUE < 0.5 THEN NULL ELSE rand_raw(50) END; + v_flag := rand_int(0,1); + INSERT INTO olr_test.FUZZ_SCALAR (id, event_id, col_varchar, col_char, col_number, + col_int, col_decimal, col_float, col_double, col_date, col_ts, col_raw, col_flag) + VALUES (v_id, v_eid, v_vc, v_ch, v_num, v_int, v_dec, v_fl, v_dbl, v_dt, v_ts, v_rw, v_flag); + track_id(g_scalar_ids, g_scalar_id_cnt, v_id); + g_insert_cnt := g_insert_cnt + 1; + g_total_ops := g_total_ops + 1; + END LOOP; + END; + + -- ---- UPDATE/DELETE for edge-case tables ---- + + PROCEDURE do_update_wide(p_count PLS_INTEGER) IS + v_target PLS_INTEGER; v_eid VARCHAR2(30); + BEGIN + IF g_wide_id_cnt = 0 THEN RETURN; END IF; + FOR i IN 1..p_count LOOP + v_target := pick_tracked_id(g_wide_ids, g_wide_id_cnt); + v_eid := next_event_id; + UPDATE 
olr_test.FUZZ_WIDE + SET event_id = v_eid, + c01 = rand_varchar(100), c02 = rand_varchar(100), + n01 = DBMS_RANDOM.VALUE(-1e12, 1e12), + d01 = rand_date + WHERE id = v_target; + g_update_cnt := g_update_cnt + 1; + g_total_ops := g_total_ops + 1; + END LOOP; + END; + + PROCEDURE do_delete_wide(p_count PLS_INTEGER) IS + v_target PLS_INTEGER; + BEGIN + IF g_wide_id_cnt = 0 THEN RETURN; END IF; + FOR i IN 1..p_count LOOP + v_target := pick_tracked_id(g_wide_ids, g_wide_id_cnt); + DELETE FROM olr_test.FUZZ_WIDE WHERE id = v_target; + g_delete_cnt := g_delete_cnt + 1; + g_total_ops := g_total_ops + 1; + END LOOP; + END; + + PROCEDURE do_update_part(p_count PLS_INTEGER) IS + v_target PLS_INTEGER; v_eid VARCHAR2(30); + BEGIN + IF g_part_id_cnt = 0 THEN RETURN; END IF; + FOR i IN 1..p_count LOOP + v_target := pick_tracked_id(g_part_ids, g_part_id_cnt); + v_eid := next_event_id; + UPDATE olr_test.FUZZ_PART + SET event_id = v_eid, + val = ROUND(DBMS_RANDOM.VALUE(-99999, 99999), 2), + payload = rand_varchar(500) + WHERE id = v_target; + g_update_cnt := g_update_cnt + 1; + g_total_ops := g_total_ops + 1; + END LOOP; + END; + + PROCEDURE do_delete_part(p_count PLS_INTEGER) IS + v_target PLS_INTEGER; + BEGIN + IF g_part_id_cnt = 0 THEN RETURN; END IF; + FOR i IN 1..p_count LOOP + v_target := pick_tracked_id(g_part_ids, g_part_id_cnt); + DELETE FROM olr_test.FUZZ_PART WHERE id = v_target; + g_delete_cnt := g_delete_cnt + 1; + g_total_ops := g_total_ops + 1; + END LOOP; + END; + + PROCEDURE do_update_maxstr(p_count PLS_INTEGER) IS + v_target PLS_INTEGER; v_eid VARCHAR2(30); + v_l1 VARCHAR2(4000); v_l2 VARCHAR2(4000); + BEGIN + IF g_maxstr_id_cnt = 0 THEN RETURN; END IF; + FOR i IN 1..p_count LOOP + v_target := pick_tracked_id(g_maxstr_ids, g_maxstr_id_cnt); + v_eid := next_event_id; + v_l1 := RPAD('U', rand_int(100, 4000), DBMS_RANDOM.STRING('x', 1)); + v_l2 := RPAD('U', rand_int(100, 4000), DBMS_RANDOM.STRING('x', 1)); + UPDATE olr_test.FUZZ_MAXSTR + SET event_id = v_eid, 
col_long1 = v_l1, col_long2 = v_l2 + WHERE id = v_target; + g_update_cnt := g_update_cnt + 1; + g_total_ops := g_total_ops + 1; + END LOOP; + END; + + PROCEDURE do_update_interval(p_count PLS_INTEGER) IS + v_target PLS_INTEGER; v_eid VARCHAR2(30); + BEGIN + IF g_interval_id_cnt = 0 THEN RETURN; END IF; + FOR i IN 1..p_count LOOP + v_target := pick_tracked_id(g_interval_ids, g_interval_id_cnt); + v_eid := next_event_id; + UPDATE olr_test.FUZZ_INTERVAL + SET event_id = v_eid, + col_ym = NUMTOYMINTERVAL(rand_int(-100, 100), 'MONTH'), + col_num = ROUND(DBMS_RANDOM.VALUE(-1e8, 1e8), 4) + WHERE id = v_target; + g_update_cnt := g_update_cnt + 1; + g_total_ops := g_total_ops + 1; + END LOOP; + END; + + -- ---- Dispatch: pick table + operation ---- + + PROCEDURE do_random_op IS + v_table_dice PLS_INTEGER := rand_int(1, 100); + v_op_dice PLS_INTEGER := rand_int(1, 100); + v_count PLS_INTEGER; + BEGIN + -- Pick table (weighted) + -- 30% scalar, 10% wide, 15% lob, 10% partitioned, 10% nopk, + -- 10% maxstr, 5% interval, 10% null-heavy + IF v_table_dice <= 30 THEN + -- FUZZ_SCALAR + v_count := rand_int(1, 20); + IF v_op_dice <= 50 THEN + do_insert_scalar(v_count); + ELSIF v_op_dice <= 75 THEN + do_update_scalar(v_count); + ELSIF v_op_dice <= 90 THEN + do_delete_scalar(v_count); + ELSE + do_bulk_insert_scalar(rand_int(20, 50)); + END IF; + ELSIF v_table_dice <= 40 THEN + -- FUZZ_WIDE + v_count := rand_int(1, 10); + IF v_op_dice <= 60 THEN + do_insert_wide(v_count); + ELSIF v_op_dice <= 85 THEN + do_update_wide(v_count); + ELSE + do_delete_wide(v_count); + END IF; + ELSIF v_table_dice <= 55 THEN + -- LOB + v_count := rand_int(1, 5); + IF v_op_dice <= 60 THEN + do_insert_lob(v_count); + ELSE + do_update_lob(v_count); + END IF; + ELSIF v_table_dice <= 65 THEN + -- FUZZ_PART + v_count := rand_int(1, 20); + IF v_op_dice <= 60 THEN + do_insert_part(v_count); + ELSIF v_op_dice <= 85 THEN + do_update_part(v_count); + ELSE + do_delete_part(v_count); + END IF; + ELSIF v_table_dice <= 75 
THEN + do_insert_nopk(rand_int(1, 15)); + ELSIF v_table_dice <= 85 THEN + -- FUZZ_MAXSTR + v_count := rand_int(1, 5); + IF v_op_dice <= 65 THEN + do_insert_maxstr(v_count); + ELSE + do_update_maxstr(v_count); + END IF; + ELSIF v_table_dice <= 90 THEN + -- FUZZ_INTERVAL + v_count := rand_int(1, 10); + IF v_op_dice <= 65 THEN + do_insert_interval(v_count); + ELSE + do_update_interval(v_count); + END IF; + ELSE + do_insert_nulls(rand_int(1, 15)); + END IF; + END; + + -- ---- Main entry point ---- + + PROCEDURE run( + p_duration_secs IN NUMBER DEFAULT 1800, + p_seed IN NUMBER DEFAULT 1, + p_node_id IN NUMBER DEFAULT 1 + ) IS + v_start TIMESTAMP := SYSTIMESTAMP; + v_deadline TIMESTAMP := SYSTIMESTAMP + NUMTODSINTERVAL(p_duration_secs, 'SECOND'); + v_txn_dice PLS_INTEGER; + v_batch PLS_INTEGER; + v_seed_id PLS_INTEGER; + v_seed_region VARCHAR2(20); + BEGIN + -- Initialize + g_node_id := p_node_id; + g_next_id := p_node_id; -- 1 for node 1 (odd), 2 for node 2 (even) + g_event_seq := 0; + g_insert_cnt := 0; g_update_cnt := 0; g_delete_cnt := 0; + g_rollback_cnt := 0; g_lob_cnt := 0; g_total_ops := 0; + + DBMS_RANDOM.SEED(p_seed); + + -- Seed initial data (need rows before we can UPDATE/DELETE). + -- event_id='SEED' so the consumer skips these (they may arrive + -- before LogMiner starts streaming). 
+ v_seed_id := 0; + g_scalar_id_cnt := 0; g_lob_id_cnt := 0; g_wide_id_cnt := 0; + g_part_id_cnt := 0; g_maxstr_id_cnt := 0; g_interval_id_cnt := 0; + FOR i IN 1..50 LOOP + v_seed_id := next_id; + INSERT INTO olr_test.FUZZ_SCALAR (id, event_id, col_varchar, col_flag) + VALUES (v_seed_id, 'SEED', DBMS_RANDOM.STRING('x', 20), 0); + track_id(g_scalar_ids, g_scalar_id_cnt, v_seed_id); + END LOOP; + FOR i IN 1..5 LOOP + v_seed_id := next_id; + INSERT INTO olr_test.FUZZ_LOB (id, event_id, label, content) + VALUES (v_seed_id, 'SEED', 'seed', 'seed'); + track_id(g_lob_ids, g_lob_id_cnt, v_seed_id); + END LOOP; + FOR i IN 1..20 LOOP + v_seed_id := next_id; + v_seed_region := REGIONS(rand_int(1, 5)); + INSERT INTO olr_test.FUZZ_PART (id, event_id, region, val, payload) + VALUES (v_seed_id, 'SEED', v_seed_region, 0, 'seed'); + track_id(g_part_ids, g_part_id_cnt, v_seed_id); + END LOOP; + FOR i IN 1..10 LOOP + INSERT INTO olr_test.FUZZ_NOPK (event_id, name, value, status) + VALUES ('SEED', 'seed', 0, 'ACTIVE'); + END LOOP; + COMMIT; + -- Reset counters so tracked events start fresh + g_event_seq := 0; + g_insert_cnt := 0; g_total_ops := 0; + + -- Main loop + LOOP + EXIT WHEN SYSTIMESTAMP > v_deadline; + + -- Pick transaction pattern (weighted) + v_txn_dice := rand_int(1, 100); + + IF v_txn_dice <= 55 THEN + -- 55%: Immediate commit + do_random_op; + COMMIT; + + ELSIF v_txn_dice <= 70 THEN + -- 15%: Batched commit (2-5 ops in one txn) + v_batch := rand_int(2, 5); + FOR j IN 1..v_batch LOOP + do_random_op; + END LOOP; + COMMIT; + + ELSIF v_txn_dice <= 80 THEN + -- 10%: Full rollback + do_random_op; + ROLLBACK; + g_rollback_cnt := g_rollback_cnt + 1; + + ELSIF v_txn_dice <= 90 THEN + -- 10%: Savepoint + partial rollback + do_random_op; + SAVEPOINT sp_fuzz; + do_random_op; + ROLLBACK TO sp_fuzz; + g_rollback_cnt := g_rollback_cnt + 1; + do_random_op; + COMMIT; + + ELSE + -- 10%: Large transaction (10-30 ops before commit) + v_batch := rand_int(10, 30); + FOR j IN 1..v_batch LOOP + 
do_random_op; + END LOOP; + COMMIT; + END IF; + + -- Throttle: ~0.5s pause per transaction to avoid overwhelming OLR + DBMS_SESSION.SLEEP(0.5); + + -- Update stats periodically + IF MOD(g_total_ops, 100) = 0 THEN + update_stats; + END IF; + END LOOP; + + -- Final commit + stats + COMMIT; + update_stats; + + DBMS_OUTPUT.PUT_LINE('FUZZ_DONE: node=' || g_node_id || + ' inserts=' || g_insert_cnt || + ' updates=' || g_update_cnt || + ' deletes=' || g_delete_cnt || + ' rollbacks=' || g_rollback_cnt || + ' lobs=' || g_lob_cnt || + ' total=' || g_total_ops || + ' last_event_id=N' || g_node_id || '_' || LPAD(g_event_seq, 8, '0') || + ' elapsed_s=' || ROUND(EXTRACT(SECOND FROM (SYSTIMESTAMP - v_start)) + + EXTRACT(MINUTE FROM (SYSTIMESTAMP - v_start)) * 60 + + EXTRACT(HOUR FROM (SYSTIMESTAMP - v_start)) * 3600)); + END; + +END FUZZ_WKL; +/ + +-- ============================================================ +-- Section 4: Capture SCN +-- ============================================================ + +DECLARE + v_scn NUMBER; +BEGIN + v_scn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER; + DBMS_OUTPUT.PUT_LINE('FUZZ_SCN_START: ' || v_scn); +END; +/ + +EXIT diff --git a/tests/dbz-twin/rac/soak-test.sh b/tests/dbz-twin/rac/soak-test.sh deleted file mode 100755 index 8f553173..00000000 --- a/tests/dbz-twin/rac/soak-test.sh +++ /dev/null @@ -1,644 +0,0 @@ -#!/usr/bin/env bash -# soak-test.sh — Sustained OLR soak test with memory monitoring + data accuracy. -# -# Runs OLR in online mode against RAC for a specified duration, each round -# doing randomized DML (INSERT/UPDATE/DELETE mix, varying batch sizes, -# occasional rollbacks) on both nodes + log switch. Monitors OLR container -# memory at each round. After all rounds, compares cumulative OLR output -# against LogMiner via the Debezium twin-test infrastructure. 
-# -# Usage: ./soak-test.sh [duration-minutes] -# duration-minutes How long to run DML rounds (default: 30) -# -# Prerequisites: -# - RAC VM running with containers started -# - OLR image loaded on VM (podman load) -# - One-time setup done (./setup.sh) -# - Local services running (make up) - -set -euo pipefail - -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -DBZ_TWIN_DIR="$(cd "$SCRIPT_DIR/.." && pwd)" -TESTS_DIR="$(cd "$DBZ_TWIN_DIR/.." && pwd)" -RAC_ENV_DIR="$TESTS_DIR/environments/rac" - -DURATION_MINUTES="${1:-30}" -DURATION_SECONDS=$(( DURATION_MINUTES * 60 )) - -# ---- RAC configuration (auto-detect VM IP) ---- -source "$RAC_ENV_DIR/vm-env.sh" -OLR_IMAGE="${OLR_IMAGE:-docker.io/library/olr-dev:latest}" -RAC_NODE1="${RAC_NODE1:-racnodep1}" -RAC_NODE2="${RAC_NODE2:-racnodep2}" -ORACLE_SID1="${ORACLE_SID1:-ORCLCDB1}" -ORACLE_SID2="${ORACLE_SID2:-ORCLCDB2}" -DB_CONN1="${DB_CONN1:-olr_test/olr_test@//racnodep1:1521/ORCLPDB}" -DB_CONN2="${DB_CONN2:-olr_test/olr_test@//racnodep2:1521/ORCLPDB}" - -OLR_CONTAINER="olr-debezium" -RECEIVER_URL="${RECEIVER_URL:-http://localhost:8080}" -POLL_TIMEOUT="${POLL_TIMEOUT:-180}" - -# ---- SSH helpers ---- -_vm_sqlplus() { - local node="$1" sid="$2" conn="$3" sql_file="$4" - ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ - "podman exec $node su - oracle -c 'export ORACLE_SID=$sid; sqlplus -S \"$conn\" @$sql_file'" -} - -_vm_copy_in() { - local local_path="$1" container_path="$2" node="$3" - local staging="/tmp/_soak_staging_$$" - scp $_SSH_OPTS "$local_path" "${VM_USER}@${VM_HOST}:${staging}" - ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" "podman cp ${staging} ${node}:${container_path}; rm -f ${staging}" -} - -_exec_sysdba() { - local sql_file="$1" - local remote="/tmp/$(basename "$sql_file")" - _vm_copy_in "$sql_file" "$remote" "$RAC_NODE1" - _vm_sqlplus "$RAC_NODE1" "$ORACLE_SID1" "/ as sysdba" "$remote" -} - -_exec_user() { - local sql_file="$1" - local node="${2:-$RAC_NODE1}" sid="${3:-$ORACLE_SID1}" conn="${4:-$DB_CONN1}" - 
local remote="/tmp/$(basename "$sql_file")" - _vm_copy_in "$sql_file" "$remote" "$node" - _vm_sqlplus "$node" "$sid" "$conn" "$remote" -} - -# Get OLR process memory (RSS in MB) — reads actual OLR process, not shell wrapper -_olr_memory_mb() { - ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ - "podman exec $OLR_CONTAINER sh -c 'cat /proc/\$(pgrep -f OpenLogReplicator | head -1)/status 2>/dev/null | grep VmRSS | awk \"{printf \\\"%.0f\\\", \\\$2/1024}\"'" 2>/dev/null || echo "N/A" -} - -WORK_DIR=$(mktemp -d /tmp/soak_rac_XXXXXX) -MEMORY_LOG="$WORK_DIR/memory.csv" -trap 'rm -rf "$WORK_DIR"' EXIT - -echo "=== OLR RAC Soak Test ===" -echo " Duration: ${DURATION_MINUTES} minutes" -echo " DML: randomized INSERT/UPDATE/DELETE mix on 3 tables" -echo " Memory log: $MEMORY_LOG" -echo "" - -# ---- Stage 1: Verify services ---- -echo "--- Stage 1: Verify services ---" - -if ! curl -sf "$RECEIVER_URL/health" > /dev/null 2>&1; then - echo "ERROR: Receiver not responding at $RECEIVER_URL" >&2 - echo "Run: make -C tests/environments/rac/debezium up" >&2 - exit 1 -fi -echo " Receiver: OK" - -if ! ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ - "podman exec $RAC_NODE1 su - oracle -c 'export ORACLE_SID=$ORACLE_SID1; printf \"SELECT 1 FROM dual;\nEXIT;\n\" | sqlplus -S / as sysdba'" 2>/dev/null | grep -q "1"; then - echo "ERROR: RAC Oracle not reachable on $VM_HOST" >&2 - exit 1 -fi -echo " Oracle RAC: OK" - -if ! 
docker ps --format '{{.Names}}' | grep -q "^dbz-logminer$"; then - echo "ERROR: Container dbz-logminer not running" >&2 - exit 1 -fi -echo " Debezium: OK" - -# ---- Stage 2: Create soak test tables + start OLR ---- -echo "" -echo "--- Stage 2: Setup tables and start OLR ---" - -# Three tables with different column profiles to exercise different code paths -cat > "$WORK_DIR/setup.sql" <<'SQL' -SET SERVEROUTPUT ON -SET FEEDBACK OFF - --- Table 1: Simple key-value with VARCHAR2 -BEGIN EXECUTE IMMEDIATE 'DROP TABLE olr_test.SOAK_KV PURGE'; EXCEPTION WHEN OTHERS THEN IF SQLCODE != -942 THEN RAISE; END IF; END; -/ -CREATE TABLE olr_test.SOAK_KV ( - id NUMBER PRIMARY KEY, - val VARCHAR2(200), - amount NUMBER(12,2), - flag NUMBER(1) DEFAULT 0, - updated TIMESTAMP DEFAULT SYSTIMESTAMP -); -ALTER TABLE olr_test.SOAK_KV ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; - --- Table 2: Wide row with various numeric types -BEGIN EXECUTE IMMEDIATE 'DROP TABLE olr_test.SOAK_WIDE PURGE'; EXCEPTION WHEN OTHERS THEN IF SQLCODE != -942 THEN RAISE; END IF; END; -/ -CREATE TABLE olr_test.SOAK_WIDE ( - id NUMBER PRIMARY KEY, - col_int NUMBER(10), - col_big NUMBER(18), - col_dec NUMBER(15,4), - col_str1 VARCHAR2(100), - col_str2 VARCHAR2(100), - col_date DATE DEFAULT SYSDATE, - col_ts TIMESTAMP DEFAULT SYSTIMESTAMP -); -ALTER TABLE olr_test.SOAK_WIDE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; - --- Table 3: CLOB for LOB processing paths -BEGIN EXECUTE IMMEDIATE 'DROP TABLE olr_test.SOAK_LOB PURGE'; EXCEPTION WHEN OTHERS THEN IF SQLCODE != -942 THEN RAISE; END IF; END; -/ -CREATE TABLE olr_test.SOAK_LOB ( - id NUMBER PRIMARY KEY, - label VARCHAR2(50), - content CLOB -); -ALTER TABLE olr_test.SOAK_LOB ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; - -DECLARE - v_scn NUMBER; -BEGIN - v_scn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER; - DBMS_OUTPUT.PUT_LINE('SOAK_SCN_START: ' || v_scn); -END; -/ - -EXIT -SQL -SETUP_OUT=$(_exec_user "$WORK_DIR/setup.sql") -echo "$SETUP_OUT" - -# Force log switch -cat > 
"$WORK_DIR/log_switch.sql" <<'SQL' -SET FEEDBACK OFF -ALTER SYSTEM SWITCH ALL LOGFILE; -BEGIN DBMS_SESSION.SLEEP(2); END; -/ -EXIT -SQL -_exec_sysdba "$WORK_DIR/log_switch.sql" > /dev/null - -# Stop existing OLR -ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ - "podman stop -t5 $OLR_CONTAINER 2>/dev/null; podman rm $OLR_CONTAINER 2>/dev/null; true" - -# Deploy OLR config -ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" "mkdir -p /root/olr-debezium/config /root/olr-debezium/checkpoint" -scp $_SSH_OPTS "$SCRIPT_DIR/config/olr-config.json" "${VM_USER}@${VM_HOST}:/root/olr-debezium/config/" -ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" "rm -rf /root/olr-debezium/checkpoint/* && chown -R 1000:54335 /root/olr-debezium/checkpoint" - -# Start OLR — needs RAC public network to reach SCAN VIPs -echo " Starting OLR on RAC VM..." -SCAN_IP=$(ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ - "podman exec racnodep1 getent hosts racnodepc1-scan 2>/dev/null | head -1 | awk '{print \$1}'" 2>/dev/null) -if [[ -z "$SCAN_IP" ]]; then - echo "ERROR: Failed to resolve racnodepc1-scan IP from RAC VM" >&2 - exit 1 -fi -ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" "podman run -d --name $OLR_CONTAINER \ - --user 1000:54335 \ - --network rac_pub1_nw \ - --add-host racnodepc1-scan:${SCAN_IP} \ - -p 5000:5000 \ - -v /root/olr-debezium/config:/config:ro,Z \ - -v /root/olr-debezium/checkpoint:/olr-data/checkpoint:Z \ - -v /shared/redo:/shared/redo:ro \ - $OLR_IMAGE \ - -r -f /config/olr-config.json" > /dev/null - -# Restart Debezium connectors with clean state -echo " Restarting Debezium connectors..." 
-cd "$SCRIPT_DIR" -for svc in dbz-logminer dbz-olr; do - docker compose rm -sf "$svc" > /dev/null 2>&1 -done -COMPOSE_PROJECT=$(docker compose config 2>/dev/null | grep -m1 'name:' | awk '{print $2}') -COMPOSE_PROJECT="${COMPOSE_PROJECT:-debezium}" -docker volume rm -f "${COMPOSE_PROJECT}_dbz-logminer-data" "${COMPOSE_PROJECT}_dbz-olr-data" > /dev/null 2>&1 -docker compose up -d dbz-logminer dbz-olr > /dev/null 2>&1 -cd - > /dev/null - -# Wait for OLR -echo " Waiting for OLR to initialize..." -for i in $(seq 1 60); do - if ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ - "podman logs $OLR_CONTAINER 2>&1 | tail -5" 2>/dev/null | grep -q "processing redo log"; then - break - fi - sleep 2 -done -echo " OLR: ready" - -# Wait for Debezium connectors -echo " Waiting for Debezium connectors..." -for i in $(seq 1 30); do - if docker logs dbz-olr-adapter 2>&1 | tail -10 | grep -q "streaming client started successfully\|Starting streaming"; then - break - fi - sleep 2 -done -for i in $(seq 1 30); do - if docker logs dbz-logminer 2>&1 | tail -10 | grep -q "Starting streaming"; then - break - fi - sleep 2 -done -echo " Connectors ready" - -# Reset receiver -curl -sf -X POST "$RECEIVER_URL/reset" > /dev/null - -# Initial memory reading -INIT_MEM=$(_olr_memory_mb) -echo "" -echo " Initial OLR memory: ${INIT_MEM} MB" - -# ---- Stage 3: Run DML rounds with memory monitoring ---- -echo "" -echo "--- Stage 3: Running DML rounds for ${DURATION_MINUTES} minutes ---" -echo "round,elapsed_s,memory_mb,ops_this_round,total_ops" > "$MEMORY_LOG" -echo "0,0,${INIT_MEM},0,0" >> "$MEMORY_LOG" - -SOAK_START=$(date +%s) -TOTAL_OPS=0 -ROUND=0 -NEXT_ID_KV=1 -NEXT_ID_WIDE=1 -NEXT_ID_LOB=1 - -# Seed the PRNG (bash $RANDOM is per-process) -RANDOM=$$ - -while true; do - ELAPSED=$(( $(date +%s) - SOAK_START )) - if [[ $ELAPSED -ge $DURATION_SECONDS ]]; then - break - fi - - ROUND=$(( ROUND + 1 )) - ROUND_OPS=0 - - # Randomize which operations this round does: - # - batch size: 5-80 rows - # - operation mix: 
INSERT-heavy early, more UPDATE/DELETE as data accumulates - # - which node runs what - # - occasional rollback round (~10%) - # - occasional LOB round (~20%) - BATCH_SIZE=$(( RANDOM % 76 + 5 )) # 5-80 - DO_ROLLBACK=$(( RANDOM % 10 )) # 0 = rollback round (10%) - DO_LOB=$(( RANDOM % 5 )) # 0 = LOB round (20%) - OP_DICE=$(( RANDOM % 100 )) - - # Decide operation type based on accumulated data - if [[ $NEXT_ID_KV -le 50 ]]; then - # Not enough rows yet — always INSERT - OP_TYPE="insert" - elif [[ $OP_DICE -lt 40 ]]; then - OP_TYPE="insert" - elif [[ $OP_DICE -lt 70 ]]; then - OP_TYPE="update" - elif [[ $OP_DICE -lt 85 ]]; then - OP_TYPE="mixed" - else - OP_TYPE="delete" - fi - - # Decide which node does what (swap every ~third round) - if [[ $(( RANDOM % 3 )) -eq 0 ]]; then - PRIMARY_NODE="$RAC_NODE2"; PRIMARY_SID="$ORACLE_SID2"; PRIMARY_CONN="$DB_CONN2" - SECONDARY_NODE="$RAC_NODE1"; SECONDARY_SID="$ORACLE_SID1"; SECONDARY_CONN="$DB_CONN1" - else - PRIMARY_NODE="$RAC_NODE1"; PRIMARY_SID="$ORACLE_SID1"; PRIMARY_CONN="$DB_CONN1" - SECONDARY_NODE="$RAC_NODE2"; SECONDARY_SID="$ORACLE_SID2"; SECONDARY_CONN="$DB_CONN2" - fi - - # ---- Generate SQL for primary node ---- - cat > "$WORK_DIR/dml_primary.sql" <> "$WORK_DIR/dml_primary.sql" < (NEXT_ID_KV - UPD_START) ? (NEXT_ID_KV - UPD_START) : BATCH_SIZE )) - [[ $UPD_COUNT -lt 1 ]] && UPD_COUNT=1 - cat >> "$WORK_DIR/dml_primary.sql" <> "$WORK_DIR/dml_primary.sql" < 20 ? 
20 : BATCH_SIZE )) - [[ $DEL_COUNT -gt $(( NEXT_ID_KV - DEL_START )) ]] && DEL_COUNT=$(( NEXT_ID_KV - DEL_START )) - [[ $DEL_COUNT -lt 1 ]] && DEL_COUNT=1 - cat >> "$WORK_DIR/dml_primary.sql" <> "$WORK_DIR/dml_primary.sql" - - # ---- Generate SQL for secondary node (always SOAK_WIDE table) ---- - WIDE_BATCH=$(( RANDOM % 30 + 5 )) # 5-34 - cat > "$WORK_DIR/dml_secondary.sql" < "$WORK_DIR/dml_lob.sql" < "$WORK_DIR/dml_rollback.sql" < /dev/null - _exec_user "$WORK_DIR/dml_secondary.sql" "$SECONDARY_NODE" "$SECONDARY_SID" "$SECONDARY_CONN" > /dev/null - - if [[ $DO_LOB -eq 0 && -f "$WORK_DIR/dml_lob.sql" ]]; then - _exec_user "$WORK_DIR/dml_lob.sql" "$PRIMARY_NODE" "$PRIMARY_SID" "$PRIMARY_CONN" > /dev/null - fi - - if [[ $DO_ROLLBACK -eq 0 && -f "$WORK_DIR/dml_rollback.sql" ]]; then - _exec_user "$WORK_DIR/dml_rollback.sql" "$SECONDARY_NODE" "$SECONDARY_SID" "$SECONDARY_CONN" > /dev/null - fi - - TOTAL_OPS=$(( TOTAL_OPS + ROUND_OPS )) - - # Force log switch every round - _exec_sysdba "$WORK_DIR/log_switch.sql" > /dev/null - - # Record memory - ELAPSED=$(( $(date +%s) - SOAK_START )) - MEM=$(_olr_memory_mb) - echo "$ROUND,$ELAPSED,$MEM,$ROUND_OPS,$TOTAL_OPS" >> "$MEMORY_LOG" - - printf "\r Round %3d | %4ds/%ds | OLR: %s MB | ops: %d (+%d) | %s " \ - "$ROUND" "$ELAPSED" "$DURATION_SECONDS" "$MEM" "$TOTAL_OPS" "$ROUND_OPS" "$OP_TYPE" - - # Brief pause - sleep 2 - - # Clean up optional files - rm -f "$WORK_DIR/dml_lob.sql" "$WORK_DIR/dml_rollback.sql" -done - -echo "" -echo " DML complete: $TOTAL_OPS operations across $ROUND rounds" - -# ---- Stage 4: Insert sentinel + wait for completion ---- -echo "" -echo "--- Stage 4: Insert sentinel and wait for processing ---" - -# Extra log switches to flush everything -_exec_sysdba "$WORK_DIR/log_switch.sql" > /dev/null -sleep 3 -_exec_sysdba "$WORK_DIR/log_switch.sql" > /dev/null - -cat > "$WORK_DIR/sentinel.sql" <<'SQL' -DELETE FROM DEBEZIUM_SENTINEL; -INSERT INTO DEBEZIUM_SENTINEL VALUES (1, 'soak-test'); -COMMIT; -EXIT; -SQL 
-_exec_user "$WORK_DIR/sentinel.sql" > /dev/null -echo " Sentinel inserted" - -_exec_sysdba "$WORK_DIR/log_switch.sql" > /dev/null - -# Wait for both connectors -START_TIME=$(date +%s) -while true; do - ELAPSED=$(( $(date +%s) - START_TIME )) - if [[ $ELAPSED -ge $POLL_TIMEOUT ]]; then - echo "" - echo "ERROR: Timeout after ${POLL_TIMEOUT}s waiting for events" >&2 - STATUS=$(curl -sf "$RECEIVER_URL/status" 2>/dev/null || echo '{}') - echo " Final status: $STATUS" >&2 - break - fi - - STATUS=$(curl -sf "$RECEIVER_URL/status" 2>/dev/null || echo '{}') - LM_SENTINEL=$(echo "$STATUS" | python3 -c "import sys,json; print(json.load(sys.stdin).get('logminer_sentinel',False))" 2>/dev/null || echo "False") - OLR_SENTINEL=$(echo "$STATUS" | python3 -c "import sys,json; print(json.load(sys.stdin).get('olr_sentinel',False))" 2>/dev/null || echo "False") - LM_COUNT=$(echo "$STATUS" | python3 -c "import sys,json; print(json.load(sys.stdin).get('logminer_count',0))" 2>/dev/null || echo "0") - OLR_COUNT=$(echo "$STATUS" | python3 -c "import sys,json; print(json.load(sys.stdin).get('olr_count',0))" 2>/dev/null || echo "0") - - printf "\r [%3ds] LogMiner: %s events (sentinel: %s) | OLR: %s events (sentinel: %s)" \ - "$ELAPSED" "$LM_COUNT" "$LM_SENTINEL" "$OLR_COUNT" "$OLR_SENTINEL" - - if [[ "$LM_SENTINEL" == "True" && "$OLR_SENTINEL" == "True" ]]; then - echo "" - echo " Both connectors have processed all events" - break - fi - - sleep 2 -done - -# Final memory reading -FINAL_MEM=$(_olr_memory_mb) - -# ---- Stage 5: Compare outputs ---- -echo "" -echo "--- Stage 5: Compare LogMiner vs OLR Debezium output ---" - -LM_FILE="$SCRIPT_DIR/output/logminer.jsonl" -OLR_FILE="$SCRIPT_DIR/output/olr.jsonl" - -COMPARE_RESULT=0 -if [[ ! -s "$LM_FILE" ]]; then - echo "ERROR: LogMiner output is empty: $LM_FILE" >&2 - COMPARE_RESULT=1 -elif [[ ! 
-s "$OLR_FILE" ]]; then - echo "ERROR: OLR output is empty: $OLR_FILE" >&2 - COMPARE_RESULT=1 -elif python3 "$DBZ_TWIN_DIR/compare-debezium.py" "$LM_FILE" "$OLR_FILE"; then - echo " Data accuracy: PASS" -else - echo " Data accuracy: FAIL" - COMPARE_RESULT=1 -fi - -# ---- Stage 6: Memory report ---- -echo "" -echo "--- Stage 6: Memory report ---" -echo "" - -# Print header + sampled rows from CSV -echo " Round | Elapsed | Memory (MB) | Ops (round) | Total Ops" -echo " ------|---------|-------------|-------------|----------" -LINE_COUNT=$(wc -l < "$MEMORY_LOG") -# Show ~20 evenly spaced rows -STEP=$(( (LINE_COUNT - 1) / 20 )) -[[ $STEP -lt 1 ]] && STEP=1 -ROW=0 -while IFS=, read -r r e m rops tops; do - [[ "$r" == "round" ]] && continue - ROW=$(( ROW + 1 )) - if [[ "$r" == "0" || "$r" == "$ROUND" || $(( ROW % STEP )) == 0 ]]; then - printf " %5s | %6ss | %10s | %10s | %s\n" "$r" "$e" "$m" "$rops" "$tops" - fi -done < "$MEMORY_LOG" - -echo "" -echo " Initial memory: ${INIT_MEM} MB" -echo " Final memory: ${FINAL_MEM} MB" - -# Get OLR's self-reported memory HWM from logs -OLR_HWM=$(ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ - "podman logs $OLR_CONTAINER 2>&1 | grep 'Memory HWM' | tail -1" 2>/dev/null || echo "N/A") -if [[ -n "$OLR_HWM" && "$OLR_HWM" != "N/A" ]]; then - echo " OLR self-reported: $OLR_HWM" -fi - -# Memory trend analysis -if [[ "$INIT_MEM" != "N/A" && "$FINAL_MEM" != "N/A" ]]; then - GROWTH=$(( FINAL_MEM - INIT_MEM )) - if [[ $GROWTH -gt 200 ]]; then - echo "" - echo " WARNING: Memory grew by ${GROWTH} MB — possible leak" - elif [[ $GROWTH -gt 50 ]]; then - echo "" - echo " NOTE: Memory grew by ${GROWTH} MB — moderate growth" - else - echo "" - echo " Memory stable (delta: ${GROWTH} MB)" - fi -fi - -# ---- Summary ---- -echo "" -echo "========================================" -echo " Soak Test Summary" -echo "========================================" -echo " Rounds: $ROUND" -echo " Total ops: $TOTAL_OPS" -echo " Duration: $(( $(date +%s) - SOAK_START ))s" 
-echo " Memory: ${INIT_MEM} MB -> ${FINAL_MEM} MB" - -if [[ $COMPARE_RESULT -eq 0 ]]; then - echo " Accuracy: PASS" - echo "" - echo "=== PASS: Soak test completed ===" -else - echo " Accuracy: FAIL" - echo "" - echo "=== FAIL: Soak test data accuracy mismatch ===" - echo " LogMiner output: $LM_FILE" - echo " OLR output: $OLR_FILE" -fi - -# Copy memory log to a persistent location -PERSIST_LOG="/tmp/soak-test-memory-$(date +%Y%m%d-%H%M%S).csv" -cp "$MEMORY_LOG" "$PERSIST_LOG" -echo "" -echo " Memory CSV saved to: $PERSIST_LOG" - -exit $COMPARE_RESULT diff --git a/tests/dbz-twin/rac/validator.py b/tests/dbz-twin/rac/validator.py new file mode 100644 index 00000000..ab7c478c --- /dev/null +++ b/tests/dbz-twin/rac/validator.py @@ -0,0 +1,334 @@ +#!/usr/bin/env python3 +"""Continuous validator for fuzz test — compares LogMiner vs OLR events in SQLite. + +Walks both lm_events and olr_events tables sorted by event_id, comparing +records one-by-one. Uses a watermark cursor to only validate up to the +minimum of both sides' max event_id (handles lag). + +For LOB tables, LogMiner splits events (INSERT + UPDATE for same event_id). +The validator merges these before comparing. 
+ +Environment variables: + SQLITE_DB — SQLite database path (default: /app/data/fuzz.db) + POLL_INTERVAL — Seconds between validation polls (default: 10) + IDLE_TIMEOUT — Seconds of no new events before declaring done (default: 120) +""" + +import json +import os +import sqlite3 +import sys +import time + +SQLITE_DB = os.environ.get('SQLITE_DB', '/app/data/fuzz.db') +POLL_INTERVAL = int(os.environ.get('POLL_INTERVAL', '10')) +IDLE_TIMEOUT = int(os.environ.get('IDLE_TIMEOUT', '120')) + +# Known LOB phantom transaction issues (olr#26, olr#10) +# These produce expected mismatches — report but don't fail +KNOWN_LOB_TABLES = {'FUZZ_LOB'} + + +def normalize_value(v): + """Normalize a value for comparison.""" + if v is None: + return None + s = str(v) + # Timezone normalization: Z == +00:00 + if s.endswith('Z'): + return s[:-1] + '+00:00' + return s + + +def normalize_columns(d): + """Normalize a dict of column->value to comparable form.""" + if not d or not isinstance(d, dict): + return {} + return {k.upper(): normalize_value(v) for k, v in d.items()} + + +# Debezium's LOB unavailable markers +UNAVAILABLE_MARKERS = { + '__debezium_unavailable_value', + 'X19kZWJleml1bV91bmF2YWlsYWJsZV92YWx1ZQ==', +} + + +def is_unavailable(v): + return v is not None and v in UNAVAILABLE_MARKERS + + +def merge_lob_records(records): + """Merge LogMiner LOB split records (same event_id, multiple seq values). 
+ Returns the merged after/before dicts.""" + if len(records) == 1: + event = json.loads(records[0]['raw_json']) + return normalize_columns(event.get('after')), normalize_columns(event.get('before')) + + # Sort by seq, merge after-images progressively + sorted_recs = sorted(records, key=lambda r: r['seq']) + merged_after = {} + first_before = {} + + for i, rec in enumerate(sorted_recs): + event = json.loads(rec['raw_json']) + after = normalize_columns(event.get('after')) + for k, v in after.items(): + merged_after[k] = v + if i == 0: + first_before = normalize_columns(event.get('before')) + + return merged_after, first_before + + +def compare_values(lm_cols, olr_cols, table, section='after'): + """Compare two normalized column dicts. Returns list of diff strings. + + section: 'before' or 'after'. LOB unavailable markers are only skipped + in 'before' images — Oracle doesn't provide old LOB values in redo. + In 'after' images, unavailable markers indicate a real problem. + """ + diffs = [] + all_keys = set(lm_cols.keys()) | set(olr_cols.keys()) + for key in sorted(all_keys): + if key in ('EVENT_ID',): + continue # Event ID verified separately + va = lm_cols.get(key) + vb = olr_cols.get(key) + if key not in lm_cols or key not in olr_cols: + continue # Supplemental logging differences + if section == 'before' and (is_unavailable(va) or is_unavailable(vb)): + continue # LOB before-image unavailable (Oracle limitation) + if va != vb: + diffs.append(f" {key}: LM={va!r} OLR={vb!r}") + return diffs + + +def main(): + print(f"Validator starting", flush=True) + print(f" SQLite DB: {SQLITE_DB}", flush=True) + print(f" Poll interval: {POLL_INTERVAL}s", flush=True) + print(f" Idle timeout: {IDLE_TIMEOUT}s", flush=True) + + # Wait for database to exist + while not os.path.exists(SQLITE_DB): + time.sleep(2) + + conn = sqlite3.connect(SQLITE_DB) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + + cursor_by_node = {'N1': '', 'N2': ''} # Per-node watermark 
+ total_validated = 0 + total_matched = 0 + total_mismatches = 0 + total_lob_known = 0 # Known LOB issues (expected) + total_missing_lm = 0 + total_missing_olr = 0 + last_new_events = time.time() + prev_lm_count = 0 + prev_olr_count = 0 + + try: + while True: + time.sleep(POLL_INTERVAL) + + # Get current counts + lm_count = conn.execute("SELECT COUNT(*) FROM lm_events").fetchone()[0] + olr_count = conn.execute("SELECT COUNT(*) FROM olr_events").fetchone()[0] + + # Check for new events (idle detection) + if lm_count != prev_lm_count or olr_count != prev_olr_count: + last_new_events = time.time() + prev_lm_count = lm_count + prev_olr_count = olr_count + + # Find safe frontier per node: min(lm, olr) for each N{x} prefix. + # Event_ids from two RAC nodes interleave non-monotonically in + # commit order, so a global frontier would validate events before + # the other side has delivered them. + node_frontiers = {} + for node_prefix in ('N1', 'N2'): + lm_node_max = conn.execute( + "SELECT MAX(event_id) FROM lm_events WHERE event_id LIKE ?", + (f'{node_prefix}_%',)).fetchone()[0] + olr_node_max = conn.execute( + "SELECT MAX(event_id) FROM olr_events WHERE event_id LIKE ?", + (f'{node_prefix}_%',)).fetchone()[0] + if lm_node_max and olr_node_max: + node_frontiers[node_prefix] = min(lm_node_max, olr_node_max) + + if not node_frontiers: + continue + + # Check if any node has new events beyond its cursor + any_new = any( + nf > cursor_by_node.get(np, '') + for np, nf in node_frontiers.items() + ) + if not any_new: + if time.time() - last_new_events > IDLE_TIMEOUT: + print(f"[validator] Idle timeout ({IDLE_TIMEOUT}s). " + f"Final validation pass...", flush=True) + # Widen frontier to max of both sides per node to catch + # truly missing events (one side never delivered them). 
+ for node_prefix in ('N1', 'N2'): + lm_n = conn.execute( + "SELECT MAX(event_id) FROM lm_events WHERE event_id LIKE ?", + (f'{node_prefix}_%',)).fetchone()[0] + olr_n = conn.execute( + "SELECT MAX(event_id) FROM olr_events WHERE event_id LIKE ?", + (f'{node_prefix}_%',)).fetchone()[0] + if lm_n or olr_n: + node_frontiers[node_prefix] = max(lm_n or '', olr_n or '') + # Re-check if there's anything new with widened frontier + any_new = any( + nf > cursor_by_node.get(np, '') + for np, nf in node_frontiers.items() + ) + if not any_new: + break + # Fall through to validate the widened range + else: + continue + + # Fetch event_ids within each node's safe frontier + lm_ids = set() + olr_ids = set() + for node_prefix, nf in node_frontiers.items(): + node_cursor = cursor_by_node.get(node_prefix, '') + for r in conn.execute( + "SELECT DISTINCT event_id FROM lm_events " + "WHERE event_id > ? AND event_id <= ? AND event_id LIKE ?", + (node_cursor, nf, f'{node_prefix}_%')).fetchall(): + lm_ids.add(r['event_id']) + for r in conn.execute( + "SELECT DISTINCT event_id FROM olr_events " + "WHERE event_id > ? AND event_id <= ? AND event_id LIKE ?", + (node_cursor, nf, f'{node_prefix}_%')).fetchall(): + olr_ids.add(r['event_id']) + + all_ids = sorted(lm_ids | olr_ids) + + for eid in all_ids: + in_lm = eid in lm_ids + in_olr = eid in olr_ids + + # Determine table from whichever side has the event + if in_lm: + tbl_row = conn.execute( + "SELECT table_name FROM lm_events WHERE event_id = ? LIMIT 1", + (eid,)).fetchone() + else: + tbl_row = conn.execute( + "SELECT table_name FROM olr_events WHERE event_id = ? LIMIT 1", + (eid,)).fetchone() + event_table = tbl_row['table_name'] if tbl_row else '?' 
+ is_lob = event_table in KNOWN_LOB_TABLES + + if in_lm and not in_olr: + total_missing_olr += 1 + if is_lob: + total_lob_known += 1 + else: + total_mismatches += 1 + print(f"[MISSING_OLR] {eid} ({event_table})", flush=True) + total_validated += 1 + continue + + if in_olr and not in_lm: + total_missing_lm += 1 + if is_lob: + total_lob_known += 1 + else: + total_mismatches += 1 + print(f"[EXTRA_OLR] {eid} ({event_table})", flush=True) + total_validated += 1 + continue + + # Both sides have the event — compare + lm_recs = conn.execute( + "SELECT * FROM lm_events WHERE event_id = ? ORDER BY seq", + (eid,) + ).fetchall() + olr_recs = conn.execute( + "SELECT * FROM olr_events WHERE event_id = ? ORDER BY seq", + (eid,) + ).fetchall() + + # Check table and op match + lm_table = lm_recs[0]['table_name'] + olr_table = olr_recs[0]['table_name'] + lm_op = lm_recs[0]['op'] + olr_op = olr_recs[0]['op'] + + if lm_table != olr_table or lm_op != olr_op: + total_mismatches += 1 + print(f"[MISMATCH] {eid}: LM={lm_op} {lm_table}, " + f"OLR={olr_op} {olr_table}", flush=True) + total_validated += 1 + continue + + # Merge LOB splits and compare values + lm_after, lm_before = merge_lob_records( + [dict(r) for r in lm_recs]) + olr_after, olr_before = merge_lob_records( + [dict(r) for r in olr_recs]) + + diffs = compare_values(lm_after, olr_after, lm_table, 'after') + diffs.extend(compare_values(lm_before, olr_before, lm_table, 'before')) + if diffs: + if is_lob: + total_lob_known += 1 + else: + total_mismatches += 1 + print(f"[VALUE_DIFF] {eid} ({lm_op} {lm_table}):", + flush=True) + for d in diffs[:5]: + print(d, flush=True) + else: + total_matched += 1 + + total_validated += 1 + + # Advance per-node cursors + for node_prefix, nf in node_frontiers.items(): + cursor_by_node[node_prefix] = nf + + # Progress report + frontier_str = ','.join(f'{k}={v}' for k, v in sorted(cursor_by_node.items())) + print(f"[validator] validated={total_validated} matched={total_matched} " + 
f"mismatches={total_mismatches} lob_known={total_lob_known} " + f"missing_olr={total_missing_olr} extra_olr={total_missing_lm} " + f"lm_total={lm_count} olr_total={olr_count} " + f"frontier={frontier_str}", flush=True) + + except KeyboardInterrupt: + pass + finally: + conn.close() + + # Final summary + print(f"\n{'='*60}", flush=True) + print(f" Fuzz Test Validation Summary", flush=True) + print(f"{'='*60}", flush=True) + print(f" Total validated: {total_validated}", flush=True) + print(f" Matched: {total_matched}", flush=True) + print(f" Mismatches: {total_mismatches}", flush=True) + print(f" LOB known issues: {total_lob_known}", flush=True) + print(f" Missing from OLR: {total_missing_olr}", flush=True) + print(f" Extra in OLR: {total_missing_lm}", flush=True) + + if total_mismatches > 0: + print(f"\n RESULT: FAIL ({total_mismatches} unexpected mismatches)", + flush=True) + sys.exit(1) + else: + print(f"\n RESULT: PASS", flush=True) + if total_lob_known > 0: + print(f" (with {total_lob_known} known LOB issues)", flush=True) + sys.exit(0) + + +if __name__ == '__main__': + main() diff --git a/tests/design/FUZZ-TEST-PLAN.md b/tests/design/FUZZ-TEST-PLAN.md new file mode 100644 index 00000000..c623eeed --- /dev/null +++ b/tests/design/FUZZ-TEST-PLAN.md @@ -0,0 +1,158 @@ +# Fuzz Test Framework — Implementation Plan + +## Goal + +Replace the existing soak test infrastructure with a streaming, component-based +fuzz test framework that validates OLR data accuracy under randomized workloads +over arbitrarily long periods. + +## Architecture + +``` +Oracle RAC (2 nodes) + └─ PL/SQL fuzz workload (event_id in every row) + ├─ LogMiner adapter ─→ Kafka topic: lm-events + └─ OLR adapter ─→ Kafka topic: olr-events + │ + Kafka Consumer (Python) + │ + SQLite (two tables) + │ + Validator (Python) + Walks both tables by event_id + Reports mismatches continuously +``` + +## Components + +### 1. 
Load Generator — `fuzz-workload.sql` + +- 7 table types: SCALAR, WIDE, LOB, PART, NOPK, MAXSTR, INTERVAL +- `event_id VARCHAR2(30)` on every table — globally unique per CDC event +- Format: `N{node}_{seq:08d}` (e.g., `N1_00000042`) + - Global monotonic sequence per node — sorts chronologically + - Table type derivable from CDC event's `source.table`, not encoded in event_id +- Seed data uses `event_id='SEED'` (skipped by consumer) +- Every INSERT generates a new event_id +- Every UPDATE sets a new event_id on the row +- DELETE uses the existing event_id on the row (from before-image) +- Transaction patterns: 55% immediate, 15% batched, 10% rollback, 10% savepoint, 10% large +- 0.5s throttle per transaction to avoid overwhelming OLR + +### 2. Kafka — Single broker, KRaft mode + +- Image: `apache/kafka:3.9.0` +- No ZooKeeper, `KAFKA_LOG_RETENTION_HOURS: 1` +- Port-mapped (not host network) — `ports: 9092:9092` +- Auto-create topics, 2 topics: `lm-events`, `olr-events` + +### 3. Debezium Server — Two instances (existing, reconfigured) + +- Switch from HTTP sink to Kafka sink +- Config files: `application-logminer-kafka.properties`, `application-olr-kafka.properties` +- Host network mode (needs access to RAC VM and Kafka via localhost) + +### 4. 
Kafka Consumer — `kafka-consumer.py` + +- Single Python process, subscribes to both topics (`lm-events`, `olr-events`) +- Waits for topics to appear before subscribing (handles startup ordering) +- Extracts `event_id` from Debezium JSON (`after.EVENT_ID` or `before.EVENT_ID`) +- Skips events with `event_id='SEED'` or from `FUZZ_STATS` table +- Writes to SQLite with two tables: + ```sql + CREATE TABLE lm_events ( + event_id TEXT NOT NULL, + seq INTEGER NOT NULL, -- handles LogMiner LOB split (multiple events per event_id) + table_name TEXT NOT NULL, + op TEXT NOT NULL, + raw_json TEXT NOT NULL, + consumed_at REAL NOT NULL, + PRIMARY KEY (event_id, seq) + ); + CREATE TABLE olr_events (...same schema...); + ``` +- `(event_id, seq)` PK handles LogMiner LOB splits (same event_id, multiple CDC events) +- Batch commits (every 100 records or 1 second) +- SQLite WAL mode for concurrent reader/writer +- Dependency: `kafka-python-ng` (pre-installed in consumer Docker image) + +### 5. Validator — `validator.py` + +- Continuously polls SQLite, walks both tables in sorted event_id order +- Uses per-node watermark cursors: for each RAC node, validates up to `min(max_lm_event_id, max_olr_event_id)` +- For each event_id: + - Present in both → check table/op match, merge LOB splits, compare JSON values + - Present in one only → missing/extra record +- LOB table events (`FUZZ_LOB`) classified as `lob_known` (known bugs olr#26, olr#10) +- Non-LOB mismatches counted as `mismatches` (unexpected) +- JSON comparison: normalize values, handle LOB unavailable markers, timezone formats +- Reports progress every `POLL_INTERVAL` seconds (default: 10) +- Exits after `IDLE_TIMEOUT` seconds of no new events (default: 120) +- Exit 0 = no unexpected mismatches, exit 1 = unexpected mismatches found +- Full log saved to `/tmp/fuzz-validator-*.log` for troubleshooting + +### 6. Orchestrator — `fuzz-test.sh` + +Stages: +1. Verify prerequisites (RAC VM reachable) +2. 
Deploy fuzz-workload.sql to RAC (creates tables + PL/SQL package) +3. Start infrastructure (Kafka, Debezium, consumer, validator, OLR) +4. Run fuzz workload on both nodes concurrently +5. Wait for pipeline drain (validator idle-timeout) +6. Report results (accuracy = pass/fail, memory = observation) + +## Files + +### Created + +| File | Purpose | +|------|---------| +| `tests/dbz-twin/rac/fuzz-test.sh` | Orchestrator | +| `tests/dbz-twin/rac/kafka-consumer.py` | Kafka → SQLite bridge | +| `tests/dbz-twin/rac/validator.py` | Continuous comparator | +| `tests/dbz-twin/rac/docker-compose-fuzz.yaml` | Kafka + consumer + validator + Debezium | +| `tests/dbz-twin/rac/config/application-logminer-kafka.properties` | Debezium LogMiner Kafka config | +| `tests/dbz-twin/rac/config/application-olr-kafka.properties` | Debezium OLR Kafka config | +| `tests/dbz-twin/rac/perf/fuzz-workload.sql` | PL/SQL fuzz workload with event_id | +| `tests/design/FUZZ-TEST-PLAN.md` | This plan | + +### To Remove (after long-run validation) + +| File | Replaced By | +|------|-------------| +| `tests/dbz-twin/rac/soak-test.sh` | `fuzz-test.sh` | +| `tests/dbz-twin/debezium-receiver.py` | `kafka-consumer.py` | +| `tests/dbz-twin/compare-debezium.py` | `validator.py` | + +**NOTE:** The HTTP receiver and compare script are also used by the single-instance +twin-test (`tests/dbz-twin/run.sh`) and scenario tests. Those must be migrated +or kept alongside. Evaluate after RAC fuzz test is validated long-term. 
+ +## Implementation Order + +``` +Phase 1: fuzz-workload.sql — add event_id ✅ Done +Phase 2: Kafka + Debezium configs (docker-compose-fuzz.yaml) ✅ Done +Phase 3: kafka-consumer.py ✅ Done +Phase 4: validator.py ✅ Done +Phase 5: fuzz-test.sh ✅ Done +Phase 6: Validate with 5-min run ✅ Done (0 non-LOB mismatches) +Phase 7: Long-run validation (60+ min) ⬜ Pending +Phase 8: Remove old soak-test.sh, receiver, compare scripts ⬜ Pending +``` + +## Current Findings + +Initial 5-minute fuzz test results showed ~1% non-LOB phantom events. +After investigation and fixes, subsequent runs show **0 non-LOB mismatches**. +LOB known issues (olr#26 + olr#10 variant) remain expected. + +## Key Design Decisions + +- **event_id is globally unique** — comparison is set-based, not order-based +- **Global monotonic sequence** — event_id sorts chronologically, enabling sorted walk +- **Store raw Debezium JSON** — normalize at comparison time to avoid ingest bugs +- **LOB tables included** — known bugs classified separately, don't fail the test +- **SQLite** — row-oriented PK lookups + sorted range scans = B-tree sweet spot +- **Validator runs continuously** — catch mismatches during the run, not just at the end +- **Accuracy is pass/fail, memory is observation** — this is a fuzz test, not a soak test