Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make several indexing optimizations #4350

Merged
merged 9 commits into from Feb 14, 2024
19 changes: 10 additions & 9 deletions milli/src/update/facet/bulk.rs
@@ -1,7 +1,7 @@
use std::fs::File;
use std::io::BufReader;

use grenad::CompressionType;
use grenad::{CompressionType, Merger};
use heed::types::Bytes;
use heed::{BytesDecode, BytesEncode, Error, PutFlags, RoTxn, RwTxn};
use roaring::RoaringBitmap;
Expand All @@ -14,6 +14,7 @@ use crate::heed_codec::facet::{
use crate::heed_codec::BytesRefCodec;
use crate::update::del_add::{DelAdd, KvReaderDelAdd};
use crate::update::index_documents::{create_writer, valid_lmdb_key, writer_into_reader};
use crate::update::MergeFn;
use crate::{CboRoaringBitmapCodec, CboRoaringBitmapLenCodec, FieldId, Index, Result};

/// Algorithm to insert elememts into the `facet_id_(string/f64)_docids` databases
Expand All @@ -28,15 +29,15 @@ pub struct FacetsUpdateBulk<'i> {
facet_type: FacetType,
field_ids: Vec<FieldId>,
// None if level 0 does not need to be updated
delta_data: Option<grenad::Reader<BufReader<File>>>,
delta_data: Option<Merger<BufReader<File>, MergeFn>>,
}

impl<'i> FacetsUpdateBulk<'i> {
pub fn new(
index: &'i Index,
field_ids: Vec<FieldId>,
facet_type: FacetType,
delta_data: grenad::Reader<BufReader<File>>,
delta_data: Merger<BufReader<File>, MergeFn>,
group_size: u8,
min_level_size: u8,
) -> FacetsUpdateBulk<'i> {
Expand Down Expand Up @@ -65,7 +66,7 @@ impl<'i> FacetsUpdateBulk<'i> {
}
}

#[logging_timer::time("FacetsUpdateBulk::{}")]
#[tracing::instrument(level = "trace", skip_all, target = "indexing::facets::bulk")]
pub fn execute(self, wtxn: &mut heed::RwTxn) -> Result<()> {
let Self { index, field_ids, group_size, min_level_size, facet_type, delta_data } = self;

Expand All @@ -89,7 +90,7 @@ impl<'i> FacetsUpdateBulk<'i> {
/// Implementation of `FacetsUpdateBulk` that is independent of milli's `Index` type
pub(crate) struct FacetsUpdateBulkInner<R: std::io::Read + std::io::Seek> {
pub db: heed::Database<FacetGroupKeyCodec<BytesRefCodec>, FacetGroupValueCodec>,
pub delta_data: Option<grenad::Reader<R>>,
pub delta_data: Option<Merger<R, MergeFn>>,
pub group_size: u8,
pub min_level_size: u8,
}
Expand Down Expand Up @@ -129,8 +130,8 @@ impl<R: std::io::Read + std::io::Seek> FacetsUpdateBulkInner<R> {
if self.db.is_empty(wtxn)? {
let mut buffer = Vec::new();
let mut database = self.db.iter_mut(wtxn)?.remap_types::<Bytes, Bytes>();
let mut cursor = delta_data.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
let mut iter = delta_data.into_stream_merger_iter()?;
while let Some((key, value)) = iter.next()? {
if !valid_lmdb_key(key) {
continue;
}
Expand All @@ -154,8 +155,8 @@ impl<R: std::io::Read + std::io::Seek> FacetsUpdateBulkInner<R> {
let mut buffer = Vec::new();
let database = self.db.remap_types::<Bytes, Bytes>();

let mut cursor = delta_data.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
let mut iter = delta_data.into_stream_merger_iter()?;
while let Some((key, value)) = iter.next()? {
if !valid_lmdb_key(key) {
continue;
}
Expand Down
14 changes: 9 additions & 5 deletions milli/src/update/facet/incremental.rs
@@ -1,6 +1,7 @@
use std::fs::File;
use std::io::BufReader;

use grenad::Merger;
use heed::types::{Bytes, DecodeIgnore};
use heed::{BytesDecode, Error, RoTxn, RwTxn};
use obkv::KvReader;
Expand All @@ -14,6 +15,7 @@ use crate::heed_codec::BytesRefCodec;
use crate::search::facet::get_highest_level;
use crate::update::del_add::DelAdd;
use crate::update::index_documents::valid_lmdb_key;
use crate::update::MergeFn;
use crate::{CboRoaringBitmapCodec, Index, Result};

enum InsertionResult {
Expand All @@ -31,14 +33,14 @@ enum DeletionResult {
/// `facet_id_(string/f64)_docids` databases.
pub struct FacetsUpdateIncremental {
inner: FacetsUpdateIncrementalInner,
delta_data: grenad::Reader<BufReader<File>>,
delta_data: Merger<BufReader<File>, MergeFn>,
}

impl FacetsUpdateIncremental {
pub fn new(
index: &Index,
facet_type: FacetType,
delta_data: grenad::Reader<BufReader<File>>,
delta_data: Merger<BufReader<File>, MergeFn>,
group_size: u8,
min_level_size: u8,
max_group_size: u8,
Expand All @@ -61,16 +63,18 @@ impl FacetsUpdateIncremental {
}
}

#[tracing::instrument(level = "trace", skip_all, target = "indexing::facets::incremental")]
pub fn execute(self, wtxn: &mut RwTxn) -> crate::Result<()> {
let mut cursor = self.delta_data.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
let mut iter = self.delta_data.into_stream_merger_iter()?;

while let Some((key, value)) = iter.next()? {
if !valid_lmdb_key(key) {
continue;
}

let key = FacetGroupKeyCodec::<BytesRefCodec>::bytes_decode(key)
.map_err(heed::Error::Encoding)?;
let value = KvReader::new(value);

let docids_to_delete = value
.get(DelAdd::Deletion)
.map(CboRoaringBitmapCodec::bytes_decode)
Expand Down