diff --git a/src/connectors/kafka/processor.rs b/src/connectors/kafka/processor.rs index 23baae214..b04faaa2d 100644 --- a/src/connectors/kafka/processor.rs +++ b/src/connectors/kafka/processor.rs @@ -103,7 +103,7 @@ impl Processor, ()> for ParseableSinkProcessor { let len = records.len(); debug!("Processing {len} records"); - self.build_event_from_chunk(&records).await?.process()?; + self.build_event_from_chunk(&records).await?.process().await?; debug!("Processed {len} records"); Ok(()) diff --git a/src/event/mod.rs b/src/event/mod.rs index d5a0ef25d..5bcdb3563 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -56,7 +56,7 @@ pub struct Event { // Events holds the schema related to a each event for a single log stream impl Event { - pub fn process(self) -> Result<(), EventError> { + pub async fn process(self) -> Result<(), EventError> { let mut key = get_schema_key(&self.rb.schema().fields); if self.time_partition.is_some() { let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string(); @@ -73,13 +73,14 @@ impl Event { commit_schema(&self.stream_name, self.rb.schema())?; } + // Await async push - memtable push is awaited, disk write is fire-and-forget PARSEABLE.get_or_create_stream(&self.stream_name).push( &key, &self.rb, self.parsed_timestamp, &self.custom_partition_values, self.stream_type, - )?; + ).await?; update_stats( &self.stream_name, @@ -99,16 +100,17 @@ impl Event { Ok(()) } - pub fn process_unchecked(&self) -> Result<(), EventError> { + pub async fn process_unchecked(&self) -> Result<(), EventError> { let key = get_schema_key(&self.rb.schema().fields); + // Await async push PARSEABLE.get_or_create_stream(&self.stream_name).push( &key, &self.rb, self.parsed_timestamp, &self.custom_partition_values, self.stream_type, - )?; + ).await?; Ok(()) } diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 91229b7be..42ff7e44f 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -150,7 +150,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result< StreamType::Internal, &p_custom_fields, )? - .process()?; + .process().await?; Ok(()) } @@ -416,7 +416,7 @@ pub async fn push_logs_unchecked( custom_partition_values: HashMap::new(), // should be an empty map for unchecked push stream_type: StreamType::UserDefined, }; - unchecked_event.process_unchecked()?; + unchecked_event.process_unchecked().await?; Ok(unchecked_event) } diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index b3e4d2d46..de801daab 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -170,7 +170,7 @@ pub async fn push_logs( StreamType::UserDefined, p_custom_fields, )? - .process()?; + .process().await?; } Ok(()) } diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 1dab2542a..a89413202 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -135,50 +135,107 @@ impl Stream { } // Concatenates record batches and puts them in memory store for each event. - pub fn push( - &self, + // This method now defers memtable and disk operations to blocking thread pools. + // Disk write is awaited to ensure durability (data is persisted), while memtable push is fire-and-forget. + // If disk write fails, request fails - ensuring data consistency. + // If memtable push fails, data is still on disk + pub async fn push( + self: &Arc, schema_key: &str, record: &RecordBatch, parsed_timestamp: NaiveDateTime, custom_partition_values: &HashMap, stream_type: StreamType, ) -> Result<(), StagingError> { - let mut guard = match self.writer.lock() { - Ok(guard) => guard, - Err(poisoned) => { - error!( - "Writer lock poisoned while ingesting data for stream {}", - self.stream_name - ); - poisoned.into_inner() - } - }; - if self.options.mode != Mode::Query || stream_type == StreamType::Internal { + let record_clone = record.clone(); + let schema_key_clone = schema_key.to_string(); + let options_mode = self.options.mode; + + // Defer disk write to blocking thread pool and await it + // This ensures disk write succeeds before we return, maintaining durability + // If disk write fails, request fails - data is not persisted, so we shouldn't return success + if options_mode != Mode::Query || stream_type == StreamType::Internal { let filename = self.filename_by_partition(schema_key, parsed_timestamp, custom_partition_values); - match guard.disk.get_mut(&filename) { - Some(writer) => { - writer.write(record)?; - } - None => { - // entry is not present thus we create it - std::fs::create_dir_all(&self.data_path)?; + + let stream_for_disk = Arc::clone(self); + let filename_clone = filename.clone(); + let filename_for_error = filename.clone(); + let record_for_disk = record_clone.clone(); + let parsed_timestamp_for_disk = parsed_timestamp; + let stream_name_clone = self.stream_name.clone(); + + // Await disk write + tokio::task::spawn_blocking(move || { + let mut guard = match stream_for_disk.writer.lock() { + Ok(guard) => guard, + Err(poisoned) => { + error!( + "Writer lock poisoned while writing to disk for stream {}", + stream_for_disk.stream_name + ); + poisoned.into_inner() + } + }; + + match guard.disk.get_mut(&filename_clone) { + Some(writer) => { + // Blocking disk write - runs in background thread pool + writer.write(&record_for_disk) + } + None => { + std::fs::create_dir_all(&stream_for_disk.data_path)?; - let range = TimeRange::granularity_range( - parsed_timestamp.and_local_timezone(Utc).unwrap(), - OBJECT_STORE_DATA_GRANULARITY, - ); - let file_path = self.data_path.join(&filename); - let mut writer = DiskWriter::try_new(file_path, &record.schema(), range) - .expect("File and RecordBatch both are checked"); + let range = TimeRange::granularity_range( + parsed_timestamp_for_disk.and_local_timezone(Utc).unwrap(), + OBJECT_STORE_DATA_GRANULARITY, + ); + let file_path = stream_for_disk.data_path.join(&filename_clone); + let mut writer = DiskWriter::try_new(file_path, &record_for_disk.schema(), range)?; - writer.write(record)?; - guard.disk.insert(filename, writer); + writer.write(&record_for_disk)?; + guard.disk.insert(filename_clone, writer); + Ok(()) + } } - }; + }) + .await + .map_err(|e| StagingError::ObjectStorage(std::io::Error::new( + std::io::ErrorKind::Other, + format!("Disk write task failed: {}", e) + )))? + .map_err(|e| { + error!( + "Disk write failed for stream {} file {}: {}", + stream_name_clone, filename_for_error, e + ); + e + })?; } - guard.mem.push(schema_key, record); + // Defer memtable push to blocking thread pool - fire-and-forget + // If memtable push fails, data is still safely on disk and memtable can be rebuilt + { + let stream_for_memtable = Arc::clone(self); + let schema_key_for_memtable = schema_key_clone; + let record_for_memtable = record.clone(); + + // TODO: The concat operation at 16384 events is CPU-bound but won't block the request path + tokio::task::spawn_blocking(move || { + let mut guard = match stream_for_memtable.writer.lock() { + Ok(guard) => guard, + Err(poisoned) => { + error!( + "Writer lock poisoned while pushing to memtable for stream {}", + stream_for_memtable.stream_name + ); + poisoned.into_inner() + } + }; + // Push to memtable - concat happens here at 16384 events + guard.mem.push(&schema_key_for_memtable, &record_for_memtable); + }); + } Ok(()) } @@ -1328,15 +1385,43 @@ mod tests { ], ) .unwrap(); - staging - .push( - "abc", - &batch, - time, - &HashMap::new(), - StreamType::UserDefined, - ) - .unwrap(); + + // For sync tests - create a new runtime + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(staging.push( + "abc", + &batch, + time, + &HashMap::new(), + StreamType::UserDefined, + )).unwrap(); + staging.flush(true); + } + + // Async version for async tests (like #[tokio::test]) + async fn write_log_async(staging: &StreamRef, schema: &Schema, mins: i64) { + let time: NaiveDateTime = Utc::now() + .checked_sub_signed(TimeDelta::minutes(mins)) + .unwrap() + .naive_utc(); + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![ + Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3])), + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["a", "b", "c"])), + ], + ) + .unwrap(); + + // Direct await - we're already in an async context + staging.push( + "abc", + &batch, + time, + &HashMap::new(), + StreamType::UserDefined, + ).await.unwrap(); staging.flush(true); } @@ -1468,11 +1553,11 @@ mod tests { // 2 logs in the previous minutes for i in 0..2 { - write_log(&staging, &schema, i); + write_log_async(&staging, &schema, i).await; } sleep(Duration::from_secs(60)).await; - write_log(&staging, &schema, 0); + write_log_async(&staging, &schema, 0).await; // verify the arrow files exist in staging assert_eq!(staging.arrow_files().len(), 3); diff --git a/src/storage/field_stats.rs b/src/storage/field_stats.rs index a0cc46a45..6201d34fe 100644 --- a/src/storage/field_stats.rs +++ b/src/storage/field_stats.rs @@ -162,7 +162,7 @@ pub async fn calculate_field_stats( StreamType::Internal, &p_custom_fields, )? - .process()?; + .process().await?; } Ok(stats_calculated) }