Merge pull request #1076 from appaquet/fix/store-reader-iterator
Fix panic in store reader raw document iterator during segment merge
PSeitz committed Jun 14, 2021
2 parents 9f32d40 + 873ac1a commit 221e7cb
Showing 2 changed files with 42 additions and 3 deletions.
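
The diff below suggests the cause of the panic: in the raw document iterator, the reset_block_pos flag, which appears to signal that the iterator has moved to a new checkpoint block, was declared inside the filter_map closure and therefore re-initialized on every document. When the first document of a checkpoint was deleted, the signal was raised and then dropped along with that skipped document. The sketch below is a minimal, self-contained model of that pattern, not tantivy code; the fixed block size, the flag handling, and all names are simplified assumptions made for illustration.

fn main() {
    const BLOCK_SIZE: u32 = 100;
    const NUM_DOCS: u32 = 300;
    // Delete a range that starts exactly at a block boundary: doc 200 is the
    // first document of block 2, but the rest of block 2 stays live.
    let deleted: Vec<u32> = (200..250).collect();

    let mut current_block: u32 = 0;
    let events: Vec<(u32, u32, bool)> = (0..NUM_DOCS)
        .filter_map(|doc_id| {
            // Bug: the flag is re-created on every call, so a "new block"
            // signal raised while skipping a deleted document is dropped.
            let mut reset_block_pos = false;
            if doc_id / BLOCK_SIZE != current_block {
                current_block = doc_id / BLOCK_SIZE;
                reset_block_pos = true;
            }
            if deleted.contains(&doc_id) {
                return None; // deleted document: nothing is emitted
            }
            Some((doc_id, current_block, reset_block_pos))
        })
        .collect();

    // No event for block 2 ever carries the reset signal, so a real consumer
    // would keep reading at the offset of the previous block -- the kind of
    // state mismatch that ends in a panic during a segment merge.
    let resets_for_block_2 = events
        .iter()
        .filter(|(_, block, reset)| *block == 2 && *reset)
        .count();
    assert_eq!(resets_for_block_2, 0);
}
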
41 changes: 39 additions & 2 deletions src/store/mod.rs
@@ -57,6 +57,7 @@ pub mod tests {
use futures::executor::block_on;

use super::*;
use crate::fastfield::DeleteBitSet;
use crate::schema::{self, FieldValue, TextFieldIndexing, STORED, TEXT};
use crate::schema::{Document, TextOptions};
use crate::{
@@ -107,15 +108,51 @@ pub mod tests {
schema
}

const NUM_DOCS: usize = 1_000;
#[test]
fn test_doc_store_iter_with_delete_bug_1077() -> crate::Result<()> {
// this will cover deletion of the first element in a checkpoint
let deleted_docids = (200..300).collect::<Vec<_>>();
let delete_bitset = DeleteBitSet::for_test(&deleted_docids, NUM_DOCS as u32);

let path = Path::new("store");
let directory = RamDirectory::create();
let store_wrt = directory.open_write(path)?;
let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4);
let field_title = schema.get_field("title").unwrap();
let store_file = directory.open_read(path)?;
let store = StoreReader::open(store_file)?;
for i in 0..NUM_DOCS as u32 {
assert_eq!(
*store
.get(i)?
.get_first(field_title)
.unwrap()
.text()
.unwrap(),
format!("Doc {}", i)
);
}
for (_, doc) in store.iter(Some(&delete_bitset)).enumerate() {
let doc = doc?;
let title_content = doc.get_first(field_title).unwrap().text().unwrap();
if !title_content.starts_with("Doc ") {
panic!("unexpected title_content {}", title_content);
}
}

Ok(())
}

fn test_store(compressor: Compressor) -> crate::Result<()> {
let path = Path::new("store");
let directory = RamDirectory::create();
let store_wrt = directory.open_write(path)?;
let schema = write_lorem_ipsum_store(store_wrt, 1_000, compressor);
let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor);
let field_title = schema.get_field("title").unwrap();
let store_file = directory.open_read(path)?;
let store = StoreReader::open(store_file)?;
for i in 0..1_000 {
for i in 0..NUM_DOCS as u32 {
assert_eq!(
*store
.get(i)?
4 changes: 3 additions & 1 deletion src/store/reader.rs
@@ -166,6 +166,7 @@ impl StoreReader {
.map(|checkpoint| self.read_block(&checkpoint).map_err(|e| e.kind())); // map error in order to enable cloning
let mut block_start_pos = 0;
let mut num_skipped = 0;
let mut reset_block_pos = false;
(0..last_docid)
.filter_map(move |doc_id| {
// filter_map is only used to resolve lifetime issues between the two closures on
@@ -175,8 +176,8 @@
// we keep the number of skipped documents to move forward in the map block
num_skipped += 1;
}

// check move to next checkpoint
let mut reset_block_pos = false;
if doc_id >= curr_checkpoint.as_ref().unwrap().doc_range.end {
curr_checkpoint = checkpoint_block_iter.next();
curr_block = curr_checkpoint
Expand All @@ -190,6 +191,7 @@ impl StoreReader {
let ret = Some((curr_block.clone(), num_skipped, reset_block_pos));
// the map block will move over the num_skipped, so we reset to 0
num_skipped = 0;
reset_block_pos = false;
ret
} else {
None
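
For contrast with the line removed above, here is the same simplified model with the fix applied, mirroring the two lines added in reader.rs: the flag is hoisted out of the closure so it survives deleted documents, and it is cleared only after it has been handed to the consumer. This is again a sketch under the same illustrative assumptions, not the actual StoreReader code.

fn main() {
    const BLOCK_SIZE: u32 = 100;
    const NUM_DOCS: u32 = 300;
    let deleted: Vec<u32> = (200..250).collect();

    let mut current_block: u32 = 0;
    // Hoisted out of the closure: a pending signal survives skipped documents.
    let mut reset_block_pos = false;
    let events: Vec<(u32, u32, bool)> = (0..NUM_DOCS)
        .filter_map(|doc_id| {
            if doc_id / BLOCK_SIZE != current_block {
                current_block = doc_id / BLOCK_SIZE;
                reset_block_pos = true;
            }
            if deleted.contains(&doc_id) {
                return None; // skipped, but the pending reset is kept
            }
            let ret = Some((doc_id, current_block, reset_block_pos));
            // Cleared only once the consumer has been told about the new block.
            reset_block_pos = false;
            ret
        })
        .collect();

    // Doc 250 is the first surviving document of block 2 and now carries the
    // reset signal, so the consumer re-positions itself before reading it.
    assert!(events.contains(&(250, 2, true)));
}
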
