Skip to content

Commit

Permalink
Fixes bug that causes out-of-order sstable key.
Browse files Browse the repository at this point in the history
The previous way to address the problem was to replace \u{0000}
with 0 in different places.

This logic had several flaws:
Done on the serializer side (like it was for the columnar), there was
a collision problem.

If a document in the segment contained a json field with a \0 and
another doc contained the same json field but with `0`, then we were sending
the same field path twice to the serializer.

Another option would have been to normalize all values on the writer
side.

This PR simplifies the logic and simply ignores JSON paths containing a
\0, both in the columnar and the inverted index.

Closes #2442
  • Loading branch information
PSeitz authored and fulmicoton committed Jun 25, 2024
1 parent 5908414 commit f26e837
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 30 deletions.
9 changes: 8 additions & 1 deletion columnar/src/columnar/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::net::Ipv6Addr;

use column_operation::ColumnOperation;
pub(crate) use column_writers::CompatibleNumericalTypes;
use common::json_path_writer::JSON_END_OF_PATH;
use common::CountingWriter;
pub(crate) use serializer::ColumnarSerializer;
use stacker::{Addr, ArenaHashMap, MemoryArena};
Expand Down Expand Up @@ -247,6 +248,7 @@ impl ColumnarWriter {
}
pub fn serialize(&mut self, num_docs: RowId, wrt: &mut dyn io::Write) -> io::Result<()> {
let mut serializer = ColumnarSerializer::new(wrt);

let mut columns: Vec<(&[u8], ColumnType, Addr)> = self
.numerical_field_hash_map
.iter()
Expand All @@ -260,7 +262,7 @@ impl ColumnarWriter {
columns.extend(
self.bytes_field_hash_map
.iter()
.map(|(term, addr)| (term, ColumnType::Bytes, addr)),
.map(|(column_name, addr)| (column_name, ColumnType::Bytes, addr)),
);
columns.extend(
self.str_field_hash_map
Expand All @@ -287,6 +289,11 @@ impl ColumnarWriter {
let (arena, buffers, dictionaries) = (&self.arena, &mut self.buffers, &self.dictionaries);
let mut symbol_byte_buffer: Vec<u8> = Vec::new();
for (column_name, column_type, addr) in columns {
if column_name.contains(&JSON_END_OF_PATH) {
// Tantivy reserves the 0 byte (`JSON_END_OF_PATH`) as the end-of-path marker for JSON paths.
// Column names containing that byte are simply ignored by the columnar (and the inverted index).
continue;
}
match column_type {
ColumnType::Bool => {
let column_writer: ColumnWriter = self.bool_field_hash_map.read(addr);
Expand Down
10 changes: 3 additions & 7 deletions columnar/src/columnar/writer/serializer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::io;
use std::io::Write;

use common::json_path_writer::JSON_END_OF_PATH;
use common::{BinarySerializable, CountingWriter};
use sstable::value::RangeValueWriter;
use sstable::RangeSSTable;
Expand All @@ -18,13 +19,8 @@ pub struct ColumnarSerializer<W: io::Write> {
/// code.
fn prepare_key(key: &[u8], column_type: ColumnType, buffer: &mut Vec<u8>) {
buffer.clear();
// Convert 0 bytes to '0' string, as 0 bytes are reserved for the end of the path.
if key.contains(&0u8) {
buffer.extend(key.iter().map(|&b| if b == 0 { b'0' } else { b }));
} else {
buffer.extend_from_slice(key);
}
buffer.push(0u8);
buffer.extend_from_slice(key);
buffer.push(JSON_END_OF_PATH);
buffer.push(column_type.to_code());
}

Expand Down
5 changes: 4 additions & 1 deletion src/core/json_utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use common::json_path_writer::JSON_PATH_SEGMENT_SEP;
use common::json_path_writer::{JSON_END_OF_PATH, JSON_PATH_SEGMENT_SEP};
use common::{replace_in_place, JsonPathWriter};
use rustc_hash::FxHashMap;

Expand Down Expand Up @@ -83,6 +83,9 @@ fn index_json_object<'a, V: Value<'a>>(
positions_per_path: &mut IndexingPositionsPerPath,
) {
for (json_path_segment, json_value_visitor) in json_visitor {
if json_path_segment.as_bytes().contains(&JSON_END_OF_PATH) {
continue;
}
json_path_writer.push(json_path_segment);
index_json_value(
doc,
Expand Down
52 changes: 48 additions & 4 deletions src/indexer/index_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -815,8 +815,7 @@ mod tests {
use crate::indexer::NoMergePolicy;
use crate::query::{QueryParser, TermQuery};
use crate::schema::{
self, Facet, FacetOptions, IndexRecordOption, IpAddrOptions, NumericOptions,
TextFieldIndexing, TextOptions, Value, FAST, INDEXED, STORED, STRING, TEXT,
self, Facet, FacetOptions, IndexRecordOption, IpAddrOptions, JsonObjectOptions, NumericOptions, Schema, TextFieldIndexing, TextOptions, Value, FAST, INDEXED, STORED, STRING, TEXT
};
use crate::store::DOCSTORE_CACHE_CAPACITY;
use crate::{
Expand Down Expand Up @@ -2378,7 +2377,7 @@ mod tests {

#[test]
fn test_bug_1617_2() {
assert!(test_operation_strategy(
test_operation_strategy(
&[
IndexingOp::AddDoc {
id: 13,
Expand All @@ -2392,7 +2391,7 @@ mod tests {
],
true
)
.is_ok());
.unwrap();
}

#[test]
Expand Down Expand Up @@ -2490,4 +2489,49 @@ mod tests {

Ok(())
}

// Regression test for #2442: indexes one JSON document whose key starts with a `\0` byte and
// one with an ordinary key into a `FAST | TEXT` JSON field, then checks that the commit succeeds.
#[test]
fn test_bug_2442_reserved_character_fast_field() -> crate::Result<()> {
    let mut builder = schema::Schema::builder();
    let json_field = builder.add_json_field("json", FAST | TEXT);
    let index = Index::builder().schema(builder.build()).create_in_ram()?;

    let mut writer = index.writer_for_tests()?;
    writer.set_merge_policy(Box::new(NoMergePolicy));

    // Key containing the reserved 0 byte.
    let doc_with_null_byte_key = doc!(
        json_field=>json!({"\u{0000}B":"1"})
    );
    writer.add_document(doc_with_null_byte_key).unwrap();

    // Regular key.
    let doc_with_regular_key = doc!(
        json_field=>json!({" A":"1"})
    );
    writer.add_document(doc_with_regular_key).unwrap();

    writer.commit()?;
    Ok(())
}


// Regression test for #2442: JSON keys made only of `\0` bytes are sent to a fast-field-only
// JSON field (with dot expansion enabled). These paths contain the reserved end-of-path byte,
// and committing the segment must still succeed.
#[test]
fn test_bug_2442_reserved_character_columnar() -> crate::Result<()> {
    let mut schema_builder = Schema::builder();
    let options = JsonObjectOptions::from(FAST).set_expand_dots_enabled();
    let field = schema_builder.add_json_field("json", options);
    let index = Index::create_in_ram(schema_builder.build());
    let mut index_writer = index.writer_for_tests()?;
    // First key is a single `\0`, second key is two `\0` bytes. Plain string literals are
    // used as keys: no `format!` allocation is needed for a constant.
    index_writer.add_document(doc!(field=>json!({"\u{0000}": "A"})))?;
    index_writer.add_document(doc!(field=>json!({"\u{0000}\u{0000}": "A"})))?;
    index_writer.commit()?;
    Ok(())
}
}
28 changes: 20 additions & 8 deletions src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,27 @@ mod tests_mmap {
}
}
#[test]
fn test_json_field_null_byte() {
// Test when field name contains a zero byte, which has special meaning in tantivy.
// As a workaround, we convert the zero byte to the ASCII character '0'.
// https://github.com/quickwit-oss/tantivy/issues/2340
// https://github.com/quickwit-oss/tantivy/issues/2193
let field_name_in = "\u{0000}";
let field_name_out = "0";
test_json_field_name(field_name_in, field_name_out);
fn test_json_field_null_byte_is_ignored() {
let mut schema_builder = Schema::builder();
let options = JsonObjectOptions::from(TEXT | FAST).set_expand_dots_enabled();
let field = schema_builder.add_json_field("json", options);
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer_for_tests().unwrap();
index_writer
.add_document(doc!(field=>json!({"key": "test1", "invalidkey\u{0000}": "test2"})))
.unwrap();
index_writer.commit().unwrap();
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let segment_reader = searcher.segment_reader(0);
let inv_indexer = segment_reader.inverted_index(field).unwrap();
let term_dict = inv_indexer.terms();
assert_eq!(term_dict.num_terms(), 1);
let mut term_bytes = Vec::new();
term_dict.ord_to_term(0, &mut term_bytes).unwrap();
assert_eq!(term_bytes, b"key\0stest1");
}

#[test]
fn test_json_field_1byte() {
// Test when field name contains a '1' byte, which has special meaning in tantivy.
Expand Down
3 changes: 2 additions & 1 deletion src/indexer/path_to_unordered_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ impl PathToUnorderedId {
#[cold]
fn insert_new_path(&mut self, path: &str) -> u32 {
let next_id = self.map.len() as u32;
self.map.insert(path.to_string(), next_id);
let new_path = path.to_string();
self.map.insert(new_path, next_id);
next_id
}

Expand Down
4 changes: 2 additions & 2 deletions src/postings/json_postings_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
/// The actual serialization format is handled by the `PostingsSerializer`.
fn serialize(
&self,
term_addrs: &[(Field, OrderedPathId, &[u8], Addr)],
ordered_term_addrs: &[(Field, OrderedPathId, &[u8], Addr)],
ordered_id_to_path: &[&str],
ctx: &IndexingContext,
serializer: &mut FieldSerializer,
Expand All @@ -69,7 +69,7 @@ impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
term_buffer.clear_with_field_and_type(Type::Json, Field::from_field_id(0));
let mut prev_term_id = u32::MAX;
let mut term_path_len = 0; // this will be set in the first iteration
for (_field, path_id, term, addr) in term_addrs {
for (_field, path_id, term, addr) in ordered_term_addrs {
if prev_term_id != path_id.path_id() {
term_buffer.truncate_value_bytes(0);
term_buffer.append_path(ordered_id_to_path[path_id.path_id() as usize].as_bytes());
Expand Down
8 changes: 2 additions & 6 deletions src/schema/term.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,8 @@ impl Term {
#[inline]
pub fn append_path(&mut self, bytes: &[u8]) -> &mut [u8] {
let len_before = self.0.len();
if bytes.contains(&0u8) {
self.0
.extend(bytes.iter().map(|&b| if b == 0 { b'0' } else { b }));
} else {
self.0.extend_from_slice(bytes);
}
assert!(!bytes.contains(&JSON_END_OF_PATH));
self.0.extend_from_slice(bytes);
&mut self.0[len_before..]
}
}
Expand Down

0 comments on commit f26e837

Please sign in to comment.