Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/connectors/kafka/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl Processor<Vec<ConsumerRecord>, ()> for ParseableSinkProcessor {
let len = records.len();
debug!("Processing {len} records");

self.build_event_from_chunk(&records).await?.process()?;
self.build_event_from_chunk(&records).await?.process().await?;

debug!("Processed {len} records");
Ok(())
Expand Down
10 changes: 6 additions & 4 deletions src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub struct Event {

// Events holds the schema related to a each event for a single log stream
impl Event {
pub fn process(self) -> Result<(), EventError> {
pub async fn process(self) -> Result<(), EventError> {
let mut key = get_schema_key(&self.rb.schema().fields);
if self.time_partition.is_some() {
let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string();
Expand All @@ -73,13 +73,14 @@ impl Event {
commit_schema(&self.stream_name, self.rb.schema())?;
}

// Await async push - memtable push is awaited, disk write is fire-and-forget
PARSEABLE.get_or_create_stream(&self.stream_name).push(
&key,
&self.rb,
self.parsed_timestamp,
&self.custom_partition_values,
self.stream_type,
)?;
).await?;

update_stats(
&self.stream_name,
Expand All @@ -99,16 +100,17 @@ impl Event {
Ok(())
}

pub fn process_unchecked(&self) -> Result<(), EventError> {
pub async fn process_unchecked(&self) -> Result<(), EventError> {
let key = get_schema_key(&self.rb.schema().fields);

// Await async push
PARSEABLE.get_or_create_stream(&self.stream_name).push(
&key,
&self.rb,
self.parsed_timestamp,
&self.custom_partition_values,
self.stream_type,
)?;
).await?;

Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
StreamType::Internal,
&p_custom_fields,
)?
.process()?;
.process().await?;

Ok(())
}
Expand Down Expand Up @@ -416,7 +416,7 @@ pub async fn push_logs_unchecked(
custom_partition_values: HashMap::new(), // should be an empty map for unchecked push
stream_type: StreamType::UserDefined,
};
unchecked_event.process_unchecked()?;
unchecked_event.process_unchecked().await?;

Ok(unchecked_event)
}
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/http/modal/utils/ingest_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ pub async fn push_logs(
StreamType::UserDefined,
p_custom_fields,
)?
.process()?;
.process().await?;
}
Ok(())
}
Expand Down
169 changes: 127 additions & 42 deletions src/parseable/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,50 +135,107 @@ impl Stream {
}

// Concatenates record batches and puts them in memory store for each event.
pub fn push(
&self,
// This method now defers memtable and disk operations to blocking thread pools.
// Disk write is awaited to ensure durability (data is persisted), while memtable push is fire-and-forget.
// If disk write fails, request fails - ensuring data consistency.
// If memtable push fails, data is still on disk
pub async fn push(
self: &Arc<Self>,
schema_key: &str,
record: &RecordBatch,
parsed_timestamp: NaiveDateTime,
custom_partition_values: &HashMap<String, String>,
stream_type: StreamType,
) -> Result<(), StagingError> {
let mut guard = match self.writer.lock() {
Ok(guard) => guard,
Err(poisoned) => {
error!(
"Writer lock poisoned while ingesting data for stream {}",
self.stream_name
);
poisoned.into_inner()
}
};
if self.options.mode != Mode::Query || stream_type == StreamType::Internal {
let record_clone = record.clone();
let schema_key_clone = schema_key.to_string();
let options_mode = self.options.mode;

// Defer disk write to blocking thread pool and await it
// This ensures disk write succeeds before we return, maintaining durability
// If disk write fails, request fails - data is not persisted, so we shouldn't return success
if options_mode != Mode::Query || stream_type == StreamType::Internal {
let filename =
self.filename_by_partition(schema_key, parsed_timestamp, custom_partition_values);
match guard.disk.get_mut(&filename) {
Some(writer) => {
writer.write(record)?;
}
None => {
// entry is not present thus we create it
std::fs::create_dir_all(&self.data_path)?;

let stream_for_disk = Arc::clone(self);
let filename_clone = filename.clone();
let filename_for_error = filename.clone();
let record_for_disk = record_clone.clone();
let parsed_timestamp_for_disk = parsed_timestamp;
let stream_name_clone = self.stream_name.clone();

// Await disk write
tokio::task::spawn_blocking(move || {
let mut guard = match stream_for_disk.writer.lock() {
Ok(guard) => guard,
Err(poisoned) => {
error!(
"Writer lock poisoned while writing to disk for stream {}",
stream_for_disk.stream_name
);
poisoned.into_inner()
}
};
Comment on lines +170 to +179
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Lock poisoning should fail the operation, not continue with potentially corrupt state.

Using into_inner() after a poisoned lock allows the operation to proceed with potentially corrupt state. For data durability guarantees, this should fail fast.

Apply this diff to fail on poisoned locks:

             tokio::task::spawn_blocking(move || {
-                let mut guard = match stream_for_disk.writer.lock() {
-                    Ok(guard) => guard,
-                    Err(poisoned) => {
-                        error!(
-                            "Writer lock poisoned while writing to disk for stream {}",
-                            stream_for_disk.stream_name
-                        );
-                        poisoned.into_inner()
-                    }
-                };
+                let mut guard = stream_for_disk.writer.lock()
+                    .map_err(|e| StagingError::ObjectStorage(std::io::Error::new(
+                        std::io::ErrorKind::Other,
+                        format!("Writer lock poisoned for stream {}: {}", stream_for_disk.stream_name, e)
+                    )))?;

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In src/parseable/streams.rs around lines 171-180, the match on
stream_for_disk.writer.lock() currently calls poisoned.into_inner(), which
continues with potentially corrupted state; instead, do not recover the poisoned
lock — log the poisoning and fail the operation by converting the PoisonError
into an error return (or propagate it) so the caller sees failure (e.g., map the
PoisonError to an appropriate Err value and return early or use the ? operator);
if the enclosing function does not return a Result, change its signature to
return Result and propagate the error (or explicitly panic) so the code does not
continue on a poisoned lock.


match guard.disk.get_mut(&filename_clone) {
Some(writer) => {
// Blocking disk write - runs in background thread pool
writer.write(&record_for_disk)
}
None => {
std::fs::create_dir_all(&stream_for_disk.data_path)?;

let range = TimeRange::granularity_range(
parsed_timestamp.and_local_timezone(Utc).unwrap(),
OBJECT_STORE_DATA_GRANULARITY,
);
let file_path = self.data_path.join(&filename);
let mut writer = DiskWriter::try_new(file_path, &record.schema(), range)
.expect("File and RecordBatch both are checked");
let range = TimeRange::granularity_range(
parsed_timestamp_for_disk.and_local_timezone(Utc).unwrap(),
OBJECT_STORE_DATA_GRANULARITY,
);
let file_path = stream_for_disk.data_path.join(&filename_clone);
let mut writer = DiskWriter::try_new(file_path, &record_for_disk.schema(), range)?;

writer.write(record)?;
guard.disk.insert(filename, writer);
writer.write(&record_for_disk)?;
guard.disk.insert(filename_clone, writer);
Ok(())
}
}
};
})
.await
.map_err(|e| StagingError::ObjectStorage(std::io::Error::new(
std::io::ErrorKind::Other,
format!("Disk write task failed: {}", e)
)))?
.map_err(|e| {
error!(
"Disk write failed for stream {} file {}: {}",
stream_name_clone, filename_for_error, e
);
e
})?;
}

guard.mem.push(schema_key, record);
// Defer memtable push to blocking thread pool - fire-and-forget
// If memtable push fails, data is still safely on disk and memtable can be rebuilt
{
let stream_for_memtable = Arc::clone(self);
let schema_key_for_memtable = schema_key_clone;
let record_for_memtable = record.clone();

// TODO: The concat operation at 16384 events is CPU-bound but won't block the request path
tokio::task::spawn_blocking(move || {
let mut guard = match stream_for_memtable.writer.lock() {
Ok(guard) => guard,
Err(poisoned) => {
error!(
"Writer lock poisoned while pushing to memtable for stream {}",
stream_for_memtable.stream_name
);
poisoned.into_inner()
}
Comment on lines +225 to +233
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Same lock poisoning issue: should fail instead of continuing.

Identical to the disk write path, using into_inner() after lock poisoning may propagate corrupt state. While memtable is fire-and-forget, it should still fail cleanly.

Apply similar fix as suggested for disk write path - return early or log and skip the operation:

             tokio::task::spawn_blocking(move || {
-                let mut guard = match stream_for_memtable.writer.lock() {
-                    Ok(guard) => guard,
-                    Err(poisoned) => {
-                        error!(
-                            "Writer lock poisoned while pushing to memtable for stream {}",
-                            stream_for_memtable.stream_name
-                        );
-                        poisoned.into_inner()
-                    }
-                };
+                let mut guard = match stream_for_memtable.writer.lock() {
+                    Ok(guard) => guard,
+                    Err(e) => {
+                        error!(
+                            "Writer lock poisoned while pushing to memtable for stream {}: {}. Skipping memtable update.",
+                            stream_for_memtable.stream_name, e
+                        );
+                        return;
+                    }
+                };
🤖 Prompt for AI Agents
In src/parseable/streams.rs around lines 229 to 237, the code currently calls
poisoned.into_inner() on a poisoned writer lock for the memtable which can
propagate corrupted state; instead, treat a poisoned lock as an operation
failure and return early (or propagate an error) rather than continuing. Replace
the Err(poisoned) branch so it logs the poisoning with stream context and then
returns an Err (or a suitable early return/skip path consistent with the
surrounding function's error handling) instead of calling into_inner(); ensure
the function's signature and callers handle the propagated error or skip
accordingly.

};
// Push to memtable - concat happens here at 16384 events
guard.mem.push(&schema_key_for_memtable, &record_for_memtable);
});
}

Ok(())
}
Expand Down Expand Up @@ -1328,15 +1385,43 @@ mod tests {
],
)
.unwrap();
staging
.push(
"abc",
&batch,
time,
&HashMap::new(),
StreamType::UserDefined,
)
.unwrap();

// For sync tests - create a new runtime
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(staging.push(
"abc",
&batch,
time,
&HashMap::new(),
StreamType::UserDefined,
)).unwrap();
staging.flush(true);
}

// Async version for async tests (like #[tokio::test])
async fn write_log_async(staging: &StreamRef, schema: &Schema, mins: i64) {
let time: NaiveDateTime = Utc::now()
.checked_sub_signed(TimeDelta::minutes(mins))
.unwrap()
.naive_utc();
let batch = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![
Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3])),
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(StringArray::from(vec!["a", "b", "c"])),
],
)
.unwrap();

// Direct await - we're already in an async context
staging.push(
"abc",
&batch,
time,
&HashMap::new(),
StreamType::UserDefined,
).await.unwrap();
staging.flush(true);
}

Expand Down Expand Up @@ -1468,11 +1553,11 @@ mod tests {

// 2 logs in the previous minutes
for i in 0..2 {
write_log(&staging, &schema, i);
write_log_async(&staging, &schema, i).await;
}
sleep(Duration::from_secs(60)).await;

write_log(&staging, &schema, 0);
write_log_async(&staging, &schema, 0).await;

// verify the arrow files exist in staging
assert_eq!(staging.arrow_files().len(), 3);
Expand Down
2 changes: 1 addition & 1 deletion src/storage/field_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ pub async fn calculate_field_stats(
StreamType::Internal,
&p_custom_fields,
)?
.process()?;
.process().await?;
}
Ok(stats_calculated)
}
Expand Down
Loading