From daeef5103f2740c58a326f121e2d6df98ef230bb Mon Sep 17 00:00:00 2001 From: ankitsheoran1 Date: Sun, 23 Nov 2025 09:32:13 +0530 Subject: [PATCH 1/3] optimize perf of ingest --- src/connectors/kafka/processor.rs | 2 +- src/event/mod.rs | 10 +- src/handlers/http/ingest.rs | 4 +- src/handlers/http/modal/utils/ingest_utils.rs | 2 +- src/parseable/streams.rs | 141 +++++++++++++----- src/storage/field_stats.rs | 2 +- 6 files changed, 112 insertions(+), 49 deletions(-) diff --git a/src/connectors/kafka/processor.rs b/src/connectors/kafka/processor.rs index 23baae214..b04faaa2d 100644 --- a/src/connectors/kafka/processor.rs +++ b/src/connectors/kafka/processor.rs @@ -103,7 +103,7 @@ impl Processor, ()> for ParseableSinkProcessor { let len = records.len(); debug!("Processing {len} records"); - self.build_event_from_chunk(&records).await?.process()?; + self.build_event_from_chunk(&records).await?.process().await?; debug!("Processed {len} records"); Ok(()) diff --git a/src/event/mod.rs b/src/event/mod.rs index d5a0ef25d..5bcdb3563 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -56,7 +56,7 @@ pub struct Event { // Events holds the schema related to a each event for a single log stream impl Event { - pub fn process(self) -> Result<(), EventError> { + pub async fn process(self) -> Result<(), EventError> { let mut key = get_schema_key(&self.rb.schema().fields); if self.time_partition.is_some() { let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string(); @@ -73,13 +73,14 @@ impl Event { commit_schema(&self.stream_name, self.rb.schema())?; } + // Await async push - memtable push is awaited, disk write is fire-and-forget PARSEABLE.get_or_create_stream(&self.stream_name).push( &key, &self.rb, self.parsed_timestamp, &self.custom_partition_values, self.stream_type, - )?; + ).await?; update_stats( &self.stream_name, @@ -99,16 +100,17 @@ impl Event { Ok(()) } - pub fn process_unchecked(&self) -> Result<(), EventError> { + pub async fn process_unchecked(&self) -> Result<(), EventError> { let key = get_schema_key(&self.rb.schema().fields); + // Await async push PARSEABLE.get_or_create_stream(&self.stream_name).push( &key, &self.rb, self.parsed_timestamp, &self.custom_partition_values, self.stream_type, - )?; + ).await?; Ok(()) } diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 91229b7be..42ff7e44f 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -150,7 +150,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result< StreamType::Internal, &p_custom_fields, )? - .process()?; + .process().await?; Ok(()) } @@ -416,7 +416,7 @@ pub async fn push_logs_unchecked( custom_partition_values: HashMap::new(), // should be an empty map for unchecked push stream_type: StreamType::UserDefined, }; - unchecked_event.process_unchecked()?; + unchecked_event.process_unchecked().await?; Ok(unchecked_event) } diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index b3e4d2d46..de801daab 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -170,7 +170,7 @@ pub async fn push_logs( StreamType::UserDefined, p_custom_fields, )? - .process()?; + .process().await?; } Ok(()) } diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 1dab2542a..b17daa490 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -135,50 +135,111 @@ impl Stream { } // Concatenates record batches and puts them in memory store for each event. - pub fn push( - &self, + // This method now defers memtable and disk operations to blocking thread pools. + // Disk write is awaited to ensure durability (data is persisted), while memtable push is fire-and-forget for performance. + // If disk write fails, request fails - ensuring data consistency. + // If memtable push fails, data is still on disk (safe), and memtable can be rebuilt. + pub async fn push( + self: &Arc, schema_key: &str, record: &RecordBatch, parsed_timestamp: NaiveDateTime, custom_partition_values: &HashMap, stream_type: StreamType, ) -> Result<(), StagingError> { - let mut guard = match self.writer.lock() { - Ok(guard) => guard, - Err(poisoned) => { - error!( - "Writer lock poisoned while ingesting data for stream {}", - self.stream_name - ); - poisoned.into_inner() - } - }; - if self.options.mode != Mode::Query || stream_type == StreamType::Internal { + // Clone data needed for background operations + let record_clone = record.clone(); + let schema_key_clone = schema_key.to_string(); + let options_mode = self.options.mode; + + // Defer disk write to blocking thread pool and await it + // This ensures disk write succeeds before we return, maintaining durability + // If disk write fails, request fails - data is not persisted, so we shouldn't return success + if options_mode != Mode::Query || stream_type == StreamType::Internal { let filename = self.filename_by_partition(schema_key, parsed_timestamp, custom_partition_values); - match guard.disk.get_mut(&filename) { - Some(writer) => { - writer.write(record)?; - } - None => { - // entry is not present thus we create it - std::fs::create_dir_all(&self.data_path)?; + + let stream_for_disk = Arc::clone(self); + let filename_clone = filename.clone(); + let filename_for_error = filename.clone(); // Clone for error message + let record_for_disk = record_clone.clone(); + let parsed_timestamp_for_disk = parsed_timestamp; + let stream_name_clone = self.stream_name.clone(); + + // Await disk write - this is critical for data durability + tokio::task::spawn_blocking(move || { + let mut guard = match stream_for_disk.writer.lock() { + Ok(guard) => guard, + Err(poisoned) => { + error!( + "Writer lock poisoned while writing to disk for stream {}", + stream_for_disk.stream_name + ); + poisoned.into_inner() + } + }; + + match guard.disk.get_mut(&filename_clone) { + Some(writer) => { + // Blocking disk write - runs in background thread pool + writer.write(&record_for_disk) + } + None => { + // Create directory - blocking I/O operation + std::fs::create_dir_all(&stream_for_disk.data_path)?; - let range = TimeRange::granularity_range( - parsed_timestamp.and_local_timezone(Utc).unwrap(), - OBJECT_STORE_DATA_GRANULARITY, - ); - let file_path = self.data_path.join(&filename); - let mut writer = DiskWriter::try_new(file_path, &record.schema(), range) - .expect("File and RecordBatch both are checked"); + let range = TimeRange::granularity_range( + parsed_timestamp_for_disk.and_local_timezone(Utc).unwrap(), + OBJECT_STORE_DATA_GRANULARITY, + ); + let file_path = stream_for_disk.data_path.join(&filename_clone); + let mut writer = DiskWriter::try_new(file_path, &record_for_disk.schema(), range)?; - writer.write(record)?; - guard.disk.insert(filename, writer); + writer.write(&record_for_disk)?; + guard.disk.insert(filename_clone, writer); + Ok(()) + } } - }; + }) + .await + .map_err(|e| StagingError::ObjectStorage(std::io::Error::new( + std::io::ErrorKind::Other, + format!("Disk write task failed: {}", e) + )))? + .map_err(|e| { + error!( + "Disk write failed for stream {} file {}: {}", + stream_name_clone, filename_for_error, e + ); + e + })?; } - guard.mem.push(schema_key, record); + // Defer memtable push to blocking thread pool - fire-and-forget for performance + // Memtable is for query performance optimization, not durability + // If memtable push fails, data is still safely on disk and memtable can be rebuilt + { + let stream_for_memtable = Arc::clone(self); + let schema_key_for_memtable = schema_key_clone; + let record_for_memtable = record.clone(); + + // Spawn without awaiting - fire and forget for performance + // The concat operation at 16384 events is CPU-bound but won't block the request path + tokio::task::spawn_blocking(move || { + let mut guard = match stream_for_memtable.writer.lock() { + Ok(guard) => guard, + Err(poisoned) => { + error!( + "Writer lock poisoned while pushing to memtable for stream {}", + stream_for_memtable.stream_name + ); + poisoned.into_inner() + } + }; + // Push to memtable - concat happens here at 16384 events + guard.mem.push(&schema_key_for_memtable, &record_for_memtable); + }); + } Ok(()) } @@ -1328,15 +1389,15 @@ mod tests { ], ) .unwrap(); - staging - .push( - "abc", - &batch, - time, - &HashMap::new(), - StreamType::UserDefined, - ) - .unwrap(); + // Use tokio runtime to call async push from sync test context + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(staging.push( + "abc", + &batch, + time, + &HashMap::new(), + StreamType::UserDefined, + )).unwrap(); staging.flush(true); } diff --git a/src/storage/field_stats.rs b/src/storage/field_stats.rs index a0cc46a45..6201d34fe 100644 --- a/src/storage/field_stats.rs +++ b/src/storage/field_stats.rs @@ -162,7 +162,7 @@ pub async fn calculate_field_stats( StreamType::Internal, &p_custom_fields, )? - .process()?; + .process().await?; } Ok(stats_calculated) } From 71d9d110e1ebe570b92e07bff9bf5c2bccb065dd Mon Sep 17 00:00:00 2001 From: ankitsheoran1 Date: Sun, 23 Nov 2025 13:54:39 +0530 Subject: [PATCH 2/3] fix test case --- src/parseable/streams.rs | 34 +++++++++++++++++++++++++++++++--- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index b17daa490..cabdf4113 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -1389,7 +1389,8 @@ mod tests { ], ) .unwrap(); - // Use tokio runtime to call async push from sync test context + + // For sync tests - create a new runtime let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(staging.push( "abc", @@ -1401,6 +1402,33 @@ mod tests { staging.flush(true); } + // Async version for async tests (like #[tokio::test]) + async fn write_log_async(staging: &StreamRef, schema: &Schema, mins: i64) { + let time: NaiveDateTime = Utc::now() + .checked_sub_signed(TimeDelta::minutes(mins)) + .unwrap() + .naive_utc(); + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![ + Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3])), + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["a", "b", "c"])), + ], + ) + .unwrap(); + + // Direct await - we're already in an async context + staging.push( + "abc", + &batch, + time, + &HashMap::new(), + StreamType::UserDefined, + ).await.unwrap(); + staging.flush(true); + } + #[test] fn different_minutes_multiple_arrow_files_to_parquet() { let temp_dir = TempDir::new().unwrap(); @@ -1529,11 +1557,11 @@ mod tests { // 2 logs in the previous minutes for i in 0..2 { - write_log(&staging, &schema, i); + write_log_async(&staging, &schema, i).await; } sleep(Duration::from_secs(60)).await; - write_log(&staging, &schema, 0); + write_log_async(&staging, &schema, 0).await; // verify the arrow files exist in staging assert_eq!(staging.arrow_files().len(), 3); From 313a5ef29db5ba7150503a008670535134786dca Mon Sep 17 00:00:00 2001 From: ankitsheoran1 Date: Sun, 23 Nov 2025 16:24:23 +0530 Subject: [PATCH 3/3] refactor --- src/parseable/streams.rs | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index cabdf4113..a89413202 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -136,9 +136,9 @@ impl Stream { // Concatenates record batches and puts them in memory store for each event. // This method now defers memtable and disk operations to blocking thread pools. - // Disk write is awaited to ensure durability (data is persisted), while memtable push is fire-and-forget for performance. + // Disk write is awaited to ensure durability (data is persisted), while memtable push is fire-and-forget. // If disk write fails, request fails - ensuring data consistency. - // If memtable push fails, data is still on disk (safe), and memtable can be rebuilt. + // If memtable push fails, data is still on disk pub async fn push( self: &Arc, schema_key: &str, @@ -147,7 +147,6 @@ impl Stream { custom_partition_values: &HashMap, stream_type: StreamType, ) -> Result<(), StagingError> { - // Clone data needed for background operations let record_clone = record.clone(); let schema_key_clone = schema_key.to_string(); let options_mode = self.options.mode; @@ -161,12 +160,12 @@ impl Stream { let stream_for_disk = Arc::clone(self); let filename_clone = filename.clone(); - let filename_for_error = filename.clone(); // Clone for error message + let filename_for_error = filename.clone(); let record_for_disk = record_clone.clone(); let parsed_timestamp_for_disk = parsed_timestamp; let stream_name_clone = self.stream_name.clone(); - // Await disk write - this is critical for data durability + // Await disk write tokio::task::spawn_blocking(move || { let mut guard = match stream_for_disk.writer.lock() { Ok(guard) => guard, @@ -185,7 +184,6 @@ impl Stream { writer.write(&record_for_disk) } None => { - // Create directory - blocking I/O operation std::fs::create_dir_all(&stream_for_disk.data_path)?; let range = TimeRange::granularity_range( @@ -215,16 +213,14 @@ impl Stream { })?; } - // Defer memtable push to blocking thread pool - fire-and-forget for performance - // Memtable is for query performance optimization, not durability + // Defer memtable push to blocking thread pool - fire-and-forget // If memtable push fails, data is still safely on disk and memtable can be rebuilt { let stream_for_memtable = Arc::clone(self); let schema_key_for_memtable = schema_key_clone; let record_for_memtable = record.clone(); - // Spawn without awaiting - fire and forget for performance - // The concat operation at 16384 events is CPU-bound but won't block the request path + // TODO: The concat operation at 16384 events is CPU-bound but won't block the request path tokio::task::spawn_blocking(move || { let mut guard = match stream_for_memtable.writer.lock() { Ok(guard) => guard,