Skip to content

Commit

Permalink
fix PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ManyTheFish committed Feb 13, 2024
1 parent e5e811e commit 48026aa
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 115 deletions.
170 changes: 91 additions & 79 deletions milli/src/update/facet/mod.rs
Expand Up @@ -173,96 +173,108 @@ impl<'i> FacetsUpdate<'i> {
incremental_update.execute(wtxn)?;
}

if let Some(normalized_delta_data) = self.normalized_delta_data {
let mut iter = normalized_delta_data.into_stream_merger_iter()?;
while let Some((key_bytes, delta_bytes)) = iter.next()? {
let deladd_reader = KvReaderDelAdd::new(delta_bytes);

let database_set = self
.index
.facet_id_normalized_string_strings
.remap_key_type::<Bytes>()
.get(wtxn, key_bytes)?
.unwrap_or_default();

let add_set = deladd_reader
.get(DelAdd::Addition)
.and_then(|bytes| serde_json::from_slice::<BTreeSet<String>>(bytes).ok())
.unwrap_or_default();

let del_set = match deladd_reader
.get(DelAdd::Deletion)
.and_then(|bytes| serde_json::from_slice::<BTreeSet<String>>(bytes).ok())
{
Some(del_set) => {
let (field_id_bytes, _) = try_split_array_at(key_bytes).unwrap();
let field_id = FieldId::from_be_bytes(field_id_bytes);
let mut set = BTreeSet::new();
for facet in del_set {
let key =
FacetGroupKey { field_id, level: 0, left_bound: facet.as_str() };
// Check if the referenced value doesn't exist anymore before deleting it.
if self.index.facet_id_string_docids.get(wtxn, &key)?.remap_data::<DecodeIgnore>().is_none() {
set.insert(facet);
}
}
set
match self.normalized_delta_data {
Some(data) => index_facet_search(wtxn, data, self.index),
None => Ok(()),
}
}
}

fn index_facet_search(
wtxn: &mut heed::RwTxn,
normalized_delta_data: Merger<BufReader<File>, MergeFn>,
index: &Index,
) -> Result<()> {
let mut iter = normalized_delta_data.into_stream_merger_iter()?;
while let Some((key_bytes, delta_bytes)) = iter.next()? {
let deladd_reader = KvReaderDelAdd::new(delta_bytes);

let database_set = index
.facet_id_normalized_string_strings
.remap_key_type::<Bytes>()
.get(wtxn, key_bytes)?
.unwrap_or_default();

let add_set = deladd_reader
.get(DelAdd::Addition)
.and_then(|bytes| serde_json::from_slice::<BTreeSet<String>>(bytes).ok())
.unwrap_or_default();

let del_set = match deladd_reader
.get(DelAdd::Deletion)
.and_then(|bytes| serde_json::from_slice::<BTreeSet<String>>(bytes).ok())
{
Some(del_set) => {
let (field_id_bytes, _) = try_split_array_at(key_bytes).unwrap();
let field_id = FieldId::from_be_bytes(field_id_bytes);
let mut set = BTreeSet::new();
for facet in del_set {
let key = FacetGroupKey { field_id, level: 0, left_bound: facet.as_str() };
// Check if the referenced value doesn't exist anymore before deleting it.
if index
.facet_id_string_docids
.remap_data_type::<DecodeIgnore>()
.get(wtxn, &key)?
.is_none()
{
set.insert(facet);
}
None => BTreeSet::new(),
};

let set: BTreeSet<_> =
database_set.difference(&del_set).chain(add_set.iter()).cloned().collect();

if set.is_empty() {
self.index
.facet_id_normalized_string_strings
.remap_key_type::<Bytes>()
.delete(wtxn, key_bytes)?;
} else {
self.index
.facet_id_normalized_string_strings
.remap_key_type::<Bytes>()
.put(wtxn, key_bytes, &set)?;
}
set
}
None => BTreeSet::new(),
};

// We clear the FST of normalized-for-search to compute everything from scratch.
self.index.facet_id_string_fst.clear(wtxn)?;
// We compute one FST by string facet
let mut text_fsts = vec![];
let mut current_fst: Option<(u16, fst::SetBuilder<Vec<u8>>)> = None;
let database =
self.index.facet_id_normalized_string_strings.remap_data_type::<DecodeIgnore>();
for result in database.iter(wtxn)? {
let ((field_id, normalized_facet), _) = result?;
current_fst = match current_fst.take() {
Some((fid, fst_builder)) if fid != field_id => {
let fst = fst_builder.into_set();
text_fsts.push((fid, fst));
Some((field_id, fst::SetBuilder::memory()))
}
Some((field_id, fst_builder)) => Some((field_id, fst_builder)),
None => Some((field_id, fst::SetBuilder::memory())),
};
let set: BTreeSet<_> =
database_set.difference(&del_set).chain(add_set.iter()).cloned().collect();

if let Some((_, fst_builder)) = current_fst.as_mut() {
fst_builder.insert(normalized_facet)?;
}
}
if set.is_empty() {
index
.facet_id_normalized_string_strings
.remap_key_type::<Bytes>()
.delete(wtxn, key_bytes)?;
} else {
index
.facet_id_normalized_string_strings
.remap_key_type::<Bytes>()
.put(wtxn, key_bytes, &set)?;
}
}

if let Some((field_id, fst_builder)) = current_fst {
// We clear the FST of normalized-for-search to compute everything from scratch.
index.facet_id_string_fst.clear(wtxn)?;
// We compute one FST by string facet
let mut text_fsts = vec![];
let mut current_fst: Option<(u16, fst::SetBuilder<Vec<u8>>)> = None;
let database = index.facet_id_normalized_string_strings.remap_data_type::<DecodeIgnore>();
for result in database.iter(wtxn)? {
let ((field_id, normalized_facet), _) = result?;
current_fst = match current_fst.take() {
Some((fid, fst_builder)) if fid != field_id => {
let fst = fst_builder.into_set();
text_fsts.push((field_id, fst));
text_fsts.push((fid, fst));
Some((field_id, fst::SetBuilder::memory()))
}
Some((field_id, fst_builder)) => Some((field_id, fst_builder)),
None => Some((field_id, fst::SetBuilder::memory())),
};

// We write those FSTs in LMDB now
for (field_id, fst) in text_fsts {
self.index.facet_id_string_fst.put(wtxn, &field_id, &fst)?;
}
if let Some((_, fst_builder)) = current_fst.as_mut() {
fst_builder.insert(normalized_facet)?;
}
Ok(())
}

if let Some((field_id, fst_builder)) = current_fst {
let fst = fst_builder.into_set();
text_fsts.push((field_id, fst));
}

// We write those FSTs in LMDB now
for (field_id, fst) in text_fsts {
index.facet_id_string_fst.put(wtxn, &field_id, &fst)?;
}

Ok(())
}

#[cfg(test)]
Expand Down
21 changes: 0 additions & 21 deletions milli/src/update/index_documents/helpers/merge_functions.rs
Expand Up @@ -223,27 +223,6 @@ pub fn merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap<'a>(
)?)
}

