Skip to content

Commit

Permalink
perf: improve wal parquet load speed (#3378)
Browse files Browse the repository at this point in the history
Getting parquet metadata takes a lot of time when the schema is very big, so
we cache the metadata. This makes loading files for search much faster.
  • Loading branch information
hengfeiyang committed Apr 30, 2024
1 parent 7c1da83 commit e6ce1c5
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 1 deletion.
8 changes: 8 additions & 0 deletions src/ingester/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,15 @@ mod stream;
mod wal;
mod writer;

use config::RwAHashMap;
pub use entry::Entry;
pub use immutable::read_from_immutable;
use once_cell::sync::Lazy;
pub use writer::{check_memtable_size, flush_all, get_writer, read_from_memtable, Writer};

/// Global in-memory cache of parquet file metadata for WAL files.
///
/// Maps a `String` file key to that file's `config::meta::stream::FileMeta`.
/// The map is created empty (via `Default`) the first time it is accessed and
/// is read and modified through `.read().await` / `.write().await`.
///
/// NOTE(review): keys appear to be the file path relative to the WAL data
/// directory, using `/` separators — confirm against the code that inserts
/// entries.
pub static WAL_PARQUET_METADATA: Lazy<RwAHashMap<String, config::meta::stream::FileMeta>> =
Lazy::new(Default::default);

pub async fn init() -> errors::Result<()> {
// check uncompleted parquet files, need delete those files
wal::check_uncompleted_parquet_files().await?;
Expand All @@ -42,9 +47,12 @@ pub async fn init() -> errors::Result<()> {
));
interval.tick().await; // the first tick is immediate
loop {
// persist immutable data to disk
if let Err(e) = immutable::persist().await {
log::error!("persist error: {}", e);
}
// shrink metadata cache
WAL_PARQUET_METADATA.write().await.shrink_to_fit();
interval.tick().await;
}
});
Expand Down
19 changes: 18 additions & 1 deletion src/ingester/src/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ impl Partition {
stream_type: &str,
stream_name: &str,
) -> Result<(usize, Vec<(PathBuf, PersistStat)>)> {
let mut path = PathBuf::from(&CONFIG.common.data_wal_dir);
let base_path = PathBuf::from(&CONFIG.common.data_wal_dir);
let mut path = base_path.clone();
path.push("files");
path.push(org_id);
path.push(stream_type);
Expand Down Expand Up @@ -118,6 +119,7 @@ impl Partition {
.context(WriteParquetRecordBatchSnafu)?;
}
writer.close().await.context(WriteParquetRecordBatchSnafu)?;
file_meta.compressed_size = buf_parquet.len() as i64;

// write into local file
let file_name = generate_filename_with_time_range(file_meta.min_ts, file_meta.max_ts);
Expand All @@ -138,6 +140,21 @@ impl Partition {
.await
.context(WriteFileSnafu { path: path.clone() })?;

// set parquet metadata cache
let mut file_key = path.clone();
file_key.set_extension("parquet");
let file_key = file_key
.strip_prefix(base_path.clone())
.unwrap()
.to_string_lossy()
.replace('\\', "/")
.trim_start_matches('/')
.to_string();
super::WAL_PARQUET_METADATA
.write()
.await
.insert(file_key, file_meta);

// update metrics
metrics::INGEST_WAL_USED_BYTES
.with_label_values(&[org_id, stream_type])
Expand Down
7 changes: 7 additions & 0 deletions src/job/files/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use config::{
use datafusion::{arrow::json as arrow_json, datasource::MemTable, prelude::*};
use hashbrown::HashSet;
use infra::{cache, storage};
use ingester::WAL_PARQUET_METADATA;
use once_cell::sync::Lazy;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use tokio::{
Expand Down Expand Up @@ -165,6 +166,8 @@ async fn prepare_files() -> Result<FxIndexMap<String, Vec<FileKey>>, anyhow::Err
e
);
}
// delete metadata from cache
WAL_PARQUET_METADATA.write().await.remove(&file_key);
continue;
}
let prefix = file_key[..file_key.rfind('/').unwrap()].to_string();
Expand Down Expand Up @@ -221,6 +224,8 @@ async fn move_files(
e
);
}
// delete metadata from cache
WAL_PARQUET_METADATA.write().await.remove(&file.key);
PROCESSING_FILES.write().await.remove(&file.key);
}
return Ok(());
Expand Down Expand Up @@ -367,6 +372,8 @@ async fn move_files(
file.key,
e.to_string()
);
// delete metadata from cache
WAL_PARQUET_METADATA.write().await.remove(&file.key);
// need release all the files
for file in files_with_size.iter() {
PROCESSING_FILES.write().await.remove(&file.key);
Expand Down
12 changes: 12 additions & 0 deletions src/service/search/grpc/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use infra::{
errors::{Error, ErrorCodes},
schema::{unwrap_partition_time_level, unwrap_stream_settings},
};
use ingester::WAL_PARQUET_METADATA;
use tokio::time::Duration;
use tracing::{info_span, Instrument};

Expand Down Expand Up @@ -104,12 +105,23 @@ pub async fn search_parquet(
let mut new_files = Vec::with_capacity(files_num);
let files_metadata = futures::stream::iter(files)
.map(|file| async move {
let r = WAL_PARQUET_METADATA.read().await;
if let Some(meta) = r.get(file.key.as_str()) {
let mut file = file;
file.meta = meta.clone();
return file;
}
drop(r);
let source_file = CONFIG.common.data_wal_dir.to_string() + file.key.as_str();
let meta = read_metadata_from_file(&source_file.into())
.await
.unwrap_or_default();
let mut file = file;
file.meta = meta;
WAL_PARQUET_METADATA
.write()
.await
.insert(file.key.clone(), file.meta.clone());
file
})
.buffer_unordered(min(files_num, 10))
Expand Down

0 comments on commit e6ce1c5

Please sign in to comment.