@farost farost commented Sep 3, 2025

Usage and product changes

Stabilize the database import function by eliminating rare decoding errors that could occur when importing large datasets. Exported files that the import function previously failed to process are still valid and should import correctly after these changes.

Implementation

  • For better control over the manual byte processing, replace VecDeque with the types from prost::bytes::* and confine all advancing of the file reader to a single function (Iterator::next).
  • Since read functions can return fewer bytes than requested even before reaching EOF, replace the single if checks with loops. This was probably the main cause of the failures.
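The short-read behaviour described above can be sketched as follows. This is a minimal illustration using only the standard library, not the code from this PR: fill_at_least and ShortReader are hypothetical names, and a plain Vec<u8> stands in for the prost::bytes buffer.

```rust
use std::io::{self, Read};

/// Keeps calling `read` until `buf` holds at least `required` bytes or EOF is reached.
/// Returns the number of bytes appended by this call.
fn fill_at_least<R: Read>(reader: &mut R, buf: &mut Vec<u8>, required: usize) -> io::Result<usize> {
    let mut chunk = [0u8; 4096];
    let mut appended = 0;
    while buf.len() < required {
        match reader.read(&mut chunk) {
            // Zero bytes means EOF: stop and let the caller decide what to do.
            Ok(0) => break,
            // A short read is not an error: keep what arrived and try again.
            Ok(n) => {
                buf.extend_from_slice(&chunk[..n]);
                appended += n;
            }
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
    Ok(appended)
}

/// Hypothetical reader that returns at most `max` bytes per call, to mimic short reads.
struct ShortReader<'a> {
    data: &'a [u8],
    max: usize,
}

impl<'a> Read for ShortReader<'a> {
    fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
        let n = self.data.len().min(self.max).min(out.len());
        out[..n].copy_from_slice(&self.data[..n]);
        self.data = &self.data[n..];
        Ok(n)
    }
}
```

A single read followed by an if check would give up after the first three bytes from ShortReader { max: 3 }, while the loop keeps going. Because it reads in chunks, the buffer may end up holding more than required; the surplus simply stays buffered for the next message.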

I was not able to reduce the bug to a single test case, even with thousands of records (although we had a report of a failure after processing 200 entries), so no new behaviour tests were introduced. I decided not to spend more time searching for such cases.

@farost farost marked this pull request as ready for review September 3, 2025 18:30

- fn try_read_more(&mut self, bytes_to_read: usize) -> std::io::Result<usize> {
- let mut addition = vec![0; bytes_to_read];
+ fn read_more(&mut self, bytes_to_read: usize) -> std::io::Result<usize> {
Member Author

There are no "not-try" counterparts of these methods, so the try_ prefix is probably not warranted.

}

- fn try_get_next_message_len(&mut self) -> Result<Option<usize>> {
+ fn decode_next_len(&mut self) -> Result<Option<(usize /*len*/, usize /*consumed*/)>> {
Member Author

Return consumed so that the buffer/cursor/reader is advanced in a single function, and only when needed.

let mut cursor: &[u8] = &self.buffer;
match prost::decode_length_delimiter(&mut cursor) {
Ok(len) => {
let consumed = self.buffer.len() - cursor.len();
Member Author

cursor points into buffer, so the difference between the two lengths is the number of bytes the length prefix occupied.
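To make the "length plus consumed" idea concrete, here is a standard-library-only sketch. It assumes the protobuf length delimiter is a base-128 varint (which is what prost::decode_length_delimiter reads) and decodes it by hand from a borrowed slice; peek_len is a hypothetical name and the error type is simplified to a string.

```rust
/// Peeks at a varint length prefix at the start of `buf` without consuming anything.
/// Returns Ok(Some((len, consumed))) when the prefix is complete,
/// Ok(None) when more input is needed, and Err when it cannot be a valid prefix.
/// Lenient sketch: overflow bits in a tenth byte are silently dropped.
fn peek_len(buf: &[u8]) -> Result<Option<(usize, usize)>, &'static str> {
    let mut value: u64 = 0;
    // A 64-bit varint occupies at most 10 bytes.
    for (i, &byte) in buf.iter().enumerate().take(10) {
        // The low 7 bits carry payload, least significant group first.
        value |= u64::from(byte & 0x7f) << (7 * i);
        // A clear high bit marks the last byte of the varint.
        if byte & 0x80 == 0 {
            return Ok(Some((value as usize, i + 1)));
        }
    }
    if buf.len() < 10 {
        Ok(None) // prefix is truncated, the caller should read more
    } else {
        Err("length prefix longer than 10 bytes")
    }
}
```

Because the function only borrows the buffer, the caller is free to decide when to drop the consumed bytes, for example only once the whole message has arrived.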

- Ok(bytes_read) if bytes_read >= required => {}
- _ => return Some(Err(Error::Migration(MigrationError::CannotDecodeImportedConcept))),
+ let required = consumed + message_len;
+ while self.buffer.len() < required {
Member Author

More read retries: keep reading until the whole message is buffered instead of attempting a single read.


- let mut message_buf = self.get_message_buf(message_len);
- Some(M::decode(&mut message_buf).map_err(|_| Error::Migration(MigrationError::CannotDecodeImportedConcept)))
+ self.buffer.advance(consumed);
Member Author

Advance in a single place
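The overall shape of such an iterator might look like the sketch below. It is an illustration under simplifying assumptions, not the PR's implementation: Frames is a hypothetical type, each frame uses a single length byte instead of a varint, a Vec<u8> replaces the prost::bytes buffer, and read errors are treated like EOF.

```rust
use std::io::Read;

/// Hypothetical frame reader: one length byte followed by that many payload bytes.
struct Frames<R: Read> {
    reader: R,
    buffer: Vec<u8>,
}

impl<R: Read> Frames<R> {
    /// Appends whatever a single `read` call yields; returns false on EOF or error.
    fn read_more(&mut self) -> bool {
        let mut chunk = [0u8; 3]; // deliberately tiny so frames span several reads
        match self.reader.read(&mut chunk) {
            Ok(n) if n > 0 => {
                self.buffer.extend_from_slice(&chunk[..n]);
                true
            }
            _ => false,
        }
    }
}

impl<R: Read> Iterator for Frames<R> {
    type Item = Result<Vec<u8>, &'static str>;

    fn next(&mut self) -> Option<Self::Item> {
        // Make sure the length byte is available; nothing is consumed yet.
        while self.buffer.is_empty() {
            if !self.read_more() {
                return None; // clean EOF between frames
            }
        }
        let consumed = 1;
        let required = consumed + self.buffer[0] as usize;
        // Retry until the whole frame is buffered.
        while self.buffer.len() < required {
            if !self.read_more() {
                return Some(Err("truncated frame"));
            }
        }
        let payload = self.buffer[consumed..required].to_vec();
        // The only place where the buffer is advanced.
        self.buffer.drain(..required);
        Some(Ok(payload))
    }
}
```

Keeping the advance in one spot means a failed or partial read never leaves the buffer half-consumed. In this sketch a truncated frame keeps yielding the same error on later calls, so a caller would stop at the first Err.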

Member

@flyingsilverfin flyingsilverfin left a comment

To be honest, I kinda skimmed it, but if it works I'm happy!

@farost farost merged commit 281a8aa into typedb:master Sep 4, 2025
0 of 9 checks passed
@farost farost deleted the fix-migration-import branch September 4, 2025 08:11