Skip to content

Commit 790e521

Browse files
committed
refactor
1 parent d0cd1ca commit 790e521

File tree

1 file changed

+6
-10
lines changed

1 file changed

+6
-10
lines changed

src/parseable/streams.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -136,9 +136,9 @@ impl Stream {
136136

137137
// Concatenates record batches and puts them in memory store for each event.
138138
// This method now defers memtable and disk operations to blocking thread pools.
139-
// Disk write is awaited to ensure durability (data is persisted), while memtable push is fire-and-forget for performance.
139+
// Disk write is awaited to ensure durability (data is persisted), while memtable push is fire-and-forget.
140140
// If disk write fails, request fails - ensuring data consistency.
141-
// If memtable push fails, data is still on disk (safe), and memtable can be rebuilt.
141+
// If memtable push fails, data is still on disk
142142
pub async fn push(
143143
self: &Arc<Self>,
144144
schema_key: &str,
@@ -147,7 +147,6 @@ impl Stream {
147147
custom_partition_values: &HashMap<String, String>,
148148
stream_type: StreamType,
149149
) -> Result<(), StagingError> {
150-
// Clone data needed for background operations
151150
let record_clone = record.clone();
152151
let schema_key_clone = schema_key.to_string();
153152
let options_mode = self.options.mode;
@@ -161,12 +160,12 @@ impl Stream {
161160

162161
let stream_for_disk = Arc::clone(self);
163162
let filename_clone = filename.clone();
164-
let filename_for_error = filename.clone(); // Clone for error message
163+
let filename_for_error = filename.clone();
165164
let record_for_disk = record_clone.clone();
166165
let parsed_timestamp_for_disk = parsed_timestamp;
167166
let stream_name_clone = self.stream_name.clone();
168167

169-
// Await disk write - this is critical for data durability
168+
// Await disk write
170169
tokio::task::spawn_blocking(move || {
171170
let mut guard = match stream_for_disk.writer.lock() {
172171
Ok(guard) => guard,
@@ -185,7 +184,6 @@ impl Stream {
185184
writer.write(&record_for_disk)
186185
}
187186
None => {
188-
// Create directory - blocking I/O operation
189187
std::fs::create_dir_all(&stream_for_disk.data_path)?;
190188

191189
let range = TimeRange::granularity_range(
@@ -215,16 +213,14 @@ impl Stream {
215213
})?;
216214
}
217215

218-
// Defer memtable push to blocking thread pool - fire-and-forget for performance
219-
// Memtable is for query performance optimization, not durability
216+
// Defer memtable push to blocking thread pool - fire-and-forget
220217
// If memtable push fails, data is still safely on disk and memtable can be rebuilt
221218
{
222219
let stream_for_memtable = Arc::clone(self);
223220
let schema_key_for_memtable = schema_key_clone;
224221
let record_for_memtable = record.clone();
225222

226-
// Spawn without awaiting - fire and forget for performance
227-
// The concat operation at 16384 events is CPU-bound but won't block the request path
223+
// TODO: The concat operation at 16384 events is CPU-bound but won't block the request path
228224
tokio::task::spawn_blocking(move || {
229225
let mut guard = match stream_for_memtable.writer.lock() {
230226
Ok(guard) => guard,

0 commit comments

Comments
 (0)