change format for store to make it faster with small documents (#1569)
* use new format for docstore blocks

* move index to end of block

it makes writing the block faster due to one less memcopy
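
Before the diffs, a sketch of what the new format looks like. Each uncompressed doc-store block used to prefix every document with a VInt length, so fetching one document meant decoding the lengths of all documents before it. The new format writes the raw document bytes back-to-back and appends an offset index (one u32 start offset per document, plus a u32 count) at the end of the block; flushing then only appends bytes after the already-written payload, which is presumably the memcopy the second bullet refers to. Below is a minimal standalone sketch of that layout; build_block, doc_range, and the little-endian byte order are illustrative assumptions for this sketch, not tantivy's actual BinarySerializable code.

// A minimal sketch of the assumed uncompressed block layout (illustrative only):
//
//   [doc 0 bytes][doc 1 bytes]...[doc N-1 bytes][start 0][start 1]...[start N-1][N]
//
// where each start offset and the trailing count N are u32 (little-endian here,
// an assumption of this sketch).

fn build_block(docs: &[&[u8]]) -> Vec<u8> {
    let mut block = Vec::new();
    let mut offsets = Vec::with_capacity(docs.len());
    for doc in docs {
        // Remember where each document starts, then append its bytes as-is.
        offsets.push(block.len() as u32);
        block.extend_from_slice(doc);
    }
    // The index is appended only when the block is flushed, so the document
    // bytes above never have to move.
    for off in &offsets {
        block.extend_from_slice(&off.to_le_bytes());
    }
    block.extend_from_slice(&(offsets.len() as u32).to_le_bytes());
    block
}

fn doc_range(block: &[u8], doc_pos: usize) -> std::ops::Range<usize> {
    let u32_len = std::mem::size_of::<u32>();
    let read_u32 =
        |at: usize| u32::from_le_bytes(block[at..at + u32_len].try_into().unwrap()) as usize;
    let index_len = read_u32(block.len() - u32_len);
    let index_start = block.len() - (index_len + 1) * u32_len;
    let start = read_u32(index_start + doc_pos * u32_len);
    // The last document has no following offset entry; it ends where the index starts.
    let end = if doc_pos + 1 < index_len {
        read_u32(index_start + (doc_pos + 1) * u32_len)
    } else {
        index_start
    };
    start..end
}

fn main() {
    let block = build_block(&[&b"alpha"[..], &b"bravo!"[..], &b"c"[..]]);
    // Random access is one index lookup per document instead of walking VInt prefixes.
    assert_eq!(&block[doc_range(&block, 1)], &b"bravo!"[..]);
    println!("doc 1 occupies bytes {:?}", doc_range(&block, 1));
}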
trinity-1686a committed Oct 4, 2022
1 parent 4cf911d commit 5945dbf
Showing 2 changed files with 62 additions and 59 deletions.
91 changes: 39 additions & 52 deletions src/store/reader.rs
@@ -1,10 +1,10 @@
 use std::io;
 use std::iter::Sum;
-use std::ops::AddAssign;
+use std::ops::{AddAssign, Range};
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 
-use common::{BinarySerializable, HasLen, VInt};
+use common::{BinarySerializable, HasLen};
 use lru::LruCache;
 use ownedbytes::OwnedBytes;

@@ -211,17 +211,10 @@ impl StoreReader {
         doc_id: DocId,
         checkpoint: &Checkpoint,
     ) -> crate::Result<OwnedBytes> {
-        let mut cursor = &block[..];
-        let cursor_len_before = cursor.len();
-        for _ in checkpoint.doc_range.start..doc_id {
-            let doc_length = VInt::deserialize(&mut cursor)?.val() as usize;
-            cursor = &cursor[doc_length..];
-        }
+        let doc_pos = doc_id - checkpoint.doc_range.start;
 
-        let doc_length = VInt::deserialize(&mut cursor)?.val() as usize;
-        let start_pos = cursor_len_before - cursor.len();
-        let end_pos = cursor_len_before - cursor.len() + doc_length;
-        Ok(block.slice(start_pos..end_pos))
+        let range = block_read_index(&block, doc_pos)?;
+        Ok(block.slice(range))
     }
 
     /// Iterator over all Documents in their order as they are stored in the doc store.
@@ -254,9 +247,7 @@ impl StoreReader {
         let mut curr_block = curr_checkpoint
             .as_ref()
             .map(|checkpoint| self.read_block(checkpoint).map_err(|e| e.kind())); // map error in order to enable cloning
-        let mut block_start_pos = 0;
-        let mut num_skipped = 0;
-        let mut reset_block_pos = false;
+        let mut doc_pos = 0;
         (0..last_doc_id)
             .filter_map(move |doc_id| {
                 // filter_map is only used to resolve lifetime issues between the two closures on
@@ -268,24 +259,19 @@
                     curr_block = curr_checkpoint
                         .as_ref()
                         .map(|checkpoint| self.read_block(checkpoint).map_err(|e| e.kind()));
-                    reset_block_pos = true;
-                    num_skipped = 0;
+                    doc_pos = 0;
                 }
 
                 let alive = alive_bitset.map_or(true, |bitset| bitset.is_alive(doc_id));
-                if alive {
-                    let ret = Some((curr_block.clone(), num_skipped, reset_block_pos));
-                    // the map block will move over the num_skipped, so we reset to 0
-                    num_skipped = 0;
-                    reset_block_pos = false;
-                    ret
+                let res = if alive {
+                    Some((curr_block.clone(), doc_pos))
                 } else {
-                    // we keep the number of skipped documents to move forward in the map block
-                    num_skipped += 1;
                     None
-                }
+                };
+                doc_pos += 1;
+                res
             })
-            .map(move |(block, num_skipped, reset_block_pos)| {
+            .map(move |(block, doc_pos)| {
                 let block = block
                     .ok_or_else(|| {
                         DataCorruption::comment_only(
@@ -296,30 +282,9 @@
                     .map_err(|error_kind| {
                         std::io::Error::new(error_kind, "error when reading block in doc store")
                     })?;
-                // this flag is set, when filter_map moved to the next block
-                if reset_block_pos {
-                    block_start_pos = 0;
-                }
-                let mut cursor = &block[block_start_pos..];
-                let mut pos = 0;
-                // move forward 1 doc + num_skipped in block and return length of current doc
-                let doc_length = loop {
-                    let doc_length = VInt::deserialize(&mut cursor)?.val() as usize;
-                    let num_bytes_read = block[block_start_pos..].len() - cursor.len();
-                    block_start_pos += num_bytes_read;
-
-                    pos += 1;
-                    if pos == num_skipped + 1 {
-                        break doc_length;
-                    } else {
-                        block_start_pos += doc_length;
-                        cursor = &block[block_start_pos..];
-                    }
-                };
-                let end_pos = block_start_pos + doc_length;
-                let doc_bytes = block.slice(block_start_pos..end_pos);
-                block_start_pos = end_pos;
-                Ok(doc_bytes)
+
+                let range = block_read_index(&block, doc_pos)?;
+                Ok(block.slice(range))
             })
     }
 
@@ -329,6 +294,28 @@ impl StoreReader {
     }
 }
 
+fn block_read_index(block: &[u8], doc_pos: u32) -> crate::Result<Range<usize>> {
+    let doc_pos = doc_pos as usize;
+    let size_of_u32 = std::mem::size_of::<u32>();
+
+    let index_len_pos = block.len() - size_of_u32;
+    let index_len = u32::deserialize(&mut &block[index_len_pos..])? as usize;
+
+    if doc_pos > index_len {
+        return Err(crate::TantivyError::InternalError(
+            "Attempted to read doc from wrong block".to_owned(),
+        ));
+    }
+
+    let index_start = block.len() - (index_len + 1) * size_of_u32;
+    let index = &block[index_start..index_start + index_len * size_of_u32];
+
+    let start_offset = u32::deserialize(&mut &index[doc_pos * size_of_u32..])? as usize;
+    let end_offset = u32::deserialize(&mut &index[(doc_pos + 1) * size_of_u32..])
+        .unwrap_or(index_start as u32) as usize;
+    Ok(start_offset..end_offset)
+}
+
 #[cfg(feature = "quickwit")]
 impl StoreReader {
     /// Advanced API.
@@ -427,7 +414,7 @@ mod tests {
         assert_eq!(store.cache_stats().cache_hits, 1);
         assert_eq!(store.cache_stats().cache_misses, 2);
 
-        assert_eq!(store.cache.peek_lru(), Some(9210));
+        assert_eq!(store.cache.peek_lru(), Some(11163));
 
         Ok(())
     }
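A note on block_read_index in the reader diff above: the index stores only start offsets, so the last document in a block has no entry after it. Deserializing past the end of the index fails, and the unwrap_or(index_start as u32) fallback makes that document end exactly where the offset index begins. A tiny worked check of that arithmetic, using made-up numbers rather than anything from the commit:

fn main() {
    let size_of_u32 = 4;
    // Hypothetical block: 11 bytes of document payload for 3 docs with start
    // offsets [0, 5, 9], followed by the 3 offsets and the u32 count (16 bytes).
    let block_len = 11 + (3 + 1) * size_of_u32; // 27 bytes in total
    let index_len = 3;                          // number of documents
    let index_start = block_len - (index_len + 1) * size_of_u32; // = 11
    // For doc_pos == 2 (the last document) there is no offset entry at slot 3,
    // so the deserialize fails and the end offset falls back to index_start:
    let last_doc_range = 9..index_start;
    assert_eq!(last_doc_range, 9..11);
    println!("last doc occupies bytes {:?}", last_doc_range);
}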
30 changes: 23 additions & 7 deletions src/store/writer.rs
@@ -1,6 +1,6 @@
 use std::io::{self, Write};
 
-use common::{BinarySerializable, VInt};
+use common::BinarySerializable;
 
 use super::compressors::Compressor;
 use super::StoreReader;
@@ -22,6 +22,7 @@ pub struct StoreWriter {
     num_docs_in_current_block: DocId,
     intermediary_buffer: Vec<u8>,
     current_block: Vec<u8>,
+    doc_pos: Vec<u32>,
     block_compressor: BlockCompressor,
 }

@@ -42,6 +43,7 @@ impl StoreWriter {
             block_size,
             num_docs_in_current_block: 0,
             intermediary_buffer: Vec::new(),
+            doc_pos: Vec::new(),
             current_block: Vec::new(),
             block_compressor,
         })
@@ -53,12 +55,17 @@

     /// The memory used (including children)
     pub fn mem_usage(&self) -> usize {
-        self.intermediary_buffer.capacity() + self.current_block.capacity()
+        self.intermediary_buffer.capacity()
+            + self.current_block.capacity()
+            + self.doc_pos.capacity() * std::mem::size_of::<u32>()
     }
 
     /// Checks if the current block is full, and if so, compresses and flushes it.
     fn check_flush_block(&mut self) -> io::Result<()> {
-        if self.current_block.len() > self.block_size {
+        // This does not count the u32 storing the index length itself, but that is
+        // negligible compared to everything else.
+        let index_len = self.doc_pos.len() * std::mem::size_of::<usize>();
+        if self.current_block.len() + index_len > self.block_size {
             self.send_current_block_to_compressor()?;
         }
         Ok(())
@@ -70,8 +77,19 @@
         if self.current_block.is_empty() {
             return Ok(());
         }
+
+        let size_of_u32 = std::mem::size_of::<u32>();
+        self.current_block
+            .reserve((self.doc_pos.len() + 1) * size_of_u32);
+
+        for pos in self.doc_pos.iter() {
+            pos.serialize(&mut self.current_block)?;
+        }
+        (self.doc_pos.len() as u32).serialize(&mut self.current_block)?;
+
         self.block_compressor
             .compress_block_and_write(&self.current_block, self.num_docs_in_current_block)?;
+        self.doc_pos.clear();
         self.current_block.clear();
         self.num_docs_in_current_block = 0;
         Ok(())
@@ -87,8 +105,7 @@
         // calling store bytes would be preferable for code reuse, but then we can't use
         // intermediary_buffer due to the borrow checker
         // a new buffer costs ~1% indexing performance
-        let doc_num_bytes = self.intermediary_buffer.len();
-        VInt(doc_num_bytes as u64).serialize_into_vec(&mut self.current_block);
+        self.doc_pos.push(self.current_block.len() as u32);
         self.current_block
             .write_all(&self.intermediary_buffer[..])?;
         self.num_docs_in_current_block += 1;
@@ -101,8 +118,7 @@
     /// The document id is implicitly the current number
     /// of documents.
    pub fn store_bytes(&mut self, serialized_document: &[u8]) -> io::Result<()> {
-        let doc_num_bytes = serialized_document.len();
-        VInt(doc_num_bytes as u64).serialize_into_vec(&mut self.current_block);
+        self.doc_pos.push(self.current_block.len() as u32);
         self.current_block.extend_from_slice(serialized_document);
         self.num_docs_in_current_block += 1;
         self.check_flush_block()?;
