fix: Various block streamer issues #556
indexer,
redis_stream.clone(),
)
.await?;
The original code swallowed this error, but I have since removed that. If we fail halfway through flushing blocks, we'll end up double processing, so it is better to fail hard than to duplicate blocks.
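A minimal sketch of the fail-hard behaviour described above, using only the standard library. The `publish` and `flush_blocks` helpers, the `fail_at` parameter, and the `String` error type are hypothetical and exist only for illustration; they are not the PR's actual code.

```rust
/// Hypothetical stand-in for publishing one block height to a stream.
/// It fails on `fail_at` to simulate an error partway through a flush.
fn publish(published: &mut Vec<u64>, height: u64, fail_at: Option<u64>) -> Result<(), String> {
    if Some(height) == fail_at {
        return Err(format!("failed to publish block {height}"));
    }
    published.push(height);
    Ok(())
}

/// Flushes every height in order. The `?` propagates the first error,
/// so the flush stops immediately instead of carrying on after a failure.
fn flush_blocks(
    published: &mut Vec<u64>,
    heights: &[u64],
    fail_at: Option<u64>,
) -> Result<(), String> {
    for &height in heights {
        publish(published, height, fail_at)?;
    }
    Ok(())
}
```

Because the error reaches the caller, the caller can stop the whole stream rather than continue from an unknown position.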
.parse::<near_indexer_primitives::types::BlockHeight>()
.context("Failed to parse Delta Lake metadata")?;

if start_block_height >= last_indexed_block_from_metadata {
This is essentially for StartBlock::Latest; we don't need to check Delta Lake in this case.
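A rough sketch of the check above, assuming that a start height at or beyond the last block indexed in Delta Lake means the Delta Lake backfill can be skipped. The `Source` enum and `choose_source` function are hypothetical names, not taken from the PR.

```rust
type BlockHeight = u64;

/// Hypothetical description of where the streamer reads blocks from.
#[derive(Debug, PartialEq)]
enum Source {
    /// Backfill from Delta Lake first, then continue from the lake.
    DeltaLakeThenLake,
    /// Skip Delta Lake entirely and read only from the lake.
    LakeOnly,
}

/// Delta Lake has nothing to offer at or past its last indexed block,
/// so a start height there (as with a "latest" start) skips it.
fn choose_source(start_block_height: BlockHeight, last_indexed_block: BlockHeight) -> Source {
    if start_block_height >= last_indexed_block {
        Source::LakeOnly
    } else {
        Source::DeltaLakeThenLake
    }
}
```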
@@ -246,6 +246,7 @@ impl DeltaLakeClientImpl {
}
})
.flat_map(|index_file| index_file.heights)
.filter(|block_height| *block_height >= start_block_height)
Previously we would add all heights for the day that the start height falls within. This was especially an issue for StartBlock::Latest, as we would add many blocks initially where there should be essentially none.
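To illustrate the effect of the added filter, here is a small self-contained sketch. The `IndexFile` struct and the `heights_from` function are simplified stand-ins invented for this example; only the `flat_map` and `filter` steps mirror the diff.

```rust
type BlockHeight = u64;

/// Simplified stand-in for one day's index file.
struct IndexFile {
    heights: Vec<BlockHeight>,
}

/// Flattens the per-day files and keeps only heights at or after the
/// start height, so earlier blocks from the same day are dropped.
fn heights_from(index_files: Vec<IndexFile>, start_block_height: BlockHeight) -> Vec<BlockHeight> {
    index_files
        .into_iter()
        .flat_map(|index_file| index_file.heights)
        .filter(|block_height| *block_height >= start_block_height)
        .collect()
}
```

Without the filter, a start height that falls late in a day would still pull in every earlier height recorded for that day.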
Awesome work! I like the refactoring of the block processing.
lake_prefetch_size,
redis_client,
indexer,
redis_stream,
Previously, in the Delta Lake call, we passed a clone of this. Here we pass it directly. Is that because this function ends right afterward, so it is fine for the callee to take ownership since nothing here needs the value again?
Mainly just trying to understand Rust best practices.
Yes, it's fine to take ownership here because redis_stream is no longer used after this point. We'd need to .clone() if we used it later.
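A small sketch of the ownership point, assuming redis_stream is an owned `String` (the real type is not shown in the diff). `consume_stream` and `start_both` are hypothetical names used only to mirror the two call sites.

```rust
/// Hypothetical consumer that takes ownership of the stream key.
fn consume_stream(redis_stream: String) -> usize {
    redis_stream.len()
}

/// Mirrors the two call sites: the first call needs a clone because
/// `redis_stream` is used again afterward; the last call can move it.
fn start_both(redis_stream: String) -> (usize, usize) {
    // Clone here: the original value is still needed below.
    let first = consume_stream(redis_stream.clone());
    // Move here: this is the final use, so no clone is required.
    let second = consume_stream(redis_stream);
    (first, second)
}
```

Dropping the `.clone()` on the first call would fail to compile, because the second call would then use a value that had already been moved.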
fabd604 to c9f046c
No description provided.