perf: improve ingestion load and flatten #3218

Merged · 2 commits · Apr 11, 2024

2 changes: 2 additions & 0 deletions src/config/src/config.rs
@@ -670,6 +670,8 @@ pub struct Limit {
pub query_group_base_speed: usize,
#[env_config(name = "ZO_INGEST_ALLOWED_UPTO", default = 5)] // in hours - in past
pub ingest_allowed_upto: i64,
#[env_config(name = "ZO_INGEST_FLATTEN_LEVEL", default = 3)] // default flatten level
pub ingest_flatten_level: u32,
#[env_config(name = "ZO_IGNORE_FILE_RETENTION_BY_STREAM", default = false)]
pub ignore_file_retention_by_stream: bool,
#[env_config(name = "ZO_LOGS_FILE_RETENTION", default = "hourly")]
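Not part of the diff — a sketch of how the new limit is meant to be consumed. `ZO_INGEST_FLATTEN_LEVEL` (default 3; 0 means unlimited) caps how deep ingestion flattens nested JSON. The actual ingestion call site is outside this excerpt, so the wiring below is illustrative; the `config::` paths and `CONFIG` handle are assumptions based on how this PR uses them elsewhere:

use config::{utils::flatten::flatten_with_level, CONFIG};
use serde_json::Value;

// Illustrative wiring only; the real call site is not shown in this excerpt.
fn flatten_for_ingest(record: Value) -> Result<Value, anyhow::Error> {
    // 0 flattens fully; the default of 3 bounds key explosion on deeply
    // nested payloads.
    flatten_with_level(record, CONFIG.limit.ingest_flatten_level)
}
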
121 changes: 115 additions & 6 deletions src/config/src/utils/flatten.rs
@@ -17,6 +17,11 @@ use serde_json::value::{Map, Value};

const KEY_SEPARATOR: &str = "_";

#[inline]
pub fn flatten(to_flatten: Value) -> Result<Value, anyhow::Error> {
flatten_with_level(to_flatten, 0)
}
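
For context: the existing `flatten` entry point keeps its signature and now delegates with a level of 0 (unlimited), so current call sites behave exactly as before. A minimal equivalence check (sketch, assuming `serde_json`'s `json!` macro):

use serde_json::json;

fn demo_equivalence() -> Result<(), anyhow::Error> {
    let v = json!({"a": {"b": 1}});
    // Both produce {"a_b": 1}: a level of 0 means unlimited flattening depth.
    assert_eq!(flatten(v.clone())?, flatten_with_level(v, 0)?);
    Ok(())
}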

/// Flattens the provided JSON object (`current`).
///
/// It will return an error if flattening the object would make two keys to be
@@ -26,7 +31,7 @@ const KEY_SEPARATOR: &str = "_";
/// # Errors
/// Will return `Err` if `to_flatten` is not an object, or if flattening the
/// object would result in two or more keys colliding.
pub fn flatten(to_flatten: Value) -> Result<Value, anyhow::Error> {
pub fn flatten_with_level(to_flatten: Value, max_level: u32) -> Result<Value, anyhow::Error> {
// quick check to see if we have an object
let to_flatten = match to_flatten {
Value::Object(v) => {
@@ -49,7 +54,7 @@ pub fn flatten(to_flatten: Value) -> Result<Value, anyhow::Error> {
};

let mut flat = Map::<String, Value>::new();
flatten_value(to_flatten, "".to_owned(), 0, &mut flat).map(|_x| Value::Object(flat))
flatten_value(to_flatten, "".to_owned(), max_level, 0, &mut flat).map(|_x| Value::Object(flat))
}

/// Flattens the passed JSON value (`current`), whose path is `parent_key` and
@@ -58,15 +63,16 @@ pub fn flatten(to_flatten: Value) -> Result<Value, anyhow::Error> {
fn flatten_value(
current: Value,
parent_key: String,
max_level: u32,
depth: u32,
flattened: &mut Map<String, Value>,
) -> Result<(), anyhow::Error> {
match current {
Value::Object(map) => {
flatten_object(map, &parent_key, depth, flattened)?;
flatten_object(map, &parent_key, max_level, depth, flattened)?;
}
Value::Array(arr) => {
flatten_array(arr, &parent_key, depth, flattened)?;
flatten_array(arr, &parent_key, max_level, depth, flattened)?;
}
_ => {
flattened.insert(parent_key, current);
@@ -81,17 +87,26 @@ fn flatten_value(
fn flatten_object(
current: Map<String, Value>,
parent_key: &str,
max_level: u32,
depth: u32,
flattened: &mut Map<String, Value>,
) -> Result<(), anyhow::Error> {
if current.is_empty() {
return Ok(());
}
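    // Depth cap: once `depth` reaches a non-zero `max_level`, the remaining
    // subtree is not flattened further; it is re-serialized to a JSON string
    // and stored under the current key.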
if max_level > 0 && depth >= max_level {
let v = Value::String(Value::Object(current).to_string());
flatten_value(v, parent_key.to_string(), max_level, depth, flattened)?;
return Ok(());
}
for (mut k, v) in current.into_iter() {
format_key(&mut k);
let parent_key = if depth > 0 {
format!("{}{}{}", parent_key, KEY_SEPARATOR, k)
} else {
k
};
flatten_value(v, parent_key, depth + 1, flattened)?;
flatten_value(v, parent_key, max_level, depth + 1, flattened)?;
}
Ok(())
}
@@ -102,6 +117,7 @@ fn flatten_object(
fn flatten_array(
current: Vec<Value>,
parent_key: &str,
max_level: u32,
depth: u32,
flattened: &mut Map<String, Value>,
) -> Result<(), anyhow::Error> {
@@ -113,7 +129,7 @@ fn flatten_array(
// flatten_value(obj, parent_key, depth + 1, flattened)?;
// }
let v = Value::String(Value::Array(current.to_vec()).to_string());
flatten_value(v, parent_key.to_string(), depth, flattened)?;
flatten_value(v, parent_key.to_string(), max_level, depth, flattened)?;
Ok(())
}
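
Taken together: objects below the cap flatten into `KEY_SEPARATOR`-joined keys, objects at the cap are stringified, and arrays are always stringified regardless of level (the commented-out per-index expansion above is an earlier alternative, not something this PR changes). A compact illustration (sketch, assuming `serde_json`):

use serde_json::json;

fn demo_levels() -> Result<(), anyhow::Error> {
    let v = json!({"a": {"b": 1}, "c": [1, 2]});
    // Unlimited depth: the nested object is flattened, the array stringified.
    assert_eq!(
        flatten_with_level(v.clone(), 0)?,
        json!({"a_b": 1, "c": "[1,2]"})
    );
    // Cap at 1: the nested object is kept as a JSON-encoded string too.
    assert_eq!(
        flatten_with_level(v, 1)?,
        json!({"a": "{\"b\":1}", "c": "[1,2]"})
    );
    Ok(())
}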

@@ -386,4 +402,97 @@ mod tests {
let output = flatten(input).unwrap();
assert_eq!(output, expected_output);
}

#[test]
fn test_flatten_with_level() {
let input = json!({
"firstName": "John",
"lastName": "Doe",
"age": 25,
"info": {
"address": {
"streetAddress": "123 Main St",
"city": "Anytown",
"state": "CA",
"postalCode": "12345",
"phoneNumbers": {
"type": "home",
"number": "555-555-1234"
}
},
"phoneNumbers": [
{
"type": "home",
"number": "555-555-1234"
},
{
"type": "work",
"number": "555-555-5678"
}
]
}
});

let expected_output_level0 = json!({
"firstname": "John",
"lastname": "Doe",
"age": 25,
"info_address_streetaddress": "123 Main St",
"info_address_city": "Anytown",
"info_address_state": "CA",
"info_address_postalcode": "12345",
"info_address_phonenumbers_number": "555-555-1234",
"info_address_phonenumbers_type": "home",
"info_phonenumbers": "[{\"number\":\"555-555-1234\",\"type\":\"home\"},{\"number\":\"555-555-5678\",\"type\":\"work\"}]"
});
let expected_output_level1 = json!({
"firstname": "John",
"lastname": "Doe",
"age": 25,
"info": "{\"address\":{\"city\":\"Anytown\",\"phoneNumbers\":{\"number\":\"555-555-1234\",\"type\":\"home\"},\"postalCode\":\"12345\",\"state\":\"CA\",\"streetAddress\":\"123 Main St\"},\"phoneNumbers\":[{\"number\":\"555-555-1234\",\"type\":\"home\"},{\"number\":\"555-555-5678\",\"type\":\"work\"}]}"
});
let expected_output_level2 = json!({
"firstname": "John",
"lastname": "Doe",
"age": 25,
"info_address": "{\"city\":\"Anytown\",\"phoneNumbers\":{\"number\":\"555-555-1234\",\"type\":\"home\"},\"postalCode\":\"12345\",\"state\":\"CA\",\"streetAddress\":\"123 Main St\"}",
"info_phonenumbers": "[{\"number\":\"555-555-1234\",\"type\":\"home\"},{\"number\":\"555-555-5678\",\"type\":\"work\"}]"
});
let expected_output_level3 = json!({
"firstname": "John",
"lastname": "Doe",
"age": 25,
"info_address_streetaddress": "123 Main St",
"info_address_city": "Anytown",
"info_address_state": "CA",
"info_address_postalcode": "12345",
"info_address_phonenumbers": "{\"number\":\"555-555-1234\",\"type\":\"home\"}",
"info_phonenumbers": "[{\"number\":\"555-555-1234\",\"type\":\"home\"},{\"number\":\"555-555-5678\",\"type\":\"work\"}]"
});
let expected_output_level4 = json!({
"firstname": "John",
"lastname": "Doe",
"age": 25,
"info_address_streetaddress": "123 Main St",
"info_address_city": "Anytown",
"info_address_state": "CA",
"info_address_postalcode": "12345",
"info_address_phonenumbers_number": "555-555-1234",
"info_address_phonenumbers_type": "home",
"info_phonenumbers": "[{\"number\":\"555-555-1234\",\"type\":\"home\"},{\"number\":\"555-555-5678\",\"type\":\"work\"}]"
});

let output = flatten_with_level(input.clone(), 0).unwrap();
assert_eq!(output, expected_output_level0);
let output = flatten_with_level(input.clone(), 1).unwrap();
assert_eq!(output, expected_output_level1);
let output = flatten_with_level(input.clone(), 2).unwrap();
assert_eq!(output, expected_output_level2);
let output = flatten_with_level(input.clone(), 3).unwrap();
assert_eq!(output, expected_output_level3);
let output = flatten_with_level(input.clone(), 4).unwrap();
assert_eq!(output, expected_output_level4);
let output = flatten_with_level(input, 5).unwrap();
assert_eq!(output, expected_output_level4);
}
}
57 changes: 39 additions & 18 deletions src/job/files/parquet.rs
@@ -65,7 +65,7 @@ pub async fn run() -> Result<(), anyhow::Error> {
tokio::sync::mpsc::channel::<(String, Vec<FileKey>)>(CONFIG.limit.file_move_thread_num);
let rx = Arc::new(Mutex::new(rx));
// move files
for _ in 0..CONFIG.limit.file_move_thread_num {
for thread_id in 0..CONFIG.limit.file_move_thread_num {
let rx = rx.clone();
tokio::spawn(async move {
loop {
Expand All @@ -76,7 +76,7 @@ pub async fn run() -> Result<(), anyhow::Error> {
break;
}
Some((prefix, files)) => {
if let Err(e) = move_files(&prefix, files).await {
if let Err(e) = move_files(thread_id, &prefix, files).await {
log::error!(
"[INGESTER:JOB] Error moving parquet files to remote: {}",
e
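
The hunk above threads a `thread_id` through each spawned mover task so interleaved log lines from concurrent workers can be attributed — the source of the `[INGESTER:JOB:{thread_id}]` prefixes throughout the rest of this file. The general shape, as a sketch with the channel payloads simplified relative to the real code:

use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};

async fn spawn_movers(workers: usize, rx: mpsc::Receiver<String>) {
    // One shared receiver, N workers: each worker owns a stable id that is
    // baked into every log line it emits.
    let rx = Arc::new(Mutex::new(rx));
    for thread_id in 0..workers {
        let rx = rx.clone();
        tokio::spawn(async move {
            while let Some(prefix) = rx.lock().await.recv().await {
                log::debug!("[INGESTER:JOB:{thread_id}] got partition: {prefix}");
            }
        });
    }
}
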
@@ -182,7 +182,11 @@ async fn prepare_files() -> Result<FxIndexMap<String, Vec<FileKey>>, anyhow::Err
Ok(partition_files_with_size)
}

async fn move_files(prefix: &str, files: Vec<FileKey>) -> Result<(), anyhow::Error> {
async fn move_files(
thread_id: usize,
prefix: &str,
files: Vec<FileKey>,
) -> Result<(), anyhow::Error> {
let columns = prefix.splitn(5, '/').collect::<Vec<&str>>();
// eg: files/default/logs/olympics/0/2023/08/21/08/8b8a5451bbe1c44b/
// eg: files/default/traces/default/0/2023/09/04/05/default/service_name=ingester/
@@ -191,7 +195,10 @@ async fn move_files(prefix: &str, files: Vec<FileKey>) -> Result<(), anyhow::Err
let stream_type = StreamType::from(columns[2]);
let stream_name = columns[3].to_string();

log::debug!("[INGESTER:JOB] check deletion for partition: {}", prefix);
log::debug!(
"[INGESTER:JOB:{thread_id}] check deletion for partition: {}",
prefix
);

let wal_dir = Path::new(&CONFIG.common.data_wal_dir)
.canonicalize()
@@ -201,15 +208,15 @@ async fn move_files(prefix: &str, files: Vec<FileKey>) -> Result<(), anyhow::Err
if db::compact::retention::is_deleting_stream(&org_id, stream_type, &stream_name, None) {
for file in files {
log::warn!(
"[INGESTER:JOB] the stream [{}/{}/{}] is deleting, just delete file: {}",
"[INGESTER:JOB:{thread_id}] the stream [{}/{}/{}] is deleting, just delete file: {}",
&org_id,
stream_type,
&stream_name,
file.key,
);
if let Err(e) = tokio::fs::remove_file(wal_dir.join(&file.key)).await {
log::error!(
"[INGESTER:JOB] Failed to remove parquet file from disk: {}, {}",
"[INGESTER:JOB:{thread_id}] Failed to remove parquet file from disk: {}, {}",
file.key,
e
);
@@ -219,7 +226,10 @@ async fn move_files(prefix: &str, files: Vec<FileKey>) -> Result<(), anyhow::Err
return Ok(());
}

log::debug!("[INGESTER:JOB] start processing for partition: {}", prefix);
log::debug!(
"[INGESTER:JOB:{thread_id}] start processing for partition: {}",
prefix
);

let wal_dir = wal_dir.clone();
// sort by created time
@@ -267,14 +277,17 @@ async fn move_files(prefix: &str, files: Vec<FileKey>) -> Result<(), anyhow::Err
}
}

log::debug!("[INGESTER:JOB] get schema for partition: {}", prefix);
log::debug!(
"[INGESTER:JOB:{thread_id}] get schema for partition: {}",
prefix
);

// get latest schema
let latest_schema = db::schema::get(&org_id, &stream_name, stream_type)
.await
.map_err(|e| {
log::error!(
"[INGESTER:JOB] Failed to get latest schema for stream [{}/{}/{}]: {}",
"[INGESTER:JOB:{thread_id}] Failed to get latest schema for stream [{}/{}/{}]: {}",
&org_id,
stream_type,
&stream_name,
@@ -283,15 +296,18 @@ async fn move_files(prefix: &str, files: Vec<FileKey>) -> Result<(), anyhow::Err
e
})?;

log::debug!("[INGESTER:JOB] start merging for partition: {}", prefix);
log::debug!(
"[INGESTER:JOB:{thread_id}] start merging for partition: {}",
prefix
);

// start merge files and upload to s3
loop {
// yield to other tasks
tokio::task::yield_now().await;
// merge file and get the big file key
let (new_file_name, new_file_meta, new_file_list) =
match merge_files(&latest_schema, &wal_dir, &files_with_size).await {
match merge_files(thread_id, &latest_schema, &wal_dir, &files_with_size).await {
Ok(v) => v,
Err(e) => {
log::error!("[INGESTER:JOB] merge files failed: {}", e);
@@ -330,7 +346,7 @@ async fn move_files(prefix: &str, files: Vec<FileKey>) -> Result<(), anyhow::Err
loop {
if wal::lock_files_exists(&file.key).await {
log::warn!(
"[INGESTER:JOB] the file is still in use, waiting for a few ms: {}",
"[INGESTER:JOB:{thread_id}] the file is still in use, waiting for a few ms: {}",
file.key
);
time::sleep(time::Duration::from_millis(100)).await;
@@ -342,7 +358,7 @@ async fn move_files(prefix: &str, files: Vec<FileKey>) -> Result<(), anyhow::Err
let ret = tokio::fs::remove_file(&wal_dir.join(&file.key)).await;
if let Err(e) = ret {
log::error!(
"[INGESTER:JOB] Failed to remove parquet file from disk: {}, {}",
"[INGESTER:JOB:{thread_id}] Failed to remove parquet file from disk: {}, {}",
file.key,
e.to_string()
);
@@ -377,6 +393,7 @@ async fn move_files(prefix: &str, files: Vec<FileKey>) -> Result<(), anyhow::Err
/// merge some small files into one big file, upload to storage, returns the big
/// file key and merged files
async fn merge_files(
thread_id: usize,
latest_schema: &Schema,
wal_dir: &Path,
files_with_size: &[FileKey],
@@ -403,12 +420,12 @@ async fn merge_files(
let mut file_schema = None;
let tmp_dir = cache::tmpfs::Directory::default();
for file in retain_file_list.iter_mut() {
log::info!("[INGESTER:JOB] merge small file: {}", &file.key);
log::info!("[INGESTER:JOB:{thread_id}] merge small file: {}", &file.key);
let data = match get_file_contents(&wal_dir.join(&file.key)).await {
Ok(body) => body,
Err(err) => {
log::error!(
"[INGESTER:JOB] merge small file: {}, err: {}",
"[INGESTER:JOB:{thread_id}] merge small file: {}, err: {}",
&file.key,
err
);
@@ -472,12 +489,16 @@ async fn merge_files(
Ok(v) => v,
Err(e) => {
log::error!(
"[INGESTER:JOB] merge_parquet_files error for stream -> '{}/{}/{}'",
"[INGESTER:JOB:{thread_id}] merge_parquet_files error for stream -> '{}/{}/{}'",
org_id,
stream_type,
stream_name
);
log::error!("[INGESTER:JOB] {} for files {:?}", e, retain_file_list);
log::error!(
"[INGESTER:JOB:{thread_id}] {} for files {:?}",
e,
retain_file_list
);
return Err(e.into());
}
};
@@ -499,7 +520,7 @@ async fn merge_files(
let new_file_key =
super::generate_storage_file_name(&org_id, stream_type, &stream_name, &file_name);
log::info!(
"[INGESTER:JOB] merge file succeeded, {} files into a new file: {}, original_size: {}, compressed_size: {}",
"[INGESTER:JOB:{thread_id}] merge file succeeded, {} files into a new file: {}, original_size: {}, compressed_size: {}",
retain_file_list.len(),
new_file_key,
new_file_meta.original_size,