-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merge changelog consistency #1
base: consistency-changelog
Are you sure you want to change the base?
Merge changelog consistency #1
Conversation
fix imports changed from auto-import fix imports changed from auto-import put and restore of positions write entire offset on commit added tests cleaning cleanup cleanup pass entire clock to changelog rebase remove import position in changelog and cache store styling add position tracking to changelog stores debugging fixed tests rebase fix test
{ | ||
final Position position = Position | ||
.emptyPosition() | ||
.withComponent("A", 0, 1); | ||
final Headers headers = new RecordHeaders(); | ||
headers.add( | ||
ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY); | ||
headers.add(new RecordHeader( | ||
ChangelogRecordDeserializationHelper.VECTOR_KEY, | ||
PositionSerde.serialize(position).array())); | ||
entries.add(new ConsumerRecord<>("", 0, 0L, RecordBatch.NO_TIMESTAMP, | ||
TimestampType.NO_TIMESTAMP_TYPE, -1, -1, | ||
"1".getBytes(UTF_8), "a".getBytes(UTF_8), headers, Optional.empty())); | ||
|
||
position = position.update("A", 0, 2); | ||
headers.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY); | ||
headers.add(new RecordHeader(Position.VECTOR_KEY, position.serialize().array())); | ||
entries.add(new ConsumerRecord<>("", 0, 0L, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, | ||
} | ||
{ | ||
final Position position = Position | ||
.emptyPosition() | ||
.withComponent("A", 0, 2); | ||
final Headers headers = new RecordHeaders(); | ||
headers.add( | ||
ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY); | ||
headers.add(new RecordHeader( | ||
ChangelogRecordDeserializationHelper.VECTOR_KEY, | ||
PositionSerde.serialize(position).array())); | ||
entries.add(new ConsumerRecord<>("", 0, 0L, RecordBatch.NO_TIMESTAMP, | ||
TimestampType.NO_TIMESTAMP_TYPE, -1, -1, | ||
"1".getBytes(UTF_8), null, headers, Optional.empty())); | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Most of the changes I made were just trivial conflict resolution, but you should be aware that there were several tests I discovered had this same bug. The Headers object was shared among all the records, so rather than sending several records with different headers, we were actually sending several objects with the same headers. To achieve the scenario you were going for, we need a fresh headers object each time.
1c41696
to
84e1349
Compare
Hey @vpapavas ,
This PR merges trunk into your branch to resolve conflicts with apache#11557
Unfortunately, the diff you'll see here contains all the lines from those other commits as well.
I think, rather than reading through this diff, you'll want to just merge it (to your PR branch), and then check out the resulting diff on your PR.
Committer Checklist (excluded from commit message)