Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -1 +1,7 @@
@AGENTS.md

## GitHub Policy

**NEVER** create PRs or issues against upstream (`bersler/OpenLogReplicator`).
All PRs and issues go to `rophy/OpenLogReplicator` only.
`gh repo set-default` is configured to `rophy/OpenLogReplicator`.
169 changes: 143 additions & 26 deletions tests/dbz-twin/compare-debezium.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
#!/usr/bin/env python3
"""Compare Debezium LogMiner vs OLR adapter outputs.

Usage: compare-debezium.py <logminer.jsonl> <olr.jsonl>
Usage: compare-debezium.py [--exclude-tables T1,T2,...] <logminer.jsonl> <olr.jsonl>

Both inputs are JSONL files with Debezium envelope events:
{"before":..., "after":..., "source":..., "op":..., "ts_ms":...}

Compares records positionally (both sides emit in SCN order) while
ignoring connector-specific metadata (source block, timestamps).
Uses content-based matching grouped by (txId, table, op) to handle
cross-node ordering differences in RAC. Falls back to positional
comparison within groups.

Exits 0 on match, 1 on mismatch with diff report.
"""

import argparse
import json
import sys
from collections import defaultdict

OP_MAP = {'c': 'INSERT', 'u': 'UPDATE', 'd': 'DELETE'}

Expand All @@ -24,6 +27,10 @@
'X19kZWJleml1bV91bmF2YWlsYWJsZV92YWx1ZQ==',
}

# Tables always excluded from comparison (stats/bookkeeping, not test data).
# Additional tables can be excluded via --exclude-tables CLI flag.
EXCLUDED_TABLES = {'FUZZ_STATS'}


def is_unavailable(v):
"""Check if a normalized value is Debezium's unavailable marker."""
Expand All @@ -44,8 +51,9 @@ def normalize_columns(d):
return {k: normalize_value(v) for k, v in d.items()}


def parse_debezium_jsonl(path):
def parse_debezium_jsonl(path, excluded_tables=None):
"""Parse a Debezium JSONL file into normalized records."""
skip_tables = EXCLUDED_TABLES | (excluded_tables or set())
records = []
with open(path) as f:
for line in f:
Expand All @@ -58,15 +66,21 @@ def parse_debezium_jsonl(path):
table = source.get('table', '')
schema = source.get('schema', '')
op = event.get('op', '')
tx_id = source.get('txId', '')

# Skip non-DML events
if op not in OP_MAP:
continue

# Skip excluded tables
if table in skip_tables:
continue

records.append({
'op': OP_MAP[op],
'schema': schema,
'table': table,
'txId': tx_id,
'before': normalize_columns(event.get('before')),
'after': normalize_columns(event.get('after')),
})
Expand Down Expand Up @@ -139,6 +153,7 @@ def _do_merge(prev, curr):
'op': prev['op'],
'schema': prev['schema'],
'table': prev['table'],
'txId': prev.get('txId', ''),
'after': _merge_columns(prev.get('after', {}), curr.get('after', {})),
'before': prev.get('before', {}),
}
Expand Down Expand Up @@ -190,46 +205,148 @@ def columns_match(cols_a, cols_b, section='after'):
return diffs


def compare(lm_records, olr_records):
"""Compare LogMiner vs OLR records positionally."""
def record_match_score(lm, olr):
    """Return a (matches, mismatches) score for a candidate record pair.

    Records whose op or table differ are incomparable and receive the
    worst possible score, (0, 999). Otherwise every column present in
    both records' 'after' and 'before' sections is compared:
    None values are skipped, and Debezium's unavailable-value marker is
    ignored in the 'before' section (LogMiner may not reconstruct it).
    More matches and fewer mismatches indicate a better fit.
    """
    if (lm['op'], lm['table']) != (olr['op'], olr['table']):
        return (0, 999)

    matched = 0
    differed = 0

    for section in ('after', 'before'):
        left_cols = lm.get(section, {})
        right_cols = olr.get(section, {})
        # Only columns present on both sides can be meaningfully compared.
        for col in set(left_cols) & set(right_cols):
            left, right = left_cols[col], right_cols[col]
            if left is None or right is None:
                continue
            if section == 'before' and (is_unavailable(left) or is_unavailable(right)):
                continue
            if values_match(left, right):
                matched += 1
            else:
                differed += 1

    return (matched, differed)


def match_within_group(group_key, lm_group, olr_group):
"""Content-based matching within a (txId, table, op) group.
Returns list of diff strings."""
diffs = []

if len(lm_records) != len(olr_records):
if len(lm_group) != len(olr_group):
diffs.append(
f"Record count mismatch: LogMiner={len(lm_records)}, OLR={len(olr_records)}"
f"Group {group_key}: count mismatch "
f"LogMiner={len(lm_group)}, OLR={len(olr_group)}"
)

