From 0f4c2e27cf937e036262655794ff74bdb75a7f54 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Mon, 1 Jul 2024 16:40:07 +0900 Subject: [PATCH] Fixes bug that causes out-of-order sstable key. (#2445) The previous way to address the problem was to replace \u{0000} with 0 in different places. This logic had several flaws: Done on the serializer side (like it was for the columnar), there was a collision problem. If a document in the segment contained a json field with a \0 and antoher doc contained the same json field but `0` then we were sending the same field path twice to the serializer. Another option would have been to normalizes all values on the writer side. This PR simplifies the logic and simply ignore json path containing a \0, both in the columnar and the inverted index. Closes #2442 --- columnar/src/columnar/writer/mod.rs | 10 +++- columnar/src/columnar/writer/serializer.rs | 25 ++-------- src/core/json_utils.rs | 5 +- src/indexer/index_writer.rs | 55 +++++++++++++++++++--- src/indexer/mod.rs | 28 +++++++---- src/indexer/path_to_unordered_id.rs | 3 +- src/postings/json_postings_writer.rs | 4 +- src/schema/term.rs | 8 +--- 8 files changed, 91 insertions(+), 47 deletions(-) diff --git a/columnar/src/columnar/writer/mod.rs b/columnar/src/columnar/writer/mod.rs index 239a7422d6..9b118e2117 100644 --- a/columnar/src/columnar/writer/mod.rs +++ b/columnar/src/columnar/writer/mod.rs @@ -8,6 +8,7 @@ use std::net::Ipv6Addr; use column_operation::ColumnOperation; pub(crate) use column_writers::CompatibleNumericalTypes; +use common::json_path_writer::JSON_END_OF_PATH; use common::CountingWriter; pub(crate) use serializer::ColumnarSerializer; use stacker::{Addr, ArenaHashMap, MemoryArena}; @@ -247,6 +248,7 @@ impl ColumnarWriter { } pub fn serialize(&mut self, num_docs: RowId, wrt: &mut dyn io::Write) -> io::Result<()> { let mut serializer = ColumnarSerializer::new(wrt); + let mut columns: Vec<(&[u8], ColumnType, Addr)> = self .numerical_field_hash_map .iter() @@ -260,7 +262,7 @@ impl ColumnarWriter { columns.extend( self.bytes_field_hash_map .iter() - .map(|(term, addr)| (term, ColumnType::Bytes, addr)), + .map(|(column_name, addr)| (column_name, ColumnType::Bytes, addr)), ); columns.extend( self.str_field_hash_map @@ -287,6 +289,12 @@ impl ColumnarWriter { let (arena, buffers, dictionaries) = (&self.arena, &mut self.buffers, &self.dictionaries); let mut symbol_byte_buffer: Vec = Vec::new(); for (column_name, column_type, addr) in columns { + if column_name.contains(&JSON_END_OF_PATH) { + // Tantivy uses b'0' as a separator for nested fields in JSON. + // Column names with a b'0' are not simply ignored by the columnar (and the inverted + // index). + continue; + } match column_type { ColumnType::Bool => { let column_writer: ColumnWriter = self.bool_field_hash_map.read(addr); diff --git a/columnar/src/columnar/writer/serializer.rs b/columnar/src/columnar/writer/serializer.rs index 394e61cd96..0e5a1e2b24 100644 --- a/columnar/src/columnar/writer/serializer.rs +++ b/columnar/src/columnar/writer/serializer.rs @@ -1,6 +1,7 @@ use std::io; use std::io::Write; +use common::json_path_writer::JSON_END_OF_PATH; use common::{BinarySerializable, CountingWriter}; use sstable::value::RangeValueWriter; use sstable::RangeSSTable; @@ -18,13 +19,8 @@ pub struct ColumnarSerializer { /// code. fn prepare_key(key: &[u8], column_type: ColumnType, buffer: &mut Vec) { buffer.clear(); - // Convert 0 bytes to '0' string, as 0 bytes are reserved for the end of the path. - if key.contains(&0u8) { - buffer.extend(key.iter().map(|&b| if b == 0 { b'0' } else { b })); - } else { - buffer.extend_from_slice(key); - } - buffer.push(0u8); + buffer.extend_from_slice(key); + buffer.push(JSON_END_OF_PATH); buffer.push(column_type.to_code()); } @@ -97,18 +93,3 @@ impl<'a, W: io::Write> io::Write for ColumnSerializer<'a, W> { self.columnar_serializer.wrt.write_all(buf) } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_prepare_key_bytes() { - let mut buffer: Vec = b"somegarbage".to_vec(); - prepare_key(b"root\0child", ColumnType::Str, &mut buffer); - assert_eq!(buffer.len(), 12); - assert_eq!(&buffer[..10], b"root0child"); - assert_eq!(buffer[10], 0u8); - assert_eq!(buffer[11], ColumnType::Str.to_code()); - } -} diff --git a/src/core/json_utils.rs b/src/core/json_utils.rs index ee75bb354a..2767cd6d93 100644 --- a/src/core/json_utils.rs +++ b/src/core/json_utils.rs @@ -1,4 +1,4 @@ -use common::json_path_writer::JSON_PATH_SEGMENT_SEP; +use common::json_path_writer::{JSON_END_OF_PATH, JSON_PATH_SEGMENT_SEP}; use common::{replace_in_place, JsonPathWriter}; use rustc_hash::FxHashMap; @@ -83,6 +83,9 @@ fn index_json_object<'a, V: Value<'a>>( positions_per_path: &mut IndexingPositionsPerPath, ) { for (json_path_segment, json_value_visitor) in json_visitor { + if json_path_segment.as_bytes().contains(&JSON_END_OF_PATH) { + continue; + } json_path_writer.push(json_path_segment); index_json_value( doc, diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 4792574d06..3149127a08 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -815,8 +815,9 @@ mod tests { use crate::indexer::NoMergePolicy; use crate::query::{QueryParser, TermQuery}; use crate::schema::{ - self, Facet, FacetOptions, IndexRecordOption, IpAddrOptions, NumericOptions, - TextFieldIndexing, TextOptions, Value, FAST, INDEXED, STORED, STRING, TEXT, + self, Facet, FacetOptions, IndexRecordOption, IpAddrOptions, JsonObjectOptions, + NumericOptions, Schema, TextFieldIndexing, TextOptions, Value, FAST, INDEXED, STORED, + STRING, TEXT, }; use crate::store::DOCSTORE_CACHE_CAPACITY; use crate::{ @@ -2378,11 +2379,11 @@ mod tests { #[test] fn test_bug_1617_2() { - assert!(test_operation_strategy( + test_operation_strategy( &[ IndexingOp::AddDoc { id: 13, - value: Default::default() + value: Default::default(), }, IndexingOp::DeleteDoc { id: 13 }, IndexingOp::Commit, @@ -2390,9 +2391,9 @@ mod tests { IndexingOp::Commit, IndexingOp::Merge, ], - true + true, ) - .is_ok()); + .unwrap(); } #[test] @@ -2490,4 +2491,46 @@ mod tests { Ok(()) } + + #[test] + fn test_bug_2442_reserved_character_fast_field() -> crate::Result<()> { + let mut schema_builder = schema::Schema::builder(); + let json_field = schema_builder.add_json_field("json", FAST | TEXT); + + let schema = schema_builder.build(); + let index = Index::builder().schema(schema).create_in_ram()?; + let mut index_writer = index.writer_for_tests()?; + index_writer.set_merge_policy(Box::new(NoMergePolicy)); + + index_writer + .add_document(doc!( + json_field=>json!({"\u{0000}B":"1"}) + )) + .unwrap(); + index_writer + .add_document(doc!( + json_field=>json!({" A":"1"}) + )) + .unwrap(); + index_writer.commit()?; + + Ok(()) + } + + #[test] + fn test_bug_2442_reserved_character_columnar() -> crate::Result<()> { + let mut schema_builder = Schema::builder(); + let options = JsonObjectOptions::from(FAST).set_expand_dots_enabled(); + let field = schema_builder.add_json_field("json", options); + let index = Index::create_in_ram(schema_builder.build()); + let mut index_writer = index.writer_for_tests().unwrap(); + index_writer + .add_document(doc!(field=>json!({"\u{0000}": "A"}))) + .unwrap(); + index_writer + .add_document(doc!(field=>json!({format!("\u{0000}\u{0000}"): "A"}))) + .unwrap(); + index_writer.commit().unwrap(); + Ok(()) + } } diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index b3dfb2a6f8..1a83ccca17 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -145,15 +145,27 @@ mod tests_mmap { } } #[test] - fn test_json_field_null_byte() { - // Test when field name contains a zero byte, which has special meaning in tantivy. - // As a workaround, we convert the zero byte to the ASCII character '0'. - // https://github.com/quickwit-oss/tantivy/issues/2340 - // https://github.com/quickwit-oss/tantivy/issues/2193 - let field_name_in = "\u{0000}"; - let field_name_out = "0"; - test_json_field_name(field_name_in, field_name_out); + fn test_json_field_null_byte_is_ignored() { + let mut schema_builder = Schema::builder(); + let options = JsonObjectOptions::from(TEXT | FAST).set_expand_dots_enabled(); + let field = schema_builder.add_json_field("json", options); + let index = Index::create_in_ram(schema_builder.build()); + let mut index_writer = index.writer_for_tests().unwrap(); + index_writer + .add_document(doc!(field=>json!({"key": "test1", "invalidkey\u{0000}": "test2"}))) + .unwrap(); + index_writer.commit().unwrap(); + let reader = index.reader().unwrap(); + let searcher = reader.searcher(); + let segment_reader = searcher.segment_reader(0); + let inv_indexer = segment_reader.inverted_index(field).unwrap(); + let term_dict = inv_indexer.terms(); + assert_eq!(term_dict.num_terms(), 1); + let mut term_bytes = Vec::new(); + term_dict.ord_to_term(0, &mut term_bytes).unwrap(); + assert_eq!(term_bytes, b"key\0stest1"); } + #[test] fn test_json_field_1byte() { // Test when field name contains a '1' byte, which has special meaning in tantivy. diff --git a/src/indexer/path_to_unordered_id.rs b/src/indexer/path_to_unordered_id.rs index 054654f948..545a5a6861 100644 --- a/src/indexer/path_to_unordered_id.rs +++ b/src/indexer/path_to_unordered_id.rs @@ -38,7 +38,8 @@ impl PathToUnorderedId { #[cold] fn insert_new_path(&mut self, path: &str) -> u32 { let next_id = self.map.len() as u32; - self.map.insert(path.to_string(), next_id); + let new_path = path.to_string(); + self.map.insert(new_path, next_id); next_id } diff --git a/src/postings/json_postings_writer.rs b/src/postings/json_postings_writer.rs index 2fab8efa35..6d4e8addc8 100644 --- a/src/postings/json_postings_writer.rs +++ b/src/postings/json_postings_writer.rs @@ -59,7 +59,7 @@ impl PostingsWriter for JsonPostingsWriter { /// The actual serialization format is handled by the `PostingsSerializer`. fn serialize( &self, - term_addrs: &[(Field, OrderedPathId, &[u8], Addr)], + ordered_term_addrs: &[(Field, OrderedPathId, &[u8], Addr)], ordered_id_to_path: &[&str], ctx: &IndexingContext, serializer: &mut FieldSerializer, @@ -69,7 +69,7 @@ impl PostingsWriter for JsonPostingsWriter { term_buffer.clear_with_field_and_type(Type::Json, Field::from_field_id(0)); let mut prev_term_id = u32::MAX; let mut term_path_len = 0; // this will be set in the first iteration - for (_field, path_id, term, addr) in term_addrs { + for (_field, path_id, term, addr) in ordered_term_addrs { if prev_term_id != path_id.path_id() { term_buffer.truncate_value_bytes(0); term_buffer.append_path(ordered_id_to_path[path_id.path_id() as usize].as_bytes()); diff --git a/src/schema/term.rs b/src/schema/term.rs index b4d0d6288b..a0d1a8f677 100644 --- a/src/schema/term.rs +++ b/src/schema/term.rs @@ -249,12 +249,8 @@ impl Term { #[inline] pub fn append_path(&mut self, bytes: &[u8]) -> &mut [u8] { let len_before = self.0.len(); - if bytes.contains(&0u8) { - self.0 - .extend(bytes.iter().map(|&b| if b == 0 { b'0' } else { b })); - } else { - self.0.extend_from_slice(bytes); - } + assert!(!bytes.contains(&JSON_END_OF_PATH)); + self.0.extend_from_slice(bytes); &mut self.0[len_before..] } }