From d37633e03442f03ae3a04d8e9781c6a363ba87f1 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Mon, 21 Feb 2022 17:16:52 +0900 Subject: [PATCH] Minor changes in indexing. (#1285) --- benches/index-bench.rs | 21 +++-- src/indexer/segment_writer.rs | 35 ++++---- src/postings/postings_writer.rs | 38 ++++----- src/schema/field_type.rs | 145 ++++++++++++++++++++------------ src/schema/schema.rs | 8 +- 5 files changed, 140 insertions(+), 107 deletions(-) diff --git a/benches/index-bench.rs b/benches/index-bench.rs index d4fae63e2a..350305b1ba 100644 --- a/benches/index-bench.rs +++ b/benches/index-bench.rs @@ -4,6 +4,7 @@ use tantivy::schema::{INDEXED, STORED, STRING, TEXT}; use tantivy::Index; const HDFS_LOGS: &str = include_str!("hdfs.json"); +const NUM_REPEATS: usize = 10; pub fn hdfs_index_benchmark(c: &mut Criterion) { let schema = { @@ -27,7 +28,7 @@ pub fn hdfs_index_benchmark(c: &mut Criterion) { b.iter(|| { let index = Index::create_in_ram(schema.clone()); let index_writer = index.writer_with_num_threads(1, 100_000_000).unwrap(); - for _ in 0..10 { + for _ in 0..NUM_REPEATS { for doc_json in HDFS_LOGS.trim().split("\n") { let doc = schema.parse_document(doc_json).unwrap(); index_writer.add_document(doc).unwrap(); @@ -39,7 +40,7 @@ pub fn hdfs_index_benchmark(c: &mut Criterion) { b.iter(|| { let index = Index::create_in_ram(schema.clone()); let mut index_writer = index.writer_with_num_threads(1, 100_000_000).unwrap(); - for _ in 0..10 { + for _ in 0..NUM_REPEATS { for doc_json in HDFS_LOGS.trim().split("\n") { let doc = schema.parse_document(doc_json).unwrap(); index_writer.add_document(doc).unwrap(); @@ -52,9 +53,11 @@ pub fn hdfs_index_benchmark(c: &mut Criterion) { b.iter(|| { let index = Index::create_in_ram(schema_with_store.clone()); let index_writer = index.writer_with_num_threads(1, 100_000_000).unwrap(); - for doc_json in HDFS_LOGS.trim().split("\n") { - let doc = schema.parse_document(doc_json).unwrap(); - index_writer.add_document(doc).unwrap(); + for _ in 0..NUM_REPEATS { + for doc_json in HDFS_LOGS.trim().split("\n") { + let doc = schema.parse_document(doc_json).unwrap(); + index_writer.add_document(doc).unwrap(); + } } }) }); @@ -62,9 +65,11 @@ pub fn hdfs_index_benchmark(c: &mut Criterion) { b.iter(|| { let index = Index::create_in_ram(schema_with_store.clone()); let mut index_writer = index.writer_with_num_threads(1, 100_000_000).unwrap(); - for doc_json in HDFS_LOGS.trim().split("\n") { - let doc = schema.parse_document(doc_json).unwrap(); - index_writer.add_document(doc).unwrap(); + for _ in 0..NUM_REPEATS { + for doc_json in HDFS_LOGS.trim().split("\n") { + let doc = schema.parse_document(doc_json).unwrap(); + index_writer.add_document(doc).unwrap(); + } } index_writer.commit().unwrap(); }) diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index 746747e5a9..cbd2241fb2 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -55,7 +55,7 @@ fn remap_doc_opstamps( /// The segment is layed on disk when the segment gets `finalized`. 
pub struct SegmentWriter { pub(crate) max_doc: DocId, - pub(crate) indexing_context: IndexingContext, + pub(crate) ctx: IndexingContext, pub(crate) per_field_postings_writers: PerFieldPostingsWriter, pub(crate) segment_serializer: SegmentSerializer, pub(crate) fast_field_writers: FastFieldsWriter, @@ -101,7 +101,7 @@ impl SegmentWriter { .collect(); Ok(SegmentWriter { max_doc: 0, - indexing_context: IndexingContext::new(table_size), + ctx: IndexingContext::new(table_size), per_field_postings_writers, fieldnorms_writer: FieldNormsWriter::for_schema(&schema), segment_serializer, @@ -130,7 +130,7 @@ impl SegmentWriter { .transpose()?; remap_and_write( &self.per_field_postings_writers, - self.indexing_context, + self.ctx, &self.fast_field_writers, &self.fieldnorms_writer, &self.schema, @@ -142,7 +142,7 @@ impl SegmentWriter { } pub fn mem_usage(&self) -> usize { - self.indexing_context.mem_usage() + self.ctx.mem_usage() + self.fieldnorms_writer.mem_usage() + self.fast_field_writers.mem_usage() + self.segment_serializer.mem_usage() @@ -162,8 +162,7 @@ impl SegmentWriter { if !field_entry.is_indexed() { continue; } - let (term_buffer, indexing_context) = - (&mut self.term_buffer, &mut self.indexing_context); + let (term_buffer, ctx) = (&mut self.term_buffer, &mut self.ctx); let postings_writer: &mut dyn PostingsWriter = self.per_field_postings_writers.get_for_field_mut(field); match *field_entry.field_type() { @@ -177,12 +176,8 @@ impl SegmentWriter { .token_stream(facet_str) .process(&mut |token| { term_buffer.set_text(&token.text); - let unordered_term_id = postings_writer.subscribe( - doc_id, - 0u32, - term_buffer, - indexing_context, - ); + let unordered_term_id = + postings_writer.subscribe(doc_id, 0u32, term_buffer, ctx); // TODO pass indexing context directly in subscribe function unordered_term_id_opt = Some(unordered_term_id); }); @@ -229,7 +224,7 @@ impl SegmentWriter { field, &mut *token_stream, term_buffer, - indexing_context, + ctx, &mut indexing_position, ); } @@ -241,7 +236,7 @@ impl SegmentWriter { term_buffer.set_field(Type::U64, field); let u64_val = value.as_u64().ok_or_else(make_schema_error)?; term_buffer.set_u64(u64_val); - postings_writer.subscribe(doc_id, 0u32, term_buffer, indexing_context); + postings_writer.subscribe(doc_id, 0u32, term_buffer, ctx); } } FieldType::Date(_) => { @@ -249,7 +244,7 @@ impl SegmentWriter { term_buffer.set_field(Type::Date, field); let date_val = value.as_date().ok_or_else(make_schema_error)?; term_buffer.set_i64(date_val.timestamp()); - postings_writer.subscribe(doc_id, 0u32, term_buffer, indexing_context); + postings_writer.subscribe(doc_id, 0u32, term_buffer, ctx); } } FieldType::I64(_) => { @@ -257,7 +252,7 @@ impl SegmentWriter { term_buffer.set_field(Type::I64, field); let i64_val = value.as_i64().ok_or_else(make_schema_error)?; term_buffer.set_i64(i64_val); - postings_writer.subscribe(doc_id, 0u32, term_buffer, indexing_context); + postings_writer.subscribe(doc_id, 0u32, term_buffer, ctx); } } FieldType::F64(_) => { @@ -265,7 +260,7 @@ impl SegmentWriter { term_buffer.set_field(Type::F64, field); let f64_val = value.as_f64().ok_or_else(make_schema_error)?; term_buffer.set_f64(f64_val); - postings_writer.subscribe(doc_id, 0u32, term_buffer, indexing_context); + postings_writer.subscribe(doc_id, 0u32, term_buffer, ctx); } } FieldType::Bytes(_) => { @@ -273,7 +268,7 @@ impl SegmentWriter { term_buffer.set_field(Type::Bytes, field); let bytes = value.as_bytes().ok_or_else(make_schema_error)?; term_buffer.set_bytes(bytes); - 
postings_writer.subscribe(doc_id, 0u32, term_buffer, indexing_context); + postings_writer.subscribe(doc_id, 0u32, term_buffer, ctx); } } } @@ -324,7 +319,7 @@ impl SegmentWriter { /// `doc_id_map` is used to map to the new doc_id order. fn remap_and_write( per_field_postings_writers: &PerFieldPostingsWriter, - indexing_context: IndexingContext, + ctx: IndexingContext, fast_field_writers: &FastFieldsWriter, fieldnorms_writer: &FieldNormsWriter, schema: &Schema, @@ -339,7 +334,7 @@ fn remap_and_write( .open_read(SegmentComponent::FieldNorms)?; let fieldnorm_readers = FieldNormReaders::open(fieldnorm_data)?; let term_ord_map = serialize_postings( - indexing_context, + ctx, per_field_postings_writers, fieldnorm_readers, doc_id_map, diff --git a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs index bee4c301c5..f4ca6394b6 100644 --- a/src/postings/postings_writer.rs +++ b/src/postings/postings_writer.rs @@ -49,7 +49,7 @@ fn make_field_partition( /// It pushes all term, one field at a time, towards the /// postings serializer. pub(crate) fn serialize_postings( - indexing_context: IndexingContext, + ctx: IndexingContext, per_field_postings_writers: &PerFieldPostingsWriter, fieldnorm_readers: FieldNormReaders, doc_id_map: Option<&DocIdMapping>, @@ -57,8 +57,8 @@ pub(crate) fn serialize_postings( serializer: &mut InvertedIndexSerializer, ) -> crate::Result>> { let mut term_offsets: Vec<(Term<&[u8]>, Addr, UnorderedTermId)> = - Vec::with_capacity(indexing_context.term_index.len()); - term_offsets.extend(indexing_context.term_index.iter()); + Vec::with_capacity(ctx.term_index.len()); + term_offsets.extend(ctx.term_index.iter()); term_offsets.sort_unstable_by_key(|(k, _, _)| k.clone()); let mut unordered_term_mappings: HashMap> = @@ -94,7 +94,7 @@ pub(crate) fn serialize_postings( postings_writer.serialize( &term_offsets[byte_offsets], doc_id_map, - &indexing_context, + &ctx, &mut field_serializer, )?; field_serializer.close()?; @@ -118,14 +118,14 @@ pub(crate) trait PostingsWriter { /// * doc - the document id /// * pos - the term position (expressed in tokens) /// * term - the term - /// * indexing_context - Contains a term hashmap and a memory arena to store all necessary - /// posting list information. + /// * ctx - Contains a term hashmap and a memory arena to store all necessary posting list + /// information. fn subscribe( &mut self, doc: DocId, pos: u32, term: &Term, - indexing_context: &mut IndexingContext, + ctx: &mut IndexingContext, ) -> UnorderedTermId; /// Serializes the postings on disk. 
@@ -134,7 +134,7 @@ pub(crate) trait PostingsWriter { &self, term_addrs: &[(Term<&[u8]>, Addr, UnorderedTermId)], doc_id_map: Option<&DocIdMapping>, - indexing_context: &IndexingContext, + ctx: &IndexingContext, serializer: &mut FieldSerializer, ) -> io::Result<()>; @@ -145,7 +145,7 @@ pub(crate) trait PostingsWriter { field: Field, token_stream: &mut dyn TokenStream, term_buffer: &mut Term, - indexing_context: &mut IndexingContext, + ctx: &mut IndexingContext, indexing_position: &mut IndexingPosition, ) { term_buffer.set_field(Type::Str, field); @@ -165,7 +165,7 @@ pub(crate) trait PostingsWriter { term_buffer.set_text(token.text.as_str()); let start_position = indexing_position.end_position + token.position as u32; end_position = start_position + token.position_length as u32; - self.subscribe(doc_id, start_position, term_buffer, indexing_context); + self.subscribe(doc_id, start_position, term_buffer, ctx); num_tokens += 1; }); indexing_position.end_position = end_position + POSITION_GAP; @@ -203,14 +203,11 @@ impl PostingsWriter for SpecializedPostingsWriter doc: DocId, position: u32, term: &Term, - indexing_context: &mut IndexingContext, + ctx: &mut IndexingContext, ) -> UnorderedTermId { debug_assert!(term.as_slice().len() >= 4); self.total_num_tokens += 1; - let (term_index, arena) = ( - &mut indexing_context.term_index, - &mut indexing_context.arena, - ); + let (term_index, arena) = (&mut ctx.term_index, &mut ctx.arena); term_index.mutate_or_create(term.as_slice(), |opt_recorder: Option| { if let Some(mut recorder) = opt_recorder { let current_doc = recorder.current_doc(); @@ -233,20 +230,15 @@ impl PostingsWriter for SpecializedPostingsWriter &self, term_addrs: &[(Term<&[u8]>, Addr, UnorderedTermId)], doc_id_map: Option<&DocIdMapping>, - indexing_context: &IndexingContext, + ctx: &IndexingContext, serializer: &mut FieldSerializer, ) -> io::Result<()> { let mut buffer_lender = BufferLender::default(); for (term, addr, _) in term_addrs { - let recorder: Rec = indexing_context.term_index.read(*addr); + let recorder: Rec = ctx.term_index.read(*addr); let term_doc_freq = recorder.term_doc_freq().unwrap_or(0u32); serializer.new_term(term.value_bytes(), term_doc_freq)?; - recorder.serialize( - &indexing_context.arena, - doc_id_map, - serializer, - &mut buffer_lender, - ); + recorder.serialize(&ctx.arena, doc_id_map, serializer, &mut buffer_lender); serializer.close_term()?; } Ok(()) diff --git a/src/schema/field_type.rs b/src/schema/field_type.rs index 6b995773ae..c0e429a947 100644 --- a/src/schema/field_type.rs +++ b/src/schema/field_type.rs @@ -1,6 +1,7 @@ use chrono::{FixedOffset, Utc}; use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; +use thiserror::Error; use crate::schema::bytes_options::BytesOptions; use crate::schema::facet_options::FacetOptions; @@ -9,17 +10,20 @@ use crate::tokenizer::PreTokenizedString; /// Possible error that may occur while parsing a field value /// At this point the JSON is known to be valid. -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Error)] pub enum ValueParsingError { - /// Encountered a numerical value that overflows or underflow its integer type. - OverflowError(String), - /// The json node is not of the correct type. - /// (e.g. 3 for a `Str` type or `"abc"` for a u64 type) - /// Tantivy will try to autocast values. - TypeError(String), - /// The json node is a string but contains json that is - /// not valid base64. - InvalidBase64(String), + #[error("Overflow error. 
Expected {expected}, got {json}")] + OverflowError { + expected: &'static str, + json: serde_json::Value, + }, + #[error("Type error. Expected {expected}, got {json}")] + TypeError { + expected: &'static str, + json: serde_json::Value, + }, + #[error("Invalid base64: {base64}")] + InvalidBase64 { base64: String }, } /// Type of the value that a field can take. @@ -67,6 +71,19 @@ impl Type { *self as u8 } + /// Returns a human readable name for the Type. + pub fn name(&self) -> &'static str { + match self { + Type::Str => "Str", + Type::U64 => "U64", + Type::I64 => "I64", + Type::F64 => "F64", + Type::Date => "Date", + Type::Facet => "Facet", + Type::Bytes => "Bytes", + } + } + /// Interprets a 1byte code as a type. /// Returns None if the code is invalid. pub fn from_code(code: u8) -> Option { @@ -133,6 +150,24 @@ impl FieldType { } } + /// Returns the index record option for the field. + /// + /// If the field is not indexed, returns `None`. + pub fn index_record_option(&self) -> Option { + match self { + FieldType::Str(text_options) => text_options + .get_indexing_options() + .map(|text_indexing| text_indexing.index_option()), + field_type => { + if field_type.is_indexed() { + Some(IndexRecordOption::Basic) + } else { + None + } + } + } + } + /// returns true iff the field is normed. pub fn has_fieldnorms(&self) -> bool { match *self { @@ -152,6 +187,10 @@ impl FieldType { /// Given a field configuration, return the maximal possible /// `IndexRecordOption` available. /// + /// For the Json object, this does not necessarily mean it is the index record + /// option level is available for all terms. + /// (Non string terms have the Basic indexing option at most.) + /// /// If the field is not indexed, then returns `None`. pub fn get_index_record_option(&self) -> Option { match *self { @@ -189,25 +228,26 @@ impl FieldType { JsonValue::String(ref field_text) => match *self { FieldType::Date(_) => { let dt_with_fixed_tz: chrono::DateTime = - chrono::DateTime::parse_from_rfc3339(field_text).map_err(|err| { - ValueParsingError::TypeError(format!( - "Failed to parse date from JSON. Expected rfc3339 format, got {}. 
\ - {:?}", - field_text, err - )) + chrono::DateTime::parse_from_rfc3339(field_text).map_err(|_err| { + ValueParsingError::TypeError { + expected: "rfc3339 format", + json: json.clone(), + } })?; Ok(Value::Date(dt_with_fixed_tz.with_timezone(&Utc))) } FieldType::Str(_) => Ok(Value::Str(field_text.clone())), - FieldType::U64(_) | FieldType::I64(_) | FieldType::F64(_) => Err( - ValueParsingError::TypeError(format!("Expected an integer, got {:?}", json)), - ), + FieldType::U64(_) | FieldType::I64(_) | FieldType::F64(_) => { + Err(ValueParsingError::TypeError { + expected: "an integer", + json: json.clone(), + }) + } FieldType::Facet(_) => Ok(Value::Facet(Facet::from(field_text))), FieldType::Bytes(_) => base64::decode(field_text).map(Value::Bytes).map_err(|_| { - ValueParsingError::InvalidBase64(format!( - "Expected base64 string, got {:?}", - field_text - )) + ValueParsingError::InvalidBase64 { + base64: field_text.clone(), + } }), }, JsonValue::Number(ref field_val_num) => match *self { @@ -215,29 +255,37 @@ impl FieldType { if let Some(field_val_i64) = field_val_num.as_i64() { Ok(Value::I64(field_val_i64)) } else { - let msg = format!("Expected an i64 int, got {:?}", json); - Err(ValueParsingError::OverflowError(msg)) + Err(ValueParsingError::OverflowError { + expected: "an i64 int", + json: json.clone(), + }) } } FieldType::U64(_) => { if let Some(field_val_u64) = field_val_num.as_u64() { Ok(Value::U64(field_val_u64)) } else { - let msg = format!("Expected a u64 int, got {:?}", json); - Err(ValueParsingError::OverflowError(msg)) + Err(ValueParsingError::OverflowError { + expected: "u64", + json: json.clone(), + }) } } FieldType::F64(_) => { if let Some(field_val_f64) = field_val_num.as_f64() { Ok(Value::F64(field_val_f64)) } else { - let msg = format!("Expected a f64 int, got {:?}", json); - Err(ValueParsingError::OverflowError(msg)) + Err(ValueParsingError::OverflowError { + expected: "a f64", + json: json.clone(), + }) } } FieldType::Str(_) | FieldType::Facet(_) | FieldType::Bytes(_) => { - let msg = format!("Expected a string, got {:?}", json); - Err(ValueParsingError::TypeError(msg)) + Err(ValueParsingError::TypeError { + expected: "a string", + json: json.clone(), + }) } }, JsonValue::Object(_) => match *self { @@ -247,28 +295,21 @@ impl FieldType { { Ok(Value::PreTokStr(tok_str_val)) } else { - let msg = format!( - "Json value {:?} cannot be translated to PreTokenizedString.", - json - ); - Err(ValueParsingError::TypeError(msg)) + Err(ValueParsingError::TypeError { + expected: "a string or an pretokenized string", + json: json.clone(), + }) } } - _ => { - let msg = format!( - "Json value not supported error {:?}. Expected {:?}", - json, self - ); - Err(ValueParsingError::TypeError(msg)) - } + _ => Err(ValueParsingError::TypeError { + expected: self.value_type().name(), + json: json.clone(), + }), }, - _ => { - let msg = format!( - "Json value not supported error {:?}. Expected {:?}", - json, self - ); - Err(ValueParsingError::TypeError(msg)) - } + _ => Err(ValueParsingError::TypeError { + expected: self.value_type().name(), + json: json.clone(), + }), } } } @@ -317,13 +358,13 @@ mod tests { let result = FieldType::Bytes(Default::default()).value_from_json(&json!(521)); match result { - Err(ValueParsingError::TypeError(_)) => {} + Err(ValueParsingError::TypeError { .. 
}) => {} _ => panic!("Expected parse failure for wrong type"), } let result = FieldType::Bytes(Default::default()).value_from_json(&json!("-")); match result { - Err(ValueParsingError::InvalidBase64(_)) => {} + Err(ValueParsingError::InvalidBase64 { .. }) => {} _ => panic!("Expected parse failure for invalid base64"), } } diff --git a/src/schema/schema.rs b/src/schema/schema.rs index 4f62d57e61..b501ee70b6 100644 --- a/src/schema/schema.rs +++ b/src/schema/schema.rs @@ -666,7 +666,7 @@ mod tests { json_err, Err(DocParsingError::ValueError( _, - ValueParsingError::TypeError(_) + ValueParsingError::TypeError { .. } )) ); } @@ -684,7 +684,7 @@ mod tests { json_err, Err(DocParsingError::ValueError( _, - ValueParsingError::OverflowError(_) + ValueParsingError::OverflowError { .. } )) ); } @@ -702,7 +702,7 @@ mod tests { json_err, Err(DocParsingError::ValueError( _, - ValueParsingError::OverflowError(_) + ValueParsingError::OverflowError { .. } )) )); } @@ -720,7 +720,7 @@ mod tests { json_err, Err(DocParsingError::ValueError( _, - ValueParsingError::OverflowError(_) + ValueParsingError::OverflowError { .. } )) ); }
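
Addendum (not part of the patch): the `ValueParsingError` change above replaces tuple variants carrying pre-formatted strings with struct variants deriving `thiserror::Error`, so callers match on named fields (as the updated tests do with `ValueParsingError::TypeError { .. }`) instead of inspecting message strings. The sketch below is a minimal illustration only; it assumes `ValueParsingError` stays re-exported from the schema module, and the helper name `explain` is invented for the example.

use tantivy::schema::ValueParsingError;

// Build a custom message per variant; the fields come straight from the
// enum definition introduced in this patch.
fn explain(err: &ValueParsingError) -> String {
    match err {
        ValueParsingError::OverflowError { expected, json } => {
            format!("value {} does not fit in {}", json, expected)
        }
        ValueParsingError::TypeError { expected, json } => {
            format!("expected {}, got {}", expected, json)
        }
        ValueParsingError::InvalidBase64 { base64 } => {
            format!("`{}` is not valid base64", base64)
        }
    }
}

Because the enum now derives `thiserror::Error`, `err.to_string()` already yields the `#[error("...")]` message, so a per-variant match like the one above is only needed when a caller wants wording different from the derive's.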