@@ -135,50 +135,111 @@ impl Stream {
135135 }
136136
137137 // Concatenates record batches and puts them in memory store for each event.
138- pub fn push (
139- & self ,
138+ // This method now defers memtable and disk operations to blocking thread pools.
139+ // Disk write is awaited to ensure durability (data is persisted), while memtable push is fire-and-forget for performance.
140+ // If disk write fails, request fails - ensuring data consistency.
141+ // If memtable push fails, data is still on disk (safe), and memtable can be rebuilt.
142+ pub async fn push (
143+ self : & Arc < Self > ,
140144 schema_key : & str ,
141145 record : & RecordBatch ,
142146 parsed_timestamp : NaiveDateTime ,
143147 custom_partition_values : & HashMap < String , String > ,
144148 stream_type : StreamType ,
145149 ) -> Result < ( ) , StagingError > {
146- let mut guard = match self . writer . lock ( ) {
147- Ok ( guard) => guard,
148- Err ( poisoned) => {
149- error ! (
150- "Writer lock poisoned while ingesting data for stream {}" ,
151- self . stream_name
152- ) ;
153- poisoned. into_inner ( )
154- }
155- } ;
156- if self . options . mode != Mode :: Query || stream_type == StreamType :: Internal {
150+ // Clone data needed for background operations
151+ let record_clone = record. clone ( ) ;
152+ let schema_key_clone = schema_key. to_string ( ) ;
153+ let options_mode = self . options . mode ;
154+
155+ // Defer disk write to blocking thread pool and await it
156+ // This ensures disk write succeeds before we return, maintaining durability
157+ // If disk write fails, request fails - data is not persisted, so we shouldn't return success
158+ if options_mode != Mode :: Query || stream_type == StreamType :: Internal {
157159 let filename =
158160 self . filename_by_partition ( schema_key, parsed_timestamp, custom_partition_values) ;
159- match guard. disk . get_mut ( & filename) {
160- Some ( writer) => {
161- writer. write ( record) ?;
162- }
163- None => {
164- // entry is not present thus we create it
165- std:: fs:: create_dir_all ( & self . data_path ) ?;
161+
162+ let stream_for_disk = Arc :: clone ( self ) ;
163+ let filename_clone = filename. clone ( ) ;
164+ let filename_for_error = filename. clone ( ) ; // Clone for error message
165+ let record_for_disk = record_clone. clone ( ) ;
166+ let parsed_timestamp_for_disk = parsed_timestamp;
167+ let stream_name_clone = self . stream_name . clone ( ) ;
168+
169+ // Await disk write - this is critical for data durability
170+ tokio:: task:: spawn_blocking ( move || {
171+ let mut guard = match stream_for_disk. writer . lock ( ) {
172+ Ok ( guard) => guard,
173+ Err ( poisoned) => {
174+ error ! (
175+ "Writer lock poisoned while writing to disk for stream {}" ,
176+ stream_for_disk. stream_name
177+ ) ;
178+ poisoned. into_inner ( )
179+ }
180+ } ;
181+
182+ match guard. disk . get_mut ( & filename_clone) {
183+ Some ( writer) => {
184+ // Blocking disk write - runs in background thread pool
185+ writer. write ( & record_for_disk)
186+ }
187+ None => {
188+ // Create directory - blocking I/O operation
189+ std:: fs:: create_dir_all ( & stream_for_disk. data_path ) ?;
166190
167- let range = TimeRange :: granularity_range (
168- parsed_timestamp. and_local_timezone ( Utc ) . unwrap ( ) ,
169- OBJECT_STORE_DATA_GRANULARITY ,
170- ) ;
171- let file_path = self . data_path . join ( & filename) ;
172- let mut writer = DiskWriter :: try_new ( file_path, & record. schema ( ) , range)
173- . expect ( "File and RecordBatch both are checked" ) ;
191+ let range = TimeRange :: granularity_range (
192+ parsed_timestamp_for_disk. and_local_timezone ( Utc ) . unwrap ( ) ,
193+ OBJECT_STORE_DATA_GRANULARITY ,
194+ ) ;
195+ let file_path = stream_for_disk. data_path . join ( & filename_clone) ;
196+ let mut writer = DiskWriter :: try_new ( file_path, & record_for_disk. schema ( ) , range) ?;
174197
175- writer. write ( record) ?;
176- guard. disk . insert ( filename, writer) ;
198+ writer. write ( & record_for_disk) ?;
199+ guard. disk . insert ( filename_clone, writer) ;
200+ Ok ( ( ) )
201+ }
177202 }
178- } ;
203+ } )
204+ . await
205+ . map_err ( |e| StagingError :: ObjectStorage ( std:: io:: Error :: new (
206+ std:: io:: ErrorKind :: Other ,
207+ format ! ( "Disk write task failed: {}" , e)
208+ ) ) ) ?
209+ . map_err ( |e| {
210+ error ! (
211+ "Disk write failed for stream {} file {}: {}" ,
212+ stream_name_clone, filename_for_error, e
213+ ) ;
214+ e
215+ } ) ?;
179216 }
180217
181- guard. mem . push ( schema_key, record) ;
218+ // Defer memtable push to blocking thread pool - fire-and-forget for performance
219+ // Memtable is for query performance optimization, not durability
220+ // If memtable push fails, data is still safely on disk and memtable can be rebuilt
221+ {
222+ let stream_for_memtable = Arc :: clone ( self ) ;
223+ let schema_key_for_memtable = schema_key_clone;
224+ let record_for_memtable = record. clone ( ) ;
225+
226+ // Spawn without awaiting - fire and forget for performance
227+ // The concat operation at 16384 events is CPU-bound but won't block the request path
228+ tokio:: task:: spawn_blocking ( move || {
229+ let mut guard = match stream_for_memtable. writer . lock ( ) {
230+ Ok ( guard) => guard,
231+ Err ( poisoned) => {
232+ error ! (
233+ "Writer lock poisoned while pushing to memtable for stream {}" ,
234+ stream_for_memtable. stream_name
235+ ) ;
236+ poisoned. into_inner ( )
237+ }
238+ } ;
239+ // Push to memtable - concat happens here at 16384 events
240+ guard. mem . push ( & schema_key_for_memtable, & record_for_memtable) ;
241+ } ) ;
242+ }
182243
183244 Ok ( ( ) )
184245 }
@@ -1328,15 +1389,15 @@ mod tests {
13281389 ] ,
13291390 )
13301391 . unwrap ( ) ;
1331- staging
1332- . push (
1333- "abc" ,
1334- & batch ,
1335- time ,
1336- & HashMap :: new ( ) ,
1337- StreamType :: UserDefined ,
1338- )
1339- . unwrap ( ) ;
1392+ // Use tokio runtime to call async push from sync test context
1393+ let rt = tokio :: runtime :: Runtime :: new ( ) . unwrap ( ) ;
1394+ rt . block_on ( staging . push (
1395+ "abc" ,
1396+ & batch ,
1397+ time ,
1398+ & HashMap :: new ( ) ,
1399+ StreamType :: UserDefined ,
1400+ ) ) . unwrap ( ) ;
13401401 staging. flush ( true ) ;
13411402 }
13421403
0 commit comments