Commit

Fix a bug and make some cleaning

ManyTheFish committed Feb 8, 2024
1 parent 4c87cac commit e8ed27b
Showing 2 changed files with 38 additions and 151 deletions.
182 changes: 33 additions & 149 deletions milli/src/update/index_documents/mod.rs
@@ -10,7 +10,7 @@ use std::iter::FromIterator;
use std::num::NonZeroU32;
use std::result::Result as StdResult;

use crossbeam_channel::{after, select, Receiver, Sender};
use crossbeam_channel::{Receiver, Sender};
use grenad::{Merger, MergerBuilder};
use heed::types::Str;
use heed::Database;
@@ -370,9 +370,9 @@ where
// compute the chunk size from the number of available threads and the input data size.
let total_size = flattened_documents.metadata().map(|m| m.len());
let current_num_threads = pool.current_num_threads();
// if we have more than 2 thread, create a number of chuk equal to 2/3 threads count
// if we have more than 2 threads, create a number of chunks equal to 3/4 of the thread count
let chunk_count = if current_num_threads > 2 {
(current_num_threads * 2 / 3).max(2)
(current_num_threads * 3 / 4).max(2)
} else {
current_num_threads
};
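
For illustration, a minimal standalone sketch of that heuristic (using the same formula and `max(2)` floor as the hunk above) shows how the chunk count scales with the thread count:

```rust
// Sketch of the chunk-count heuristic above, outside of milli:
// with more than 2 threads, use 3/4 of them, floored at 2 chunks.
fn chunk_count(current_num_threads: usize) -> usize {
    if current_num_threads > 2 {
        (current_num_threads * 3 / 4).max(2)
    } else {
        current_num_threads
    }
}

fn main() {
    // Prints: 1 -> 1, 2 -> 2, 4 -> 3, 8 -> 6, 16 -> 12
    for threads in [1, 2, 4, 8, 16] {
        println!("{threads} threads -> {} chunks", chunk_count(threads));
    }
}
```
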
@@ -459,9 +459,36 @@ where
return Err(Error::InternalError(InternalError::AbortedIndexation));
}

select! {
recv(lmdb_writer_rx) -> msg => match msg {
Ok(result) => {
match lmdb_writer_rx.clone().recv_timeout(std::time::Duration::from_millis(250)) {
Err(status) => {
if let Some(typed_chunks) = chunk_accumulator.pop_longest() {
let (docids, is_merged_database) =
write_typed_chunk_into_index(typed_chunks, self.index, self.wtxn)?;
if !docids.is_empty() {
final_documents_ids |= docids;
let documents_seen_count = final_documents_ids.len();
(self.progress)(UpdateIndexingStep::IndexDocuments {
documents_seen: documents_seen_count as usize,
total_documents: documents_count,
});
debug!(
"We have seen {} documents out of {} total documents so far",
documents_seen_count, documents_count
);
}
if is_merged_database {
databases_seen += 1;
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
databases_seen,
total_databases: TOTAL_POSTING_DATABASE_COUNT,
});
}
// If no more chunks remain in the chunk accumulator and the channel is disconnected, break.
} else if status == crossbeam_channel::RecvTimeoutError::Disconnected {
break;
}
}
Ok(result) => {
let typed_chunk = match result? {
TypedChunk::WordDocids {
word_docids_reader,
@@ -521,151 +548,8 @@ where
};

chunk_accumulator.insert(typed_chunk);
}, Err(_) => {
while let Some(typed_chunks) = chunk_accumulator.pop_longest() {
let (docids, is_merged_database) =
write_typed_chunk_into_index(typed_chunks, self.index, self.wtxn)?;
if !docids.is_empty() {
final_documents_ids |= docids;
let documents_seen_count = final_documents_ids.len();
(self.progress)(UpdateIndexingStep::IndexDocuments {
documents_seen: documents_seen_count as usize,
total_documents: documents_count,
});
debug!(
"We have seen {} documents on {} total document so far",
documents_seen_count, documents_count
);
}
if is_merged_database {
databases_seen += 1;
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
databases_seen,
total_databases: TOTAL_POSTING_DATABASE_COUNT,
});
}
}

break;
}
},
recv(after(std::time::Duration::from_millis(100))) -> _ => {
if let Some(typed_chunks) = chunk_accumulator.pop_longest() {
let (docids, is_merged_database) =
write_typed_chunk_into_index(typed_chunks, self.index, self.wtxn)?;
if !docids.is_empty() {
final_documents_ids |= docids;
let documents_seen_count = final_documents_ids.len();
(self.progress)(UpdateIndexingStep::IndexDocuments {
documents_seen: documents_seen_count as usize,
total_documents: documents_count,
});
debug!(
"We have seen {} documents on {} total document so far",
documents_seen_count, documents_count
);
}
if is_merged_database {
databases_seen += 1;
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
databases_seen,
total_databases: TOTAL_POSTING_DATABASE_COUNT,
});
}
}
},
}

// match lmdb_writer_rx.clone().recv_timeout(std::time::Duration::from_millis(500)) {
// Err(status) => {
// if let Some(typed_chunks) = chunk_accumulator.pop_longest() {
// let (docids, is_merged_database) =
// write_typed_chunk_into_index(typed_chunks, self.index, self.wtxn)?;
// if !docids.is_empty() {
// final_documents_ids |= docids;
// let documents_seen_count = final_documents_ids.len();
// (self.progress)(UpdateIndexingStep::IndexDocuments {
// documents_seen: documents_seen_count as usize,
// total_documents: documents_count,
// });
// debug!(
// "We have seen {} documents on {} total document so far",
// documents_seen_count, documents_count
// );
// }
// if is_merged_database {
// databases_seen += 1;
// (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
// databases_seen,
// total_databases: TOTAL_POSTING_DATABASE_COUNT,
// });
// }
// } else if status == crossbeam_channel::RecvTimeoutError::Disconnected {
// break;
// }
// }
// Ok(result) => {
// let typed_chunk = match result? {
// TypedChunk::WordDocids {
// word_docids_reader,
// exact_word_docids_reader,
// word_fid_docids_reader,
// } => {
// let cloneable_chunk =
// unsafe { as_cloneable_grenad(&word_docids_reader)? };
// let word_docids = word_docids.get_or_insert_with(|| {
// MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn)
// });
// word_docids.push(cloneable_chunk.into_cursor()?);
// let cloneable_chunk =
// unsafe { as_cloneable_grenad(&exact_word_docids_reader)? };
// let exact_word_docids = exact_word_docids.get_or_insert_with(|| {
// MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn)
// });
// exact_word_docids.push(cloneable_chunk.into_cursor()?);
// let cloneable_chunk =
// unsafe { as_cloneable_grenad(&word_fid_docids_reader)? };
// let word_fid_docids = word_fid_docids.get_or_insert_with(|| {
// MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn)
// });
// word_fid_docids.push(cloneable_chunk.into_cursor()?);
// TypedChunk::WordDocids {
// word_docids_reader,
// exact_word_docids_reader,
// word_fid_docids_reader,
// }
// }
// TypedChunk::WordPositionDocids(chunk) => {
// let cloneable_chunk = unsafe { as_cloneable_grenad(&chunk)? };
// let word_position_docids =
// word_position_docids.get_or_insert_with(|| {
// MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn)
// });
// word_position_docids.push(cloneable_chunk.into_cursor()?);
// TypedChunk::WordPositionDocids(chunk)
// }
// TypedChunk::VectorPoints {
// expected_dimension,
// remove_vectors,
// embeddings,
// manual_vectors,
// embedder_name,
// } => {
// dimension.insert(embedder_name.clone(), expected_dimension);
// TypedChunk::VectorPoints {
// remove_vectors,
// embeddings,
// expected_dimension,
// manual_vectors,
// embedder_name,
// }
// }
// otherwise => otherwise,
// };

// chunk_accumulator.insert(typed_chunk);
// }
// }
}

// We write the field distribution into the main database
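The bug fix itself is the control-flow change above: the old loop used `select!` between `lmdb_writer_rx` and a fresh `after(100ms)` timer, while the new loop funnels everything through a single `recv_timeout(250ms)` call, so receiving chunks, draining the accumulator on a timeout, and detecting disconnection all share one code path. A minimal sketch of that pattern with `crossbeam_channel` (a toy `u32` payload standing in for milli's `TypedChunk`) might look like this:

```rust
use std::time::Duration;

use crossbeam_channel::{unbounded, RecvTimeoutError};

fn main() {
    let (tx, rx) = unbounded::<u32>();
    // `pending` stands in for milli's chunk accumulator.
    let mut pending: Vec<u32> = Vec::new();

    std::thread::spawn(move || {
        for i in 0..5 {
            tx.send(i).unwrap();
            std::thread::sleep(Duration::from_millis(100));
        }
        // `tx` is dropped here, which disconnects the channel.
    });

    loop {
        match rx.recv_timeout(Duration::from_millis(250)) {
            // A chunk arrived: accumulate it instead of writing immediately.
            Ok(chunk) => pending.push(chunk),
            Err(status) => {
                // On a timeout (or after disconnection), write one
                // accumulated group to the index...
                if let Some(chunk) = pending.pop() {
                    println!("writing {chunk}");
                // ...and only stop once nothing is left to write and
                // the producers have hung up.
                } else if status == RecvTimeoutError::Disconnected {
                    break;
                }
            }
        }
    }
}
```

The loop breaks only once the accumulator is empty and the sender side is gone, mirroring the termination condition in the hunk above.
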
7 changes: 5 additions & 2 deletions milli/src/update/index_documents/typed_chunk.rs
@@ -12,7 +12,7 @@ use obkv::{KvReader, KvWriter};
use roaring::RoaringBitmap;

use super::helpers::{
self, keep_first, merge_deladd_cbo_roaring_bitmaps,
self, keep_first, merge_deladd_btreeset_string, merge_deladd_cbo_roaring_bitmaps,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, merge_ignore_values, valid_lmdb_key,
CursorClonableMmap,
};
@@ -29,6 +29,9 @@ use crate::{
lat_lng_to_xyz, DocumentId, FieldId, GeoPoint, Index, InternalError, Result, SerializationError,
};

/// This struct accumulates and groups the TypedChunks
/// and can hand back the biggest accumulated group so that
/// they can all be indexed together with a merger.
#[derive(Default)]
pub(crate) struct ChunkAccumulator {
inner: Vec<Vec<TypedChunk>>,
@@ -368,7 +371,7 @@ pub(crate) fn write_typed_chunk_into_index(
let mut facet_id_string_builder =
MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn);
let mut normalized_facet_id_string_builder =
MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn);
MergerBuilder::new(merge_deladd_btreeset_string as MergeFn);
let mut data_size = 0;
for typed_chunk in typed_chunks {
let TypedChunk::FieldIdFacetStringDocids((
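The new doc comment on `ChunkAccumulator` summarizes its role: collect `TypedChunk`s into groups and hand back the biggest group so it can be indexed in one merger pass. As an illustration of that shape only (the real type's grouping key and fields are not shown in this excerpt), such an accumulator could be sketched as:

```rust
/// Illustrative accumulator: groups items by a key and pops the
/// largest group so its members can be processed together.
/// (A sketch of the idea, not milli's actual `ChunkAccumulator`.)
#[derive(Default)]
struct Accumulator {
    inner: Vec<Vec<String>>,
}

impl Accumulator {
    /// Pops the group holding the most accumulated items, if any.
    fn pop_longest(&mut self) -> Option<Vec<String>> {
        let (longest, _) = self
            .inner
            .iter()
            .enumerate()
            .max_by_key(|(_, group)| group.len())?;
        let group = self.inner.swap_remove(longest);
        (!group.is_empty()).then_some(group)
    }

    /// Inserts an item into the group at `key`, growing storage as needed.
    fn insert(&mut self, key: usize, item: String) {
        if self.inner.len() <= key {
            self.inner.resize_with(key + 1, Vec::new);
        }
        self.inner[key].push(item);
    }
}
```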
