@@ -78,6 +78,7 @@ static void pgaio_worker_shmem_init(bool first_time);
7878
7979static bool pgaio_worker_needs_synchronous_execution (PgAioHandle * ioh );
8080static int pgaio_worker_submit (uint16 num_staged_ios , PgAioHandle * * staged_ios );
81+ static void pgaio_worker_wait_one (PgAioHandle * ioh , uint64 ref_generation );
8182
8283
8384const IoMethodOps pgaio_worker_ops = {
@@ -86,6 +87,7 @@ const IoMethodOps pgaio_worker_ops = {
8687
8788 .needs_synchronous_execution = pgaio_worker_needs_synchronous_execution ,
8889 .submit = pgaio_worker_submit ,
90+ .wait_one = pgaio_worker_wait_one ,
8991};
9092
9193
@@ -124,6 +126,7 @@ pgaio_worker_shmem_size(void)
124126
125127 sz = pgaio_worker_queue_shmem_size (& queue_size );
126128 sz = add_size (sz , pgaio_worker_control_shmem_size ());
129+ sz = add_size (sz , pgaio_cq_shmem_size ());
127130
128131 return sz ;
129132}
@@ -134,6 +137,8 @@ pgaio_worker_shmem_init(bool first_time)
134137 bool found ;
135138 int queue_size ;
136139
140+ pgaio_cq_shmem_init (first_time );
141+
137142 io_worker_submission_queue =
138143 ShmemInitStruct ("AioWorkerSubmissionQueue" ,
139144 pgaio_worker_queue_shmem_size (& queue_size ),
@@ -284,7 +289,7 @@ pgaio_worker_submit_internal(int nios, PgAioHandle *ios[])
284289 {
285290 for (int i = 0 ; i < nsync ; ++ i )
286291 {
287- pgaio_io_perform_synchronously (synchronous_ios [i ]);
292+ pgaio_io_perform_synchronously (synchronous_ios [i ], false );
288293 }
289294 }
290295}
@@ -297,13 +302,55 @@ pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
297302 PgAioHandle * ioh = staged_ios [i ];
298303
299304 pgaio_io_prepare_submit (ioh );
305+ pgaio_cq_prepare_submit (ioh );
300306 }
301307
302308 pgaio_worker_submit_internal (num_staged_ios , staged_ios );
303309
304310 return num_staged_ios ;
305311}
306312
/*
 * Wait for one IO submitted via the worker method; this function is
 * installed as the .wait_one callback of pgaio_worker_ops.
 *
 * ioh            - the IO handle to wait for.
 * ref_generation - the generation the caller's reference to ioh was taken
 *                  at; passed to pgaio_io_was_recycled() to detect that the
 *                  handle has been reused for a different IO.
 *
 * Returns nothing.  The function returns once one of the following holds:
 *   - pgaio_cq_try_process_completion() reported a value > 0 for ioh,
 *   - pgaio_io_was_recycled() reported the handle as recycled, or
 *   - the handle state is no longer PGAIO_HS_SUBMITTED.
 *
 * NOTE(review): pgaio_cq_try_process_completion() presumably returns the
 * number of completions it processed -- confirm against its definition.
 */
313+ static void
314+ pgaio_worker_wait_one (PgAioHandle * ioh , uint64 ref_generation )
315+ {
/* Handle state; written by pgaio_io_was_recycled() through its out pointer. */
316+ PgAioHandleState state ;
/* Result of the fast-path pgaio_cq_try_process_completion() call, if made. */
317+ int processed = 0 ;
318+
319+ /*
320+ * Initial check without condition variable churn for the common case that
321+ * the worker has already called pgaio_cq_insert().
322+ */
323+ START_CRIT_SECTION ();
/* Only attempt to process the completion when the IO is not in progress. */
324+ if (!pgaio_cq_in_progress (ioh ))
325+ processed = pgaio_cq_try_process_completion (ioh );
326+ END_CRIT_SECTION ();
327+
/* Fast path succeeded; no need to touch the condition variable at all. */
328+ if (processed > 0 )
329+ return ;
330+
331+ /*
332+ * Wait for the worker to call pgaio_cq_insert(), or any backend to beat
333+ * us to pgaio_cq_try_process_completion().
334+ */
335+ START_CRIT_SECTION ();
/*
 * Register on the handle's condition variable before entering the loop, so
 * that every check of the exit conditions below happens while we are already
 * prepared to sleep.
 */
336+ ConditionVariablePrepareToSleep (& ioh -> cv );
/*
 * Keep waiting only while the handle still belongs to our generation and is
 * still in the SUBMITTED state.  Because of && short-circuiting, state is
 * read only after pgaio_io_was_recycled() has returned false.
 */
337+ while (!pgaio_io_was_recycled (ioh , ref_generation , & state ) &&
338+ state == PGAIO_HS_SUBMITTED )
339+ {
/* Sampled once per iteration; also selects the wait event reported below. */
340+ bool in_progress = pgaio_cq_in_progress (ioh );
341+
/* Not in progress any more: try to process the completion ourselves. */
342+ if (!in_progress && pgaio_cq_try_process_completion (ioh ) > 0 )
343+ break ;
344+
/*
 * Sleep on the handle's condition variable.  The wait event distinguishes
 * the two situations: IPC_EXECUTION while the IO is still in progress,
 * IO_COMPLETION when it is no longer in progress but we did not manage to
 * process the completion ourselves.
 */
345+ ConditionVariableSleep (& ioh -> cv ,
346+ in_progress ?
347+ WAIT_EVENT_AIO_IO_IPC_EXECUTION :
348+ WAIT_EVENT_AIO_IO_COMPLETION );
349+ }
/* Pairs with ConditionVariablePrepareToSleep() above on every exit path. */
350+ ConditionVariableCancelSleep ();
351+ END_CRIT_SECTION ();
352+ }
353+
307354/*
308355 * on_shmem_exit() callback that releases the worker's slot in
309356 * io_worker_control.
@@ -440,7 +487,7 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
440487 errno = error_errno ;
441488
442489 START_CRIT_SECTION ();
443- pgaio_io_process_completion (error_ioh , - error_errno );
490+ pgaio_cq_insert (error_ioh , - error_errno , true );
444491 END_CRIT_SECTION ();
445492 }
446493
@@ -495,6 +542,7 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
495542 if (io_index != UINT32_MAX )
496543 {
497544 PgAioHandle * ioh = NULL ;
545+ int32 result ;
498546
499547 ioh = & pgaio_ctl -> io_handles [io_index ];
500548 error_ioh = ioh ;
@@ -550,11 +598,13 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
550598
551599 /*
552600 * We don't expect this to ever fail with ERROR or FATAL, no need
553- * to keep error_ioh set to the IO.
554- * pgaio_io_perform_synchronously() contains a critical section to
555- * ensure we don't accidentally fail.
601+ * to keep error_ioh set to the IO. We hand the result off to
602+ * regular backends to process.
556603 */
557- pgaio_io_perform_synchronously (ioh );
604+ START_CRIT_SECTION ();
605+ result = pgaio_io_perform_synchronously (ioh , false);
606+ pgaio_cq_insert (ioh , result , true);
607+ END_CRIT_SECTION ();
558608
559609 RESUME_INTERRUPTS ();
560610 errcallback .arg = NULL ;
0 commit comments