pub fn merge_btreeset_string<'a>(_key: &[u8], values: &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>> {
if values.len() == 1 {
Ok(values[0].clone())
} else {
// TODO improve the perf by using a `#[borrow] Cow<str>`.
let strings: BTreeSet<String> = values
.iter()
.map(AsRef::as_ref)
.map(serde_json::from_slice::<BTreeSet<String>>)
.map(StdResult::unwrap)
.reduce(|mut current, new| {
for x in new {
current.insert(x);
}
current
})
.unwrap();
Ok(Cow::Owned(serde_json::to_vec(&strings).unwrap()))
}
}

/// Do a union of BtreeSet on both sides of a DelAdd obkv
/// separately and outputs a new DelAdd with both unions.
pub fn merge_deladd_btreeset_string<'a>(
Expand Down
8 changes: 4 additions & 4 deletions milli/src/update/index_documents/helpers/mod.rs
Expand Up @@ -13,10 +13,10 @@ pub use grenad_helpers::{
GrenadParameters,
};
pub use merge_functions::{
keep_first, keep_latest_obkv, merge_btreeset_string, merge_cbo_roaring_bitmaps,
merge_deladd_btreeset_string, merge_deladd_cbo_roaring_bitmaps,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, merge_roaring_bitmaps,
obkvs_keep_last_addition_merge_deletions, obkvs_merge_additions_and_deletions, MergeFn,
keep_first, keep_latest_obkv, merge_cbo_roaring_bitmaps, merge_deladd_btreeset_string,
merge_deladd_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
merge_roaring_bitmaps, obkvs_keep_last_addition_merge_deletions,
obkvs_merge_additions_and_deletions, MergeFn,
};

