Commit a14bf6f: fix PR comments

ManyTheFish committed Feb 13, 2024
1 parent e5e811e

Showing 2 changed files with 99 additions and 84 deletions.
170 changes: 91 additions & 79 deletions milli/src/update/facet/mod.rs
@@ -173,96 +173,108 @@ impl<'i> FacetsUpdate<'i> {
             incremental_update.execute(wtxn)?;
         }
 
-        if let Some(normalized_delta_data) = self.normalized_delta_data {
-            let mut iter = normalized_delta_data.into_stream_merger_iter()?;
-            while let Some((key_bytes, delta_bytes)) = iter.next()? {
-                let deladd_reader = KvReaderDelAdd::new(delta_bytes);
-
-                let database_set = self
-                    .index
-                    .facet_id_normalized_string_strings
-                    .remap_key_type::<Bytes>()
-                    .get(wtxn, key_bytes)?
-                    .unwrap_or_default();
-
-                let add_set = deladd_reader
-                    .get(DelAdd::Addition)
-                    .and_then(|bytes| serde_json::from_slice::<BTreeSet<String>>(bytes).ok())
-                    .unwrap_or_default();
-
-                let del_set = match deladd_reader
-                    .get(DelAdd::Deletion)
-                    .and_then(|bytes| serde_json::from_slice::<BTreeSet<String>>(bytes).ok())
-                {
-                    Some(del_set) => {
-                        let (field_id_bytes, _) = try_split_array_at(key_bytes).unwrap();
-                        let field_id = FieldId::from_be_bytes(field_id_bytes);
-                        let mut set = BTreeSet::new();
-                        for facet in del_set {
-                            let key =
-                                FacetGroupKey { field_id, level: 0, left_bound: facet.as_str() };
-                            // Check if the referenced value doesn't exist anymore before deleting it.
-                            if self.index.facet_id_string_docids.remap_data_type::<DecodeIgnore>().get(wtxn, &key)?.is_none() {
-                                set.insert(facet);
-                            }
-                        }
-                        set
-                    }
-                    None => BTreeSet::new(),
-                };
-
-                let set: BTreeSet<_> =
-                    database_set.difference(&del_set).chain(add_set.iter()).cloned().collect();
-
-                if set.is_empty() {
-                    self.index
-                        .facet_id_normalized_string_strings
-                        .remap_key_type::<Bytes>()
-                        .delete(wtxn, key_bytes)?;
-                } else {
-                    self.index
-                        .facet_id_normalized_string_strings
-                        .remap_key_type::<Bytes>()
-                        .put(wtxn, key_bytes, &set)?;
-                }
-            }
-
-            // We clear the FST of normalized-for-search to compute everything from scratch.
-            self.index.facet_id_string_fst.clear(wtxn)?;
-            // We compute one FST by string facet
-            let mut text_fsts = vec![];
-            let mut current_fst: Option<(u16, fst::SetBuilder<Vec<u8>>)> = None;
-            let database =
-                self.index.facet_id_normalized_string_strings.remap_data_type::<DecodeIgnore>();
-            for result in database.iter(wtxn)? {
-                let ((field_id, normalized_facet), _) = result?;
-                current_fst = match current_fst.take() {
-                    Some((fid, fst_builder)) if fid != field_id => {
-                        let fst = fst_builder.into_set();
-                        text_fsts.push((fid, fst));
-                        Some((field_id, fst::SetBuilder::memory()))
-                    }
-                    Some((field_id, fst_builder)) => Some((field_id, fst_builder)),
-                    None => Some((field_id, fst::SetBuilder::memory())),
-                };
-
-                if let Some((_, fst_builder)) = current_fst.as_mut() {
-                    fst_builder.insert(normalized_facet)?;
-                }
-            }
-
-            if let Some((field_id, fst_builder)) = current_fst {
-                let fst = fst_builder.into_set();
-                text_fsts.push((field_id, fst));
-            }
-
-            // We write those FSTs in LMDB now
-            for (field_id, fst) in text_fsts {
-                self.index.facet_id_string_fst.put(wtxn, &field_id, &fst)?;
-            }
-        }
-
-        Ok(())
+        match self.normalized_delta_data {
+            Some(data) => index_facet_search(wtxn, data, self.index),
+            None => Ok(()),
+        }
     }
 }
+
+fn index_facet_search(
+    wtxn: &mut heed::RwTxn,
+    normalized_delta_data: Merger<BufReader<File>, MergeFn>,
+    index: &Index,
+) -> Result<()> {
+    let mut iter = normalized_delta_data.into_stream_merger_iter()?;
+    while let Some((key_bytes, delta_bytes)) = iter.next()? {
+        let deladd_reader = KvReaderDelAdd::new(delta_bytes);
+
+        let database_set = index
+            .facet_id_normalized_string_strings
+            .remap_key_type::<Bytes>()
+            .get(wtxn, key_bytes)?
+            .unwrap_or_default();
+
+        let add_set = deladd_reader
+            .get(DelAdd::Addition)
+            .and_then(|bytes| serde_json::from_slice::<BTreeSet<String>>(bytes).ok())
+            .unwrap_or_default();
+
+        let del_set = match deladd_reader
+            .get(DelAdd::Deletion)
+            .and_then(|bytes| serde_json::from_slice::<BTreeSet<String>>(bytes).ok())
+        {
+            Some(del_set) => {
+                let (field_id_bytes, _) = try_split_array_at(key_bytes).unwrap();
+                let field_id = FieldId::from_be_bytes(field_id_bytes);
+                let mut set = BTreeSet::new();
+                for facet in del_set {
+                    let key = FacetGroupKey { field_id, level: 0, left_bound: facet.as_str() };
+                    // Check if the referenced value doesn't exist anymore before deleting it.
+                    if index
+                        .facet_id_string_docids
+                        .remap_data_type::<DecodeIgnore>()
+                        .get(wtxn, &key)?
+                        .is_none()
+                    {
+                        set.insert(facet);
+                    }
+                }
+                set
+            }
+            None => BTreeSet::new(),
+        };
+
+        let set: BTreeSet<_> =
+            database_set.difference(&del_set).chain(add_set.iter()).cloned().collect();
+
+        if set.is_empty() {
+            index
+                .facet_id_normalized_string_strings
+                .remap_key_type::<Bytes>()
+                .delete(wtxn, key_bytes)?;
+        } else {
+            index
+                .facet_id_normalized_string_strings
+                .remap_key_type::<Bytes>()
+                .put(wtxn, key_bytes, &set)?;
+        }
+    }
+
+    // We clear the FST of normalized-for-search to compute everything from scratch.
+    index.facet_id_string_fst.clear(wtxn)?;
+    // We compute one FST by string facet
+    let mut text_fsts = vec![];
+    let mut current_fst: Option<(u16, fst::SetBuilder<Vec<u8>>)> = None;
+    let database = index.facet_id_normalized_string_strings.remap_data_type::<DecodeIgnore>();
+    for result in database.iter(wtxn)? {
+        let ((field_id, normalized_facet), _) = result?;
+        current_fst = match current_fst.take() {
+            Some((fid, fst_builder)) if fid != field_id => {
+                let fst = fst_builder.into_set();
+                text_fsts.push((fid, fst));
+                Some((field_id, fst::SetBuilder::memory()))
+            }
+            Some((field_id, fst_builder)) => Some((field_id, fst_builder)),
+            None => Some((field_id, fst::SetBuilder::memory())),
+        };
+
+        if let Some((_, fst_builder)) = current_fst.as_mut() {
+            fst_builder.insert(normalized_facet)?;
+        }
+    }
+
+    if let Some((field_id, fst_builder)) = current_fst {
+        let fst = fst_builder.into_set();
+        text_fsts.push((field_id, fst));
+    }
+
+    // We write those FSTs in LMDB now
+    for (field_id, fst) in text_fsts {
+        index.facet_id_string_fst.put(wtxn, &field_id, &fst)?;
+    }
+
+    Ok(())
+}
 
 #[cfg(test)]
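The core of the extracted `index_facet_search` is a three-way set merge: for each `(field_id, normalized string)` key, the stored set of original strings becomes (database set minus deletions) plus additions, and a deletion is only honored when no level-0 `facet_id_string_docids` entry still references the string. Below is a minimal standalone sketch of that merge, using plain `BTreeSet`s in place of the LMDB databases and the DelAdd reader; the function name `merge_facet_strings` and the toy data are illustrative, not milli API.

use std::collections::BTreeSet;

/// Standalone model of the merge in `index_facet_search`: the new value of a
/// `facet_id_normalized_string_strings` entry is (database minus deletions) plus additions.
/// `None` mirrors the branch where the now-empty entry is deleted from LMDB.
fn merge_facet_strings(
    database_set: &BTreeSet<String>,
    del_set: &BTreeSet<String>,
    add_set: &BTreeSet<String>,
) -> Option<BTreeSet<String>> {
    let set: BTreeSet<String> =
        database_set.difference(del_set).chain(add_set.iter()).cloned().collect();
    if set.is_empty() {
        None // caller deletes the LMDB entry
    } else {
        Some(set) // caller `put`s the merged set back
    }
}

fn main() {
    let database: BTreeSet<String> = ["blue", "red"].iter().map(|s| s.to_string()).collect();
    let del: BTreeSet<String> = ["red"].iter().map(|s| s.to_string()).collect();
    let add: BTreeSet<String> = ["green"].iter().map(|s| s.to_string()).collect();

    let merged = merge_facet_strings(&database, &del, &add).unwrap();
    let expected: BTreeSet<String> = ["blue", "green"].iter().map(|s| s.to_string()).collect();
    assert_eq!(merged, expected);
}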
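The second half of the function rebuilds the facet-search FSTs from scratch: it walks `facet_id_normalized_string_strings` in key order and groups consecutive entries by field id, finishing one `fst::Set` per field, with a final flush after the loop for the last field. Here is a sketch of the same grouping pattern over an in-memory sorted slice, assuming the `fst` crate that milli uses; `build_fsts` is an illustrative name.

use fst::SetBuilder;

/// Group sorted (field_id, normalized_facet) pairs into one FST per field,
/// mirroring the `current_fst` loop in `index_facet_search`. The input must be
/// sorted by (field_id, string), which LMDB iteration order guarantees in milli.
fn build_fsts(pairs: &[(u16, &str)]) -> Result<Vec<(u16, fst::Set<Vec<u8>>)>, fst::Error> {
    let mut text_fsts = Vec::new();
    let mut current_fst: Option<(u16, SetBuilder<Vec<u8>>)> = None;
    for &(field_id, normalized_facet) in pairs {
        current_fst = match current_fst.take() {
            // A new field starts: finish the previous builder and flush it.
            Some((fid, fst_builder)) if fid != field_id => {
                text_fsts.push((fid, fst_builder.into_set()));
                Some((field_id, SetBuilder::memory()))
            }
            Some(builder) => Some(builder),
            None => Some((field_id, SetBuilder::memory())),
        };
        if let Some((_, fst_builder)) = current_fst.as_mut() {
            // `insert` requires lexicographically increasing keys per builder.
            fst_builder.insert(normalized_facet)?;
        }
    }
    // Flush the last field's builder; forgetting this would drop the final field.
    if let Some((fid, fst_builder)) = current_fst {
        text_fsts.push((fid, fst_builder.into_set()));
    }
    Ok(text_fsts)
}

fn main() -> Result<(), fst::Error> {
    let fsts = build_fsts(&[(0, "blue"), (0, "red"), (1, "tokyo")])?;
    assert_eq!(fsts.len(), 2);
    assert!(fsts[0].1.contains("red"));
    Ok(())
}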
13 changes: 8 additions & 5 deletions milli/src/update/index_documents/typed_chunk.rs
@@ -41,15 +41,19 @@ impl ChunkAccumulator {
     pub fn pop_longest(&mut self) -> Option<Vec<TypedChunk>> {
         match self.inner.iter().max_by_key(|v| v.len()) {
             Some(left) => {
-                let position = self.inner.iter().position(|right| left == right);
+                let position = self.inner.iter().position(|right| left.len() == right.len());
                 position.map(|p| self.inner.remove(p)).filter(|v| !v.is_empty())
             }
             None => None,
         }
     }
 
     pub fn insert(&mut self, chunk: TypedChunk) {
-        match self.inner.iter().position(|right| Some(&chunk) == right.first()) {
+        match self
+            .inner
+            .iter()
+            .position(|right| right.first().map_or(false, |right| chunk.is_batchable_with(right)))
+        {
             Some(position) => {
                 let v = self.inner.get_mut(position).unwrap();
                 v.push(chunk);
@@ -87,8 +91,8 @@ pub(crate) enum TypedChunk {
     ScriptLanguageDocids(HashMap<(Script, Language), (RoaringBitmap, RoaringBitmap)>),
 }
 
-impl PartialEq for TypedChunk {
-    fn eq(&self, other: &Self) -> bool {
+impl TypedChunk {
+    fn is_batchable_with(&self, other: &Self) -> bool {
         use TypedChunk::*;
         match (self, other) {
             (FieldIdDocidFacetStrings(_), FieldIdDocidFacetStrings(_))
@@ -113,7 +117,6 @@ impl PartialEq for TypedChunk {
         }
     }
 }
-impl Eq for TypedChunk {}
 
 impl TypedChunk {
     pub fn to_debug_string(&self) -> String {
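In typed_chunk.rs, the commit replaces a misleading `PartialEq` implementation, whose `eq` ignored payloads and really meant "these two chunks can go in the same write batch", with an explicitly named `is_batchable_with`, and drops the now-unneeded `Eq`. Milli spells the batchable variant pairs out by hand; below is a compressed sketch of the same idea using `std::mem::discriminant`, with a stand-in enum rather than milli's real `TypedChunk`.

use std::mem;

// Stand-in for milli's TypedChunk: variants carry unrelated payloads.
#[derive(Debug)]
enum Chunk {
    WordDocids(Vec<u32>),
    FacetStrings(Vec<String>),
}

impl Chunk {
    /// Two chunks are batchable when they are the same kind of data,
    /// regardless of payload. This is *not* equality, hence the rename.
    fn is_batchable_with(&self, other: &Self) -> bool {
        mem::discriminant(self) == mem::discriminant(other)
    }
}

fn main() {
    let a = Chunk::WordDocids(vec![1, 2]);
    let b = Chunk::WordDocids(vec![3]);
    let c = Chunk::FacetStrings(vec!["blue".into()]);
    assert!(a.is_batchable_with(&b)); // same variant, different payloads
    assert!(!a.is_batchable_with(&c));
}

Spelling the pairs out, as milli does, keeps the decision explicit per variant instead of a blanket same-variant rule.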

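With batchability defined, `ChunkAccumulator::insert` appends a chunk to the first batch whose head it is batchable with, and `pop_longest` drains the largest batch, comparing lengths only; the `pop_longest` fix above exists because the old `==` on whole `Vec`s silently meant "same variants as", not "identical". A sketch of the accumulator, continuing the stand-in `Chunk` enum from the previous block:

// Continues the previous sketch's `Chunk` enum (a stand-in for TypedChunk).
#[derive(Default)]
struct ChunkAccumulator {
    inner: Vec<Vec<Chunk>>,
}

impl ChunkAccumulator {
    /// Append to the batch whose first element is batchable with `chunk`,
    /// or start a new batch, mirroring the fixed `insert` in typed_chunk.rs.
    fn insert(&mut self, chunk: Chunk) {
        match self
            .inner
            .iter()
            .position(|batch| batch.first().map_or(false, |head| chunk.is_batchable_with(head)))
        {
            Some(pos) => self.inner[pos].push(chunk),
            None => self.inner.push(vec![chunk]),
        }
    }

    /// Remove and return the longest batch, comparing lengths only.
    fn pop_longest(&mut self) -> Option<Vec<Chunk>> {
        let max_len = self.inner.iter().map(|batch| batch.len()).max()?;
        let pos = self.inner.iter().position(|batch| batch.len() == max_len)?;
        Some(self.inner.remove(pos)).filter(|batch| !batch.is_empty())
    }
}

fn main() {
    let mut acc = ChunkAccumulator::default();
    acc.insert(Chunk::WordDocids(vec![1]));
    acc.insert(Chunk::FacetStrings(vec!["blue".into()]));
    acc.insert(Chunk::WordDocids(vec![2])); // batches with the first insert
    let longest = acc.pop_longest().unwrap();
    assert_eq!(longest.len(), 2); // the two WordDocids chunks
}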