-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Replicate entries
and operations
in their topologically sorted order
#437
Conversation
for (public_key, log_heights) in remote_needs { | ||
for (log_id, seq_num) in log_heights { | ||
let entry_messages: Vec<Message> = store | ||
// Get the entries the remote needs for each log. | ||
let log_entries = store | ||
.get_entries_from(&public_key, &log_id, &seq_num) | ||
.await | ||
.expect("Fatal database error") | ||
.iter() | ||
.map(|entry| { | ||
trace!( | ||
"Prepare message containing entry at {:?} on {:?} for {}", | ||
entry.seq_num(), | ||
entry.log_id(), | ||
entry.public_key().display() | ||
); | ||
|
||
Message::Entry(entry.clone().encoded_entry, entry.payload().cloned()) | ||
}) | ||
.collect(); | ||
messages.extend(entry_messages); | ||
.expect("Fatal database error"); | ||
|
||
for entry in log_entries { | ||
// Get the entry as well as we need some additional information in order to | ||
// send the entries in the correct order. | ||
let operation = store | ||
.get_operation(&entry.hash().into()) | ||
.await | ||
.expect("Fatal database error") | ||
.expect("Operation should be in store"); | ||
|
||
// We only send entries if their operation has been materialized. | ||
if let Some(sorted_index) = operation.sorted_index { | ||
entries.push((entry, operation.document_id, sorted_index)); | ||
} | ||
} | ||
} | ||
} | ||
|
||
messages | ||
// Sort all entries by document_id & sorted_index. | ||
entries.sort_by( | ||
|(_, document_id_a, sorted_index_a), (_, document_id_b, sorted_index_b)| { | ||
(document_id_a, sorted_index_a).cmp(&(document_id_b, sorted_index_b)) | ||
}, | ||
); | ||
|
||
// Compose the actual messages. | ||
entries | ||
.iter() | ||
.map(|(entry, _, _)| { | ||
trace!( | ||
"Prepare message containing entry at {:?} on {:?} for {}", | ||
entry.seq_num(), | ||
entry.log_id(), | ||
entry.public_key().display() | ||
); | ||
|
||
Message::Entry(entry.clone().encoded_entry, entry.payload().cloned()) | ||
}) | ||
.collect() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If all of this could be it's own SQL query that would be rather nice....
Codecov ReportPatch coverage:
Additional details and impacted files@@ Coverage Diff @@
## development #437 +/- ##
===============================================
+ Coverage 90.05% 90.26% +0.20%
===============================================
Files 87 87
Lines 8439 8492 +53
===============================================
+ Hits 7600 7665 +65
+ Misses 839 827 -12
☔ View full report in Codecov by Sentry. |
Closed as work was rebased over here: #442 |
LogHeightStrategy
sendsentries
andoperations
in their topologically sorted order andIngest
expects them to arrive in this way.📋 Checklist
CHANGELOG.md