@@ -317,10 +317,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
317317}
318318
319319/*
320- * Helper function for advancing physical replication slot forward.
321- * The LSN position to move to is compared simply to the slot's
322- * restart_lsn, knowing that any position older than that would be
323- * removed by successive checkpoints.
320+ * Helper function for advancing our physical replication slot forward.
321+ *
322+ * The LSN position to move to is compared simply to the slot's restart_lsn,
323+ * knowing that any position older than that would be removed by successive
324+ * checkpoints.
324325 */
325326static XLogRecPtr
326327pg_physical_replication_slot_advance (XLogRecPtr moveto )
@@ -340,76 +341,97 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
340341}
341342
342343/*
343- * Helper function for advancing logical replication slot forward.
344+ * Helper function for advancing our logical replication slot forward.
345+ *
344346 * The slot's restart_lsn is used as start point for reading records,
345347 * while confirmed_lsn is used as base point for the decoding context.
346- * The LSN position to move to is checked by doing a per-record scan and
347- * logical decoding which makes sure that confirmed_lsn is updated to a
348- * LSN which allows the future slot consumer to get consistent logical
349- * changes.
348+ *
349+ * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
350+ * because we need to digest WAL to advance restart_lsn allowing to recycle
351+ * WAL and removal of old catalog tuples. As decoding is done in fast_forward
352+ * mode, no changes are generated anyway.
350353 */
351354static XLogRecPtr
352355pg_logical_replication_slot_advance (XLogRecPtr moveto )
353356{
354357 LogicalDecodingContext * ctx ;
355358 ResourceOwner old_resowner = CurrentResourceOwner ;
356- XLogRecPtr startlsn = MyReplicationSlot -> data . restart_lsn ;
357- XLogRecPtr retlsn = MyReplicationSlot -> data . confirmed_flush ;
359+ XLogRecPtr startlsn ;
360+ XLogRecPtr retlsn ;
358361
359362 PG_TRY ();
360363 {
361- /* restart at slot's confirmed_flush */
364+ /*
365+ * Create our decoding context in fast_forward mode, passing start_lsn
366+ * as InvalidXLogRecPtr, so that we start processing from my slot's
367+ * confirmed_flush.
368+ */
362369 ctx = CreateDecodingContext (InvalidXLogRecPtr ,
363370 NIL ,
364- true,
371+ true, /* fast_forward */
365372 logical_read_local_xlog_page ,
366373 NULL , NULL , NULL );
367374
368- CurrentResourceOwner = ResourceOwnerCreate (CurrentResourceOwner ,
369- "logical decoding" );
375+ /*
376+ * Start reading at the slot's restart_lsn, which we know to point to
377+ * a valid record.
378+ */
379+ startlsn = MyReplicationSlot -> data .restart_lsn ;
380+
381+ /* Initialize our return value in case we don't do anything */
382+ retlsn = MyReplicationSlot -> data .confirmed_flush ;
370383
371384 /* invalidate non-timetravel entries */
372385 InvalidateSystemCaches ();
373386
374- /* Decode until we run out of records */
375- while ((startlsn != InvalidXLogRecPtr && startlsn < moveto ) ||
376- (ctx -> reader -> EndRecPtr != InvalidXLogRecPtr && ctx -> reader -> EndRecPtr < moveto ))
387+ /* Decode at least one record, until we run out of records */
388+ while ((!XLogRecPtrIsInvalid (startlsn ) &&
389+ startlsn < moveto ) ||
390+ (!XLogRecPtrIsInvalid (ctx -> reader -> EndRecPtr ) &&
391+ ctx -> reader -> EndRecPtr < moveto ))
377392 {
378- XLogRecord * record ;
379393 char * errm = NULL ;
394+ XLogRecord * record ;
380395
396+ /*
397+ * Read records. No changes are generated in fast_forward mode,
398+ * but snapbuilder/slot statuses are updated properly.
399+ */
381400 record = XLogReadRecord (ctx -> reader , startlsn , & errm );
382401 if (errm )
383402 elog (ERROR , "%s" , errm );
384403
385- /*
386- * Now that we've set up the xlog reader state, subsequent calls
387- * pass InvalidXLogRecPtr to say "continue from last record"
388- */
404+ /* Read sequentially from now on */
389405 startlsn = InvalidXLogRecPtr ;
390406
391407 /*
392- * The {begin_txn,change,commit_txn}_wrapper callbacks above will
393- * store the description into our tuplestore.
408+ * Process the record. Storage-level changes are ignored in
409+ * fast_forward mode, but other modules (such as snapbuilder)
410+ * might still have critical updates to do.
394411 */
395- if (record != NULL )
412+ if (record )
396413 LogicalDecodingProcessRecord (ctx , ctx -> reader );
397414
398- /* Stop once the moving point wanted by caller has been reached */
415+ /* Stop once the requested target has been reached */
399416 if (moveto <= ctx -> reader -> EndRecPtr )
400417 break ;
401418
402419 CHECK_FOR_INTERRUPTS ();
403420 }
404421
422+ /*
423+ * Logical decoding could have clobbered CurrentResourceOwner during
424+ * transaction management, so restore the executor's value. (This is
425+ * a kluge, but it's not worth cleaning up right now.)
426+ */
405427 CurrentResourceOwner = old_resowner ;
406428
407429 if (ctx -> reader -> EndRecPtr != InvalidXLogRecPtr )
408430 {
409431 LogicalConfirmReceivedLocation (moveto );
410432
411433 /*
412- * If only the confirmed_flush_lsn has changed the slot won't get
434+ * If only the confirmed_flush LSN has changed the slot won't get
413435 * marked as dirty by the above. Callers on the walsender
414436 * interface are expected to keep track of their own progress and
415437 * don't need it written out. But SQL-interface users cannot
0 commit comments