perf: improve ingestion load and flatten (#3218)
- [x] impl #3216 
- [x] add more logs for parquet job
hengfeiyang committed Apr 11, 2024
1 parent d4ff188 commit 4f95333
Showing 10 changed files with 166 additions and 33 deletions.
2 changes: 2 additions & 0 deletions src/config/src/config.rs
@@ -670,6 +670,8 @@ pub struct Limit {
pub query_group_base_speed: usize,
#[env_config(name = "ZO_INGEST_ALLOWED_UPTO", default = 5)] // in hours - in past
pub ingest_allowed_upto: i64,
#[env_config(name = "ZO_INGEST_FLATTEN_LEVEL", default = 3)] // default flatten level
pub ingest_flatten_level: u32,
#[env_config(name = "ZO_IGNORE_FILE_RETENTION_BY_STREAM", default = false)]
pub ignore_file_retention_by_stream: bool,
#[env_config(name = "ZO_LOGS_FILE_RETENTION", default = "hourly")]
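This is the only configuration change: `ingest_flatten_level` caps how many levels of nested JSON ingestion flattens into discrete keys before serializing the remainder to a string, with 0 meaning unlimited. A minimal sketch of how a caller might consume the knob — the `config::CONFIG` global and `config::utils::flatten` path are assumed from this repository's layout, not shown in this diff:

```rust
// Sketch only (paths assumed from the repo layout, not part of the diff):
// flatten one incoming record using the configured level.
fn flatten_record(record: serde_json::Value) -> Result<serde_json::Value, anyhow::Error> {
    // ZO_INGEST_FLATTEN_LEVEL, default 3; 0 disables the depth limit.
    let level = config::CONFIG.limit.ingest_flatten_level;
    config::utils::flatten::flatten_with_level(record, level)
}

// With the default level of 3, anything nested deeper than three keys is
// kept as a single stringified column:
//   {"k8s": {"pod": {"labels": {"app": "api"}}}}
//   => {"k8s_pod_labels": "{\"app\":\"api\"}"}
```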
121 changes: 115 additions & 6 deletions src/config/src/utils/flatten.rs
@@ -17,6 +17,11 @@ use serde_json::value::{Map, Value};

const KEY_SEPARATOR: &str = "_";

+#[inline]
+pub fn flatten(to_flatten: Value) -> Result<Value, anyhow::Error> {
+    flatten_with_level(to_flatten, 0)
+}

/// Flattens the provided JSON object (`current`).
///
/// It will return an error if flattening the object would make two keys to be
@@ -26,7 +31,7 @@ const KEY_SEPARATOR: &str = "_";
/// # Errors
/// Will return `Err` if `to_flatten` is not an object, or if flattening the
/// object would result in two or more keys colliding.
-pub fn flatten(to_flatten: Value) -> Result<Value, anyhow::Error> {
+pub fn flatten_with_level(to_flatten: Value, max_level: u32) -> Result<Value, anyhow::Error> {
    // quick check to see if we have an object
let to_flatten = match to_flatten {
Value::Object(v) => {
@@ -49,7 +54,7 @@ pub fn flatten(to_flatten: Value) -> Result<Value, anyhow::Error> {
};

let mut flat = Map::<String, Value>::new();
-    flatten_value(to_flatten, "".to_owned(), 0, &mut flat).map(|_x| Value::Object(flat))
+    flatten_value(to_flatten, "".to_owned(), max_level, 0, &mut flat).map(|_x| Value::Object(flat))
}
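The original `flatten` entry point is kept as an `#[inline]` wrapper that passes level 0, so existing callers keep unlimited-depth flattening. A quick sanity check of that equivalence, assuming the module above is in scope:

```rust
use serde_json::json;

#[test]
fn flatten_is_flatten_with_level_zero() {
    let v = json!({"a": {"b": {"c": 1}}});
    // Level 0 means no depth limit, so both calls agree on every input.
    assert_eq!(flatten(v.clone()).unwrap(), flatten_with_level(v.clone(), 0).unwrap());
    assert_eq!(flatten(v).unwrap(), json!({"a_b_c": 1}));
}
```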

/// Flattens the passed JSON value (`current`), whose path is `parent_key` and
@@ -58,15 +63,16 @@ pub fn flatten(to_flatten: Value) -> Result<Value, anyhow::Error> {
fn flatten_value(
current: Value,
parent_key: String,
+    max_level: u32,
depth: u32,
flattened: &mut Map<String, Value>,
) -> Result<(), anyhow::Error> {
match current {
Value::Object(map) => {
-            flatten_object(map, &parent_key, depth, flattened)?;
+            flatten_object(map, &parent_key, max_level, depth, flattened)?;
}
Value::Array(arr) => {
-            flatten_array(arr, &parent_key, depth, flattened)?;
+            flatten_array(arr, &parent_key, max_level, depth, flattened)?;
}
_ => {
flattened.insert(parent_key, current);
@@ -81,17 +87,26 @@ fn flatten_value(
fn flatten_object(
current: Map<String, Value>,
parent_key: &str,
+    max_level: u32,
depth: u32,
flattened: &mut Map<String, Value>,
) -> Result<(), anyhow::Error> {
+    if current.is_empty() {
+        return Ok(());
+    }
+    if max_level > 0 && depth >= max_level {
+        let v = Value::String(Value::Object(current).to_string());
+        flatten_value(v, parent_key.to_string(), max_level, depth, flattened)?;
+        return Ok(());
+    }
for (mut k, v) in current.into_iter() {
format_key(&mut k);
let parent_key = if depth > 0 {
format!("{}{}{}", parent_key, KEY_SEPARATOR, k)
} else {
k
};
-        flatten_value(v, parent_key, depth + 1, flattened)?;
+        flatten_value(v, parent_key, max_level, depth + 1, flattened)?;
}
Ok(())
}
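This `max_level` guard is the heart of the change: once `depth` reaches the limit, the whole remaining subtree is collapsed into a single JSON-string value under the current key instead of being walked further. A small illustration of the resulting shape, assuming the module above:

```rust
use serde_json::json;

#[test]
fn depth_limit_collapses_subtrees() {
    let input = json!({"a": {"b": {"c": 1, "d": 2}}});
    // Two levels of keys become columns; the rest is one stringified field.
    let out = flatten_with_level(input, 2).unwrap();
    assert_eq!(out, json!({"a_b": "{\"c\":1,\"d\":2}"}));
}
```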
@@ -102,6 +117,7 @@ fn flatten_object(
fn flatten_array(
current: Vec<Value>,
parent_key: &str,
+    max_level: u32,
depth: u32,
flattened: &mut Map<String, Value>,
) -> Result<(), anyhow::Error> {
@@ -113,7 +129,7 @@ fn flatten_array(
// flatten_value(obj, parent_key, depth + 1, flattened)?;
// }
let v = Value::String(Value::Array(current.to_vec()).to_string());
-    flatten_value(v, parent_key.to_string(), depth, flattened)?;
+    flatten_value(v, parent_key.to_string(), max_level, depth, flattened)?;
Ok(())
}
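Arrays take a different path: `flatten_array` always serializes the array to a JSON string, at any depth, so `max_level` only governs nested objects. A tiny sketch of that behavior, again assuming the module above:

```rust
use serde_json::json;

#[test]
fn arrays_are_always_stringified() {
    let input = json!({"tags": ["a", "b"]});
    // Even with no depth limit, the array collapses to one string field.
    let out = flatten_with_level(input, 0).unwrap();
    assert_eq!(out, json!({"tags": "[\"a\",\"b\"]"}));
}
```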

@@ -386,4 +402,97 @@ mod tests {
let output = flatten(input).unwrap();
assert_eq!(output, expected_output);
}

+    #[test]
+    fn test_flatten_with_level() {
+        let input = json!({
+            "firstName": "John",
+            "lastName": "Doe",
+            "age": 25,
+            "info": {
+                "address": {
+                    "streetAddress": "123 Main St",
+                    "city": "Anytown",
+                    "state": "CA",
+                    "postalCode": "12345",
+                    "phoneNumbers": {
+                        "type": "home",
+                        "number": "555-555-1234"
+                    }
+                },
+                "phoneNumbers": [
+                    {
+                        "type": "home",
+                        "number": "555-555-1234"
+                    },
+                    {
+                        "type": "work",
+                        "number": "555-555-5678"
+                    }
+                ]
+            }
+        });
+
+        let expected_output_level0 = json!({
+            "firstname": "John",
+            "lastname": "Doe",
+            "age": 25,
+            "info_address_streetaddress": "123 Main St",
+            "info_address_city": "Anytown",
+            "info_address_state": "CA",
+            "info_address_postalcode": "12345",
+            "info_address_phonenumbers_number": "555-555-1234",
+            "info_address_phonenumbers_type": "home",
+            "info_phonenumbers": "[{\"number\":\"555-555-1234\",\"type\":\"home\"},{\"number\":\"555-555-5678\",\"type\":\"work\"}]"
+        });
+        let expected_output_level1 = json!({
+            "firstname": "John",
+            "lastname": "Doe",
+            "age": 25,
+            "info": "{\"address\":{\"city\":\"Anytown\",\"phoneNumbers\":{\"number\":\"555-555-1234\",\"type\":\"home\"},\"postalCode\":\"12345\",\"state\":\"CA\",\"streetAddress\":\"123 Main St\"},\"phoneNumbers\":[{\"number\":\"555-555-1234\",\"type\":\"home\"},{\"number\":\"555-555-5678\",\"type\":\"work\"}]}"
+        });
+        let expected_output_level2 = json!({
+            "firstname": "John",
+            "lastname": "Doe",
+            "age": 25,
+            "info_address": "{\"city\":\"Anytown\",\"phoneNumbers\":{\"number\":\"555-555-1234\",\"type\":\"home\"},\"postalCode\":\"12345\",\"state\":\"CA\",\"streetAddress\":\"123 Main St\"}",
+            "info_phonenumbers": "[{\"number\":\"555-555-1234\",\"type\":\"home\"},{\"number\":\"555-555-5678\",\"type\":\"work\"}]"
+        });
+        let expected_output_level3 = json!({
+            "firstname": "John",
+            "lastname": "Doe",
+            "age": 25,
+            "info_address_streetaddress": "123 Main St",
+            "info_address_city": "Anytown",
+            "info_address_state": "CA",
+            "info_address_postalcode": "12345",
+            "info_address_phonenumbers": "{\"number\":\"555-555-1234\",\"type\":\"home\"}",
+            "info_phonenumbers": "[{\"number\":\"555-555-1234\",\"type\":\"home\"},{\"number\":\"555-555-5678\",\"type\":\"work\"}]"
+        });
+        let expected_output_level4 = json!({
+            "firstname": "John",
+            "lastname": "Doe",
+            "age": 25,
+            "info_address_streetaddress": "123 Main St",
+            "info_address_city": "Anytown",
+            "info_address_state": "CA",
+            "info_address_postalcode": "12345",
+            "info_address_phonenumbers_number": "555-555-1234",
+            "info_address_phonenumbers_type": "home",
+            "info_phonenumbers": "[{\"number\":\"555-555-1234\",\"type\":\"home\"},{\"number\":\"555-555-5678\",\"type\":\"work\"}]"
+        });
+
+        let output = flatten_with_level(input.clone(), 0).unwrap();
+        assert_eq!(output, expected_output_level0);
+        let output = flatten_with_level(input.clone(), 1).unwrap();
+        assert_eq!(output, expected_output_level1);
+        let output = flatten_with_level(input.clone(), 2).unwrap();
+        assert_eq!(output, expected_output_level2);
+        let output = flatten_with_level(input.clone(), 3).unwrap();
+        assert_eq!(output, expected_output_level3);
+        let output = flatten_with_level(input.clone(), 4).unwrap();
+        assert_eq!(output, expected_output_level4);
+        let output = flatten_with_level(input, 5).unwrap();
+        assert_eq!(output, expected_output_level4);
+    }
}
57 changes: 39 additions & 18 deletions src/job/files/parquet.rs
@@ -65,7 +65,7 @@ pub async fn run() -> Result<(), anyhow::Error> {
tokio::sync::mpsc::channel::<(String, Vec<FileKey>)>(CONFIG.limit.file_move_thread_num);
let rx = Arc::new(Mutex::new(rx));
// move files
-    for _ in 0..CONFIG.limit.file_move_thread_num {
+    for thread_id in 0..CONFIG.limit.file_move_thread_num {
let rx = rx.clone();
tokio::spawn(async move {
loop {
@@ -76,7 +76,7 @@ pub async fn run() -> Result<(), anyhow::Error> {
break;
}
Some((prefix, files)) => {
-                        if let Err(e) = move_files(&prefix, files).await {
+                        if let Err(e) = move_files(thread_id, &prefix, files).await {
log::error!(
"[INGESTER:JOB] Error moving parquet files to remote: {}",
e
@@ -182,7 +182,11 @@ async fn prepare_files() -> Result<FxIndexMap<String, Vec<FileKey>>, anyhow::Err
Ok(partition_files_with_size)
}

-async fn move_files(prefix: &str, files: Vec<FileKey>) -> Result<(), anyhow::Error> {
+async fn move_files(
+    thread_id: usize,
+    prefix: &str,
+    files: Vec<FileKey>,
+) -> Result<(), anyhow::Error> {
let columns = prefix.splitn(5, '/').collect::<Vec<&str>>();
// eg: files/default/logs/olympics/0/2023/08/21/08/8b8a5451bbe1c44b/
// eg: files/default/traces/default/0/2023/09/04/05/default/service_name=ingester/
@@ -191,7 +195,10 @@ async fn move_files(prefix: &str, files: Vec<FileKey>) -> Result<(), anyhow::Err
let stream_type = StreamType::from(columns[2]);
let stream_name = columns[3].to_string();

log::debug!("[INGESTER:JOB] check deletion for partition: {}", prefix);
log::debug!(
"[INGESTER:JOB:{thread_id}] check deletion for partition: {}",
prefix
);

let wal_dir = Path::new(&CONFIG.common.data_wal_dir)
.canonicalize()
@@ -201,15 +208,15 @@ async fn move_files(prefix: &str, files: Vec<FileKey>) -> Result<(), anyhow::Err
if db::compact::retention::is_deleting_stream(&org_id, stream_type, &stream_name, None) {
for file in files {
log::warn!(
"[INGESTER:JOB] the stream [{}/{}/{}] is deleting, just delete file: {}",
"[INGESTER:JOB:{thread_id}] the stream [{}/{}/{}] is deleting, just delete file: {}",
&org_id,
stream_type,
&stream_name,
file.key,
);
if let Err(e) = tokio::fs::remove_file(wal_dir.join(&file.key)).await {
log::error!(
"[INGESTER:JOB] Failed to remove parquet file from disk: {}, {}",
"[INGESTER:JOB:{thread_id}] Failed to remove parquet file from disk: {}, {}",
file.key,
e
);
@@ -219,7 +226,10 @@ async fn move_files(prefix: &str, files: Vec<FileKey>) -> Result<(), anyhow::Err
return Ok(());
}

log::debug!("[INGESTER:JOB] start processing for partition: {}", prefix);
log::debug!(
"[INGESTER:JOB:{thread_id}] start processing for partition: {}",
prefix
);

let wal_dir = wal_dir.clone();
// sort by created time
@@ -267,14 +277,17 @@ async fn move_files(prefix: &str, files: Vec<FileKey>) -> Result<(), anyhow::Err
}
}

log::debug!("[INGESTER:JOB] get schema for partition: {}", prefix);
log::debug!(
"[INGESTER:JOB:{thread_id}] get schema for partition: {}",
prefix
);

// get latest schema
let latest_schema = db::schema::get(&org_id, &stream_name, stream_type)
.await
.map_err(|e| {
log::error!(
"[INGESTER:JOB] Failed to get latest schema for stream [{}/{}/{}]: {}",
"[INGESTER:JOB:{thread_id}] Failed to get latest schema for stream [{}/{}/{}]: {}",
&org_id,
stream_type,
&stream_name,
Expand All @@ -283,15 +296,18 @@ async fn move_files(prefix: &str, files: Vec<FileKey>) -> Result<(), anyhow::Err
e
})?;

log::debug!("[INGESTER:JOB] start merging for partition: {}", prefix);
log::debug!(
"[INGESTER:JOB:{thread_id}] start merging for partition: {}",
prefix
);

// start merge files and upload to s3
loop {
// yield to other tasks
tokio::task::yield_now().await;
// merge file and get the big file key
let (new_file_name, new_file_meta, new_file_list) =
-            match merge_files(&latest_schema, &wal_dir, &files_with_size).await {
+            match merge_files(thread_id, &latest_schema, &wal_dir, &files_with_size).await {
Ok(v) => v,
Err(e) => {
log::error!("[INGESTER:JOB] merge files failed: {}", e);
@@ -330,7 +346,7 @@ async fn move_files(prefix: &str, files: Vec<FileKey>) -> Result<(), anyhow::Err
loop {
if wal::lock_files_exists(&file.key).await {
log::warn!(
"[INGESTER:JOB] the file is still in use, waiting for a few ms: {}",
"[INGESTER:JOB:{thread_id}] the file is still in use, waiting for a few ms: {}",
file.key
);
time::sleep(time::Duration::from_millis(100)).await;
@@ -342,7 +358,7 @@ async fn move_files(prefix: &str, files: Vec<FileKey>) -> Result<(), anyhow::Err
let ret = tokio::fs::remove_file(&wal_dir.join(&file.key)).await;
if let Err(e) = ret {
log::error!(
"[INGESTER:JOB] Failed to remove parquet file from disk: {}, {}",
"[INGESTER:JOB:{thread_id}] Failed to remove parquet file from disk: {}, {}",
file.key,
e.to_string()
);
@@ -377,6 +393,7 @@ async fn move_files(prefix: &str, files: Vec<FileKey>) -> Result<(), anyhow::Err
/// merge some small files into one big file, upload to storage, returns the big
/// file key and merged files
async fn merge_files(
+    thread_id: usize,
latest_schema: &Schema,
wal_dir: &Path,
files_with_size: &[FileKey],
@@ -403,12 +420,12 @@ async fn merge_files(
let mut file_schema = None;
let tmp_dir = cache::tmpfs::Directory::default();
for file in retain_file_list.iter_mut() {
log::info!("[INGESTER:JOB] merge small file: {}", &file.key);
log::info!("[INGESTER:JOB:{thread_id}] merge small file: {}", &file.key);
let data = match get_file_contents(&wal_dir.join(&file.key)).await {
Ok(body) => body,
Err(err) => {
log::error!(
"[INGESTER:JOB] merge small file: {}, err: {}",
"[INGESTER:JOB:{thread_id}] merge small file: {}, err: {}",
&file.key,
err
);
@@ -472,12 +489,16 @@ async fn merge_files(
Ok(v) => v,
Err(e) => {
log::error!(
"[INGESTER:JOB] merge_parquet_files error for stream -> '{}/{}/{}'",
"[INGESTER:JOB:{thread_id}] merge_parquet_files error for stream -> '{}/{}/{}'",
org_id,
stream_type,
stream_name
);
log::error!("[INGESTER:JOB] {} for files {:?}", e, retain_file_list);
log::error!(
"[INGESTER:JOB:{thread_id}] {} for files {:?}",
e,
retain_file_list
);
return Err(e.into());
}
};
@@ -499,7 +520,7 @@ async fn merge_files(
let new_file_key =
super::generate_storage_file_name(&org_id, stream_type, &stream_name, &file_name);
log::info!(
"[INGESTER:JOB] merge file succeeded, {} files into a new file: {}, original_size: {}, compressed_size: {}",
"[INGESTER:JOB:{thread_id}] merge file succeeded, {} files into a new file: {}, original_size: {}, compressed_size: {}",
retain_file_list.len(),
new_file_key,
new_file_meta.original_size,
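The parquet-job half of the commit is observability work: `move_files` and `merge_files` now receive the worker's index, and every log line carries an `[INGESTER:JOB:{thread_id}]` prefix so interleaved output from concurrent partition moves can be attributed to a single task. A minimal sketch of the labeled worker-pool pattern — the payload type and `spawn_movers` helper are hypothetical, mirroring the `run()` loop above rather than reproducing it:

```rust
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};

// Hypothetical worker pool: one shared receiver, N workers, and each
// worker's index baked into its log prefix (the pattern this commit adds).
fn spawn_movers(worker_count: usize) -> mpsc::Sender<(String, Vec<String>)> {
    let (tx, rx) = mpsc::channel::<(String, Vec<String>)>(worker_count.max(1));
    let rx = Arc::new(Mutex::new(rx));
    for thread_id in 0..worker_count {
        let rx = rx.clone();
        tokio::spawn(async move {
            loop {
                let msg = rx.lock().await.recv().await;
                match msg {
                    None => break, // channel closed: worker shuts down
                    Some((prefix, files)) => {
                        // The {thread_id} tag makes concurrent partition
                        // logs attributable to a single worker.
                        log::info!(
                            "[INGESTER:JOB:{thread_id}] moving {} files for partition: {prefix}",
                            files.len()
                        );
                    }
                }
            }
        });
    }
    tx
}
```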