use crate::MAX_WORD_LENGTH;
Expand Down
6 changes: 3 additions & 3 deletions milli/src/update/index_documents/mod.rs
Expand Up @@ -25,9 +25,9 @@ use self::enrich::enrich_documents_batch;
pub use self::enrich::{extract_finite_float_from_value, DocumentId};
pub use self::helpers::{
as_cloneable_grenad, create_sorter, create_writer, fst_stream_into_hashset,
fst_stream_into_vec, merge_btreeset_string, merge_cbo_roaring_bitmaps,
merge_deladd_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
merge_roaring_bitmaps, valid_lmdb_key, write_sorter_into_database, writer_into_reader, MergeFn,
fst_stream_into_vec, merge_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, merge_roaring_bitmaps,
valid_lmdb_key, write_sorter_into_database, writer_into_reader, MergeFn,
};
use self::helpers::{grenad_obkv_into_chunks, GrenadParameters};
pub use self::transform::{Transform, TransformOutput};
Expand Down
13 changes: 8 additions & 5 deletions milli/src/update/index_documents/typed_chunk.rs
Expand Up @@ -41,15 +41,19 @@ impl ChunkAccumulator {
pub fn pop_longest(&mut self) -> Option<Vec<TypedChunk>> {
match self.inner.iter().max_by_key(|v| v.len()) {
Some(left) => {
let position = self.inner.iter().position(|right| left == right);
let position = self.inner.iter().position(|right| left.len() == right.len());
position.map(|p| self.inner.remove(p)).filter(|v| !v.is_empty())
}
None => None,
}
}

pub fn insert(&mut self, chunk: TypedChunk) {
match self.inner.iter().position(|right| Some(&chunk) == right.first()) {
match self
.inner
.iter()
.position(|right| right.first().map_or(false, |right| chunk.is_batchable_with(right)))
{
Some(position) => {
let v = self.inner.get_mut(position).unwrap();
v.push(chunk);
Expand Down Expand Up @@ -87,8 +91,8 @@ pub(crate) enum TypedChunk {
ScriptLanguageDocids(HashMap<(Script, Language), (RoaringBitmap, RoaringBitmap)>),
}

impl PartialEq for TypedChunk {
fn eq(&self, other: &Self) -> bool {
impl TypedChunk {
fn is_batchable_with(&self, other: &Self) -> bool {
use TypedChunk::*;
match (self, other) {
(FieldIdDocidFacetStrings(_), FieldIdDocidFacetStrings(_))
Expand All @@ -113,7 +117,6 @@ impl PartialEq for TypedChunk {
}
}
}
impl Eq for TypedChunk {}

impl TypedChunk {
pub fn to_debug_string(&self) -> String {
Expand Down
5 changes: 2 additions & 3 deletions milli/src/update/mod.rs
Expand Up @@ -3,9 +3,8 @@ pub use self::clear_documents::ClearDocuments;
pub use self::facet::bulk::FacetsUpdateBulk;
pub use self::facet::incremental::FacetsUpdateIncrementalInner;
pub use self::index_documents::{
merge_btreeset_string, merge_cbo_roaring_bitmaps, merge_roaring_bitmaps,
DocumentAdditionResult, DocumentId, IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod,
MergeFn,
merge_cbo_roaring_bitmaps, merge_roaring_bitmaps, DocumentAdditionResult, DocumentId,
IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod, MergeFn,
};
pub use self::indexer_config::IndexerConfig;
pub use self::settings::{validate_embedding_settings, Setting, Settings};
Expand Down

0 comments on commit 48026aa

Please sign in to comment.