Merge #4544
4544: Stream documents r=curquiza a=irevoire

# Pull Request

## Related issue
Fixes #4383


### Perf
On 2M hackernews documents:

main:
- Time to retrieve: 7 s
- RAM consumption: 2+ GiB

stream:
- Time to retrieve: 4.7 s
- RAM consumption: too small to measure

Co-authored-by: Tamo <tamo@meilisearch.com>
meili-bors[bot] and irevoire committed May 17, 2024
2 parents 7c19c07 + 273c6e8 commit 59ecf1c
Showing 16 changed files with 151 additions and 91 deletions.
38 changes: 15 additions & 23 deletions Cargo.lock

(Cargo.lock is a generated file; its diff is not rendered.)

10 changes: 6 additions & 4 deletions index-scheduler/src/batch.rs
```diff
@@ -785,10 +785,12 @@ impl IndexScheduler {
         let dst = temp_snapshot_dir.path().join("auth");
         fs::create_dir_all(&dst)?;
         // TODO We can't use the open_auth_store_env function here but we should
-        let auth = milli::heed::EnvOpenOptions::new()
-            .map_size(1024 * 1024 * 1024) // 1 GiB
-            .max_dbs(2)
-            .open(&self.auth_path)?;
+        let auth = unsafe {
+            milli::heed::EnvOpenOptions::new()
+                .map_size(1024 * 1024 * 1024) // 1 GiB
+                .max_dbs(2)
+                .open(&self.auth_path)
+        }?;
         auth.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)?;
 
         // 5. Copy and tarball the flat snapshot
```
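This hunk, and the matching ones in `index-scheduler/src/lib.rs` and `meilisearch-auth/src/store.rs` below, all adapt to a heed API in which `EnvOpenOptions::open` is `unsafe`, presumably because opening the same LMDB environment twice in one process is undefined behavior. A minimal sketch of the resulting call pattern, assuming a heed version with the unsafe `open` (the helper name is illustrative, not from the PR):

```rust
use std::path::Path;

// Illustrative helper, not from the PR: every call site now wraps the
// builder's `open` in an `unsafe` block.
fn open_env(path: &Path) -> heed::Result<heed::Env> {
    // SAFETY: the caller must guarantee that no other `Env` is currently
    // open on this path anywhere in the process.
    unsafe {
        heed::EnvOpenOptions::new()
            .map_size(1024 * 1024 * 1024) // 1 GiB
            .max_dbs(2)
            .open(path)
    }
}
```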
14 changes: 8 additions & 6 deletions index-scheduler/src/lib.rs
```diff
@@ -453,10 +453,12 @@ impl IndexScheduler {
             )
         };
 
-        let env = heed::EnvOpenOptions::new()
-            .max_dbs(11)
-            .map_size(budget.task_db_size)
-            .open(options.tasks_path)?;
+        let env = unsafe {
+            heed::EnvOpenOptions::new()
+                .max_dbs(11)
+                .map_size(budget.task_db_size)
+                .open(options.tasks_path)
+        }?;
 
         let features = features::FeatureData::new(&env, options.instance_features)?;
 
@@ -585,9 +587,9 @@ impl IndexScheduler {
 }
 
 fn is_good_heed(tasks_path: &Path, map_size: usize) -> bool {
-    if let Ok(env) =
+    if let Ok(env) = unsafe {
         heed::EnvOpenOptions::new().map_size(clamp_to_page_size(map_size)).open(tasks_path)
-    {
+    } {
         env.prepare_for_closing().wait();
         true
     } else {
```
2 changes: 1 addition & 1 deletion meilisearch-auth/src/store.rs
```diff
@@ -49,7 +49,7 @@ pub fn open_auth_store_env(path: &Path) -> milli::heed::Result<milli::heed::Env>
     let mut options = EnvOpenOptions::new();
     options.map_size(AUTH_STORE_SIZE); // 1GB
     options.max_dbs(2);
-    options.open(path)
+    unsafe { options.open(path) }
 }
 
 impl HeedAuthStore {
```
1 change: 0 additions & 1 deletion meilisearch-types/src/error.rs
```diff
@@ -423,7 +423,6 @@ impl ErrorCode for HeedError {
             HeedError::Mdb(_)
             | HeedError::Encoding(_)
             | HeedError::Decoding(_)
-            | HeedError::InvalidDatabaseTyping
             | HeedError::DatabaseClosing
             | HeedError::BadOpenOptions { .. } => Code::Internal,
         }
```
1 change: 1 addition & 0 deletions meilisearch/Cargo.toml
```diff
@@ -108,6 +108,7 @@ tracing-subscriber = { version = "0.3.18", features = ["json"] }
 tracing-trace = { version = "0.1.0", path = "../tracing-trace" }
 tracing-actix-web = "0.7.9"
 build-info = { version = "1.7.0", path = "../build-info" }
+roaring = "0.10.3"
 
 [dev-dependencies]
 actix-rt = "2.9.0"
```
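`roaring` becomes a direct dependency of the `meilisearch` crate because the documents route below keeps its page of matching document ids as a `RoaringBitmap` instead of materializing the documents up front. A small self-contained sketch of that id-window step, with made-up numbers:

```rust
use roaring::RoaringBitmap;

fn main() {
    // Pretend these are the candidate ids produced by a filter.
    let candidates: RoaringBitmap = (0u32..2_000_000).collect();

    // Pagination keeps only the requested window of ids; the bitmap stays
    // compact regardless of how many documents matched.
    let (offset, limit) = (100, 20);
    let page: RoaringBitmap = candidates.into_iter().skip(offset).take(limit).collect();
    assert_eq!(page.len(), 20);
}
```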
116 changes: 84 additions & 32 deletions meilisearch/src/routes/indexes/documents.rs
```diff
@@ -1,12 +1,14 @@
-use std::io::ErrorKind;
+use std::io::{ErrorKind, Write};
 
 use actix_web::http::header::CONTENT_TYPE;
 use actix_web::web::Data;
 use actix_web::{web, HttpMessage, HttpRequest, HttpResponse};
 use bstr::ByteSlice as _;
+use bytes::Bytes;
 use deserr::actix_web::{AwebJson, AwebQueryParameter};
 use deserr::Deserr;
 use futures::StreamExt;
+use futures_util::Stream;
 use index_scheduler::{IndexScheduler, TaskId};
 use meilisearch_types::deserr::query_params::Param;
 use meilisearch_types::deserr::{DeserrJsonError, DeserrQueryParamError};
@@ -22,7 +24,9 @@ use meilisearch_types::tasks::KindWithContent;
 use meilisearch_types::{milli, Document, Index};
 use mime::Mime;
 use once_cell::sync::Lazy;
-use serde::Deserialize;
+use roaring::RoaringBitmap;
+use serde::ser::SerializeSeq;
+use serde::{Deserialize, Serialize};
 use serde_json::Value;
 use tempfile::tempfile;
 use tokio::fs::File;
```
```diff
@@ -230,6 +234,34 @@ pub async fn get_documents(
     documents_by_query(&index_scheduler, index_uid, query)
 }
 
+pub struct Writer2Streamer {
+    sender: tokio::sync::mpsc::Sender<Result<Bytes, anyhow::Error>>,
+}
+
+impl Write for Writer2Streamer {
+    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+        self.sender.blocking_send(Ok(buf.to_vec().into())).map_err(std::io::Error::other)?;
+        Ok(buf.len())
+    }
+
+    fn flush(&mut self) -> std::io::Result<()> {
+        Ok(())
+    }
+}
+
+pub fn stream(
+    data: impl Serialize + Send + 'static,
+) -> impl Stream<Item = Result<Bytes, anyhow::Error>> {
+    let (sender, receiver) = tokio::sync::mpsc::channel::<Result<Bytes, anyhow::Error>>(1);
+
+    tokio::task::spawn_blocking(move || {
+        serde_json::to_writer(std::io::BufWriter::new(Writer2Streamer { sender }), &data)
+    });
+    futures_util::stream::unfold(receiver, |mut receiver| async {
+        receiver.recv().await.map(|value| (value, receiver))
+    })
+}
+
 fn documents_by_query(
     index_scheduler: &IndexScheduler,
     index_uid: web::Path<String>,
```
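The `Writer2Streamer`/`stream` pair added above bridges blocking serialization to an async body: `serde_json::to_writer` runs on a blocking thread, and every buffer it writes is handed to the async side through a channel of capacity 1, so only about one chunk is in flight at a time. A self-contained sketch of the same pattern, assuming tokio, bytes, futures-util, serde_json, and anyhow as dependencies (names are local to the sketch, not from the PR):

```rust
use std::io::Write;

use bytes::Bytes;
use futures_util::{Stream, StreamExt};
use serde::Serialize;

struct ChannelWriter {
    sender: tokio::sync::mpsc::Sender<Result<Bytes, anyhow::Error>>,
}

impl Write for ChannelWriter {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // Blocks until the async side has consumed the previous chunk; the
        // channel capacity of 1 is what bounds memory usage.
        self.sender
            .blocking_send(Ok(Bytes::copy_from_slice(buf)))
            .map_err(std::io::Error::other)?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

fn stream_json(data: impl Serialize + Send + 'static) -> impl Stream<Item = Result<Bytes, anyhow::Error>> {
    let (sender, receiver) = tokio::sync::mpsc::channel(1);
    // Serialize on a blocking thread; each written buffer is forwarded to
    // the channel as it is produced.
    tokio::task::spawn_blocking(move || {
        serde_json::to_writer(std::io::BufWriter::new(ChannelWriter { sender }), &data)
    });
    futures_util::stream::unfold(receiver, |mut receiver| async {
        receiver.recv().await.map(|value| (value, receiver))
    })
}

#[tokio::main]
async fn main() {
    let mut body = Box::pin(stream_json(vec![1u32, 2, 3]));
    while let Some(chunk) = body.next().await {
        print!("{}", String::from_utf8_lossy(&chunk.unwrap()));
    }
    println!();
}
```

The bounded channel is the design point here: the serializer stalls in `blocking_send` until the consumer has drained the previous chunk, which is why RAM stays flat even on a 2M-document response.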
```diff
@@ -239,12 +271,13 @@ fn documents_by_query(
     let BrowseQuery { offset, limit, fields, filter } = query;
 
     let index = index_scheduler.index(&index_uid)?;
-    let (total, documents) = retrieve_documents(&index, offset, limit, filter, fields)?;
+    let documents = retrieve_documents(index, offset, limit, filter, fields)?;
 
-    let ret = PaginationView::new(offset, limit, total as usize, documents);
+    let ret = PaginationView::new(offset, limit, documents.total_documents as usize, documents);
 
     debug!(returns = ?ret, "Get documents");
-    Ok(HttpResponse::Ok().json(ret))
+
+    Ok(HttpResponse::Ok().streaming(stream(ret)))
 }
 
 #[derive(Deserialize, Debug, Deserr)]
```
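`documents_by_query` now ends with `HttpResponse::Ok().streaming(stream(ret))` instead of `.json(ret)`: actix-web's `streaming` accepts any `Stream` of `Result<Bytes, E>` and writes the body chunk by chunk. A minimal standalone illustration, with an assumed route and payload that are not part of the PR:

```rust
use std::convert::Infallible;

use actix_web::{web, App, HttpResponse, HttpServer};
use bytes::Bytes;

// Streams a tiny JSON array in four chunks instead of building it in memory.
async fn hello_stream() -> HttpResponse {
    let chunks = ["[", "1,", "2", "]"]
        .map(|s| Ok::<_, Infallible>(Bytes::from_static(s.as_bytes())));
    HttpResponse::Ok()
        .content_type("application/json")
        .streaming(futures_util::stream::iter(chunks))
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| App::new().route("/", web::get().to(hello_stream)))
        .bind(("127.0.0.1", 8080))?
        .run()
        .await
}
```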
```diff
@@ -590,14 +623,47 @@ fn some_documents<'a, 't: 'a>(
     }))
 }
 
-fn retrieve_documents<S: AsRef<str>>(
-    index: &Index,
+pub struct DocumentsStreamer {
+    attributes_to_retrieve: Option<Vec<String>>,
+    documents: RoaringBitmap,
+    rtxn: RoTxn<'static>,
+    index: Index,
+    pub total_documents: u64,
+}
+
+impl Serialize for DocumentsStreamer {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        let mut seq = serializer.serialize_seq(Some(self.documents.len() as usize)).unwrap();
+
+        let documents = some_documents(&self.index, &self.rtxn, self.documents.iter()).unwrap();
+        for document in documents {
+            let document = document.unwrap();
+            let document = match self.attributes_to_retrieve {
+                Some(ref attributes_to_retrieve) => permissive_json_pointer::select_values(
+                    &document,
+                    attributes_to_retrieve.iter().map(|s| s.as_ref()),
+                ),
+                None => document,
+            };
+
+            seq.serialize_element(&document)?;
+        }
+        seq.end()
+    }
+}
+
+fn retrieve_documents(
+    index: Index,
     offset: usize,
     limit: usize,
     filter: Option<Value>,
-    attributes_to_retrieve: Option<Vec<S>>,
-) -> Result<(u64, Vec<Document>), ResponseError> {
-    let rtxn = index.read_txn()?;
+    attributes_to_retrieve: Option<Vec<String>>,
+) -> Result<DocumentsStreamer, ResponseError> {
+    let rtxn = index.static_read_txn()?;
 
     let filter = &filter;
     let filter = if let Some(filter) = filter {
         parse_filter(filter)
@@ -607,7 +673,7 @@
     };
 
     let candidates = if let Some(filter) = filter {
-        filter.evaluate(&rtxn, index).map_err(|err| match err {
+        filter.evaluate(&rtxn, &index).map_err(|err| match err {
             milli::Error::UserError(milli::UserError::InvalidFilter(_)) => {
                 ResponseError::from_msg(err.to_string(), Code::InvalidDocumentFilter)
             }
@@ -617,27 +683,13 @@
         index.documents_ids(&rtxn)?
     };
 
-    let (it, number_of_documents) = {
-        let number_of_documents = candidates.len();
-        (
-            some_documents(index, &rtxn, candidates.into_iter().skip(offset).take(limit))?,
-            number_of_documents,
-        )
-    };
-
-    let documents: Result<Vec<_>, ResponseError> = it
-        .map(|document| {
-            Ok(match &attributes_to_retrieve {
-                Some(attributes_to_retrieve) => permissive_json_pointer::select_values(
-                    &document?,
-                    attributes_to_retrieve.iter().map(|s| s.as_ref()),
-                ),
-                None => document?,
-            })
-        })
-        .collect();
-
-    Ok((number_of_documents, documents?))
+    Ok(DocumentsStreamer {
+        total_documents: candidates.len(),
+        attributes_to_retrieve,
+        documents: candidates.into_iter().skip(offset).take(limit).collect(),
+        rtxn,
+        index,
+    })
 }
 
 fn retrieve_document<S: AsRef<str>>(
```
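`DocumentsStreamer` is what makes the response lazy: its `Serialize` impl walks the document ids with `serialize_seq` and emits one element at a time, so the full result set never exists in memory as a `Vec`. The same pattern in miniature, with an illustrative type that is not from the PR:

```rust
use serde::ser::{Serialize, SerializeSeq, Serializer};

// Serializes the integers 0..n lazily, one element at a time, mirroring how
// `DocumentsStreamer` walks its documents.
struct LazyRange {
    n: u64,
}

impl Serialize for LazyRange {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut seq = serializer.serialize_seq(Some(self.n as usize))?;
        for i in 0..self.n {
            // Each element is produced on demand; nothing beyond the
            // serializer's output buffer is ever held in memory.
            seq.serialize_element(&i)?;
        }
        seq.end()
    }
}

fn main() {
    let json = serde_json::to_string(&LazyRange { n: 5 }).unwrap();
    assert_eq!(json, "[0,1,2,3,4]");
    println!("{json}");
}
```

One trade-off visible in the real impl: once streaming has begun, the 200 status line is already on the wire, so an error inside `serialize` can only surface as a truncated or broken body rather than as an HTTP error code.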
