From a35d3ff6e808896366a6b66671f6832f32dfb4da Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Thu, 11 Jan 2024 16:20:17 -0500 Subject: [PATCH] Create empty shards if necessary --- .../file_backed_index/serialize.rs | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/serialize.rs b/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/serialize.rs index b05a50ebdc5..8627be9c5a6 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/serialize.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/serialize.rs @@ -21,6 +21,7 @@ use std::collections::HashMap; use itertools::Itertools; use quickwit_doc_mapper::{BinaryFormat, FieldMappingType}; +use quickwit_proto::metastore::SourceType; use quickwit_proto::types::SourceId; use serde::{Deserialize, Serialize}; @@ -59,6 +60,7 @@ pub(crate) struct FileBackedIndexV0_7 { #[serde(rename = "index")] metadata: IndexMetadata, splits: Vec, + // TODO: Remove `skip_serializing_if` when we release ingest v2. #[serde(default, skip_serializing_if = "HashMap::is_empty")] shards: HashMap, #[serde(default)] @@ -76,6 +78,7 @@ impl From for FileBackedIndexV0_7 { .per_source_shards .into_iter() .filter_map(|(source_id, shards)| { + // TODO: Remove this filter when we release ingest v2. // Skip serializing empty shards since the feature is hidden and disabled by // default. This way, we can still modify the serialization format without worrying // about backward compatibility post `0.7`. @@ -130,7 +133,7 @@ impl From for FileBackedIndex { split.split_metadata.index_uid = index.metadata.index_uid.clone(); } } - let shards = index + let mut shards: HashMap = index .shards .into_iter() .map(|(source_id, serde_shards)| { @@ -141,6 +144,16 @@ impl From for FileBackedIndex { ) }) .collect(); + // TODO: Remove this when we release ingest v2. + for source in index.metadata.sources.values() { + if source.source_type() == SourceType::IngestV2 + && !shards.contains_key(&source.source_id) + { + let index_uid = index.metadata.index_uid.clone(); + let source_id = source.source_id.clone(); + shards.insert(source_id.clone(), Shards::empty(index_uid, source_id)); + } + } Self::new(index.metadata, index.splits, shards, index.delete_tasks) } }