for i, (lm, olr) in enumerate(zip(lm_records, olr_records)):
if lm['op'] != olr['op'] or lm['table'] != olr['table']:
diffs.append(
f"Record #{i+1}: op/table mismatch: "
f"LogMiner={lm['op']} {lm['table']}, "
f"OLR={olr['op']} {olr['table']}"
)
continue
# Greedy best-match: for each LM record, find best OLR match
available = list(range(len(olr_group)))
matched_pairs = []

for li, lm in enumerate(lm_group):
best_idx = None
best_score = (0, 999)
for ai, oi in enumerate(available):
score = record_match_score(lm, olr_group[oi])
# Better = more matches, fewer mismatches
if (score[1] < best_score[1]) or \
(score[1] == best_score[1] and score[0] > best_score[0]):
best_score = score
best_idx = ai
if best_idx is not None:
matched_pairs.append((li, available.pop(best_idx)))
else:
diffs.append(f"Group {group_key}: LM record {li} has no OLR match")

# Report diffs for matched pairs
for li, oi in matched_pairs:
lm = lm_group[li]
olr = olr_group[oi]

if lm['op'] in ('INSERT', 'UPDATE'):
cd = columns_match(lm.get('after', {}), olr.get('after', {}), section='after')
cd = columns_match(
lm.get('after', {}), olr.get('after', {}), section='after')
if cd:
diffs.append(f"Record #{i+1} ({lm['op']} {lm['table']}) 'after' diffs:")
diffs.append(
f"Group {group_key} ({lm['op']} {lm['table']}) 'after' diffs:")
diffs.extend(cd)

if lm['op'] in ('UPDATE', 'DELETE'):
cd = columns_match(lm.get('before', {}), olr.get('before', {}), section='before')
cd = columns_match(
lm.get('before', {}), olr.get('before', {}), section='before')
if cd:
diffs.append(f"Record #{i+1} ({lm['op']} {lm['table']}) 'before' diffs:")
diffs.append(
f"Group {group_key} ({lm['op']} {lm['table']}) 'before' diffs:")
diffs.extend(cd)

# Unmatched OLR records
for oi in available:
diffs.append(
f"Group {group_key}: extra OLR record "
f"({olr_group[oi]['op']} {olr_group[oi]['table']})")

return diffs


def main():
if len(sys.argv) != 3:
print(f"Usage: {sys.argv[0]} <logminer.jsonl> <olr.jsonl>", file=sys.stderr)
sys.exit(2)
def compare(lm_records, olr_records):
"""Compare LogMiner vs OLR records using content-based matching."""
diffs = []

if len(lm_records) != len(olr_records):
diffs.append(
f"Record count mismatch: LogMiner={len(lm_records)}, "
f"OLR={len(olr_records)}"
)

# Group by (txId, table, op)
lm_groups = defaultdict(list)
olr_groups = defaultdict(list)
for r in lm_records:
key = (r.get('txId', ''), r['table'], r['op'])
lm_groups[key].append(r)
for r in olr_records:
key = (r.get('txId', ''), r['table'], r['op'])
olr_groups[key].append(r)

all_keys = sorted(set(lm_groups.keys()) | set(olr_groups.keys()))

lm_records = parse_debezium_jsonl(sys.argv[1])
olr_records = parse_debezium_jsonl(sys.argv[2])
for key in all_keys:
lm_group = lm_groups.get(key, [])
olr_group = olr_groups.get(key, [])

if not lm_group and olr_group:
diffs.append(
f"Group {key}: {len(olr_group)} extra OLR records "
f"(no LogMiner match)")
continue
if lm_group and not olr_group:
diffs.append(
f"Group {key}: {len(lm_group)} LogMiner records "
f"missing from OLR")
continue

group_diffs = match_within_group(key, lm_group, olr_group)
diffs.extend(group_diffs)

return diffs


def main():
parser = argparse.ArgumentParser(
description='Compare Debezium LogMiner vs OLR adapter outputs.')
parser.add_argument('logminer_jsonl', help='LogMiner JSONL file')
parser.add_argument('olr_jsonl', help='OLR JSONL file')
parser.add_argument('--exclude-tables', default='',
help='Comma-separated list of additional tables to exclude')
args = parser.parse_args()

extra_excluded = set(t.strip() for t in args.exclude_tables.split(',') if t.strip())

lm_records = parse_debezium_jsonl(args.logminer_jsonl, extra_excluded)
olr_records = parse_debezium_jsonl(args.olr_jsonl, extra_excluded)

# Merge LogMiner's split LOB events (OLR already emits merged events)
lm_merged = merge_lob_events(lm_records)
Expand Down
5 changes: 5 additions & 0 deletions tests/dbz-twin/rac/Dockerfile.consumer
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
FROM python:3.12-slim
RUN pip install --no-cache-dir kafka-python-ng==2.2.3 \
&& groupadd -r app && useradd -r -g app app \
&& mkdir -p /app/data && chown app:app /app/data
USER app
Loading
Loading