Skip to content

Commit

Permalink
Fix recovery_prefetch with low maintenance_io_concurrency.
Browse files Browse the repository at this point in the history
We should process completed IOs *before* trying to start more, so that
it is always possible to decode one more record when the decoded record
queue is empty, even if maintenance_io_concurrency is set so low that a
single earlier WAL record might have saturated the IO queue.

That bug was hidden because the effect of maintenance_io_concurrency was
arbitrarily clamped to be at least 2.  Fix the ordering, and also remove
that clamp.  We need a special case for 0, which is now treated the same
as recovery_prefetch=off, but otherwise the number is used directly.
This allows for testing with 1, which would have made the problem
obvious in simple test scenarios.

Also add an explicit error message for missing contrecords.  It was a
bit strange that we didn't report an error already, and became a latent
bug with prefetching, since the internal state that tracks aborted
contrecords would not survive retrying, as revealed by
026_overwrite_contrecord.pl with this adjustment.  Reporting an error
prevents that.

Back-patch to 15.

Reported-by: Justin Pryzby <pryzby@telsasoft.com>
Reviewed-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Discussion: https://postgr.es/m/20220831140128.GS31833%40telsasoft.com
  • Loading branch information
macdice committed Sep 8, 2022
1 parent 12d40d4 commit adb4661
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 23 deletions.
54 changes: 36 additions & 18 deletions src/backend/access/transam/xlogprefetcher.c
Expand Up @@ -72,7 +72,9 @@
int recovery_prefetch = RECOVERY_PREFETCH_TRY;

#ifdef USE_PREFETCH
#define RecoveryPrefetchEnabled() (recovery_prefetch != RECOVERY_PREFETCH_OFF)
#define RecoveryPrefetchEnabled() \
(recovery_prefetch != RECOVERY_PREFETCH_OFF && \
maintenance_io_concurrency > 0)
#else
#define RecoveryPrefetchEnabled() false
#endif
Expand Down Expand Up @@ -985,6 +987,7 @@ XLogRecord *
XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
{
DecodedXLogRecord *record;
XLogRecPtr replayed_up_to;

/*
* See if it's time to reset the prefetching machinery, because a relevant
Expand All @@ -1000,7 +1003,8 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)

if (RecoveryPrefetchEnabled())
{
max_inflight = Max(maintenance_io_concurrency, 2);
Assert(maintenance_io_concurrency > 0);
max_inflight = maintenance_io_concurrency;
max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
}
else
Expand All @@ -1018,14 +1022,34 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
}

/*
* Release last returned record, if there is one. We need to do this so
* that we can check for empty decode queue accurately.
* Release last returned record, if there is one, as it's now been
* replayed.
*/
XLogReleasePreviousRecord(prefetcher->reader);
replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader);

/* If there's nothing queued yet, then start prefetching. */
/*
* Can we drop any filters yet? If we were waiting for a relation to be
* created or extended, it is now OK to access blocks in the covered
* range.
*/
XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to);

/*
* All IO initiated by earlier WAL is now completed. This might trigger
* further prefetching.
*/
lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);

/*
* If there's nothing queued yet, then start prefetching to cause at least
* one record to be queued.
*/
if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
{
Assert(lrq_inflight(prefetcher->streaming_read) == 0);
Assert(lrq_completed(prefetcher->streaming_read) == 0);
lrq_prefetch(prefetcher->streaming_read);
}

/* Read the next record. */
record = XLogNextRecord(prefetcher->reader, errmsg);
Expand All @@ -1039,12 +1063,13 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
Assert(record == prefetcher->reader->record);

/*
* Can we drop any prefetch filters yet, given the record we're about to
* return? This assumes that any records with earlier LSNs have been
* replayed, so if we were waiting for a relation to be created or
* extended, it is now OK to access blocks in the covered range.
* If maintenance_io_concurrency is set very low, we might have started
* prefetching some but not all of the blocks referenced in the record
* we're about to return. Forget about the rest of the blocks in this
* record by dropping the prefetcher's reference to it.
*/
XLogPrefetcherCompleteFilters(prefetcher, record->lsn);
if (record == prefetcher->record)
prefetcher->record = NULL;

/*
* See if it's time to compute some statistics, because enough WAL has
Expand All @@ -1053,13 +1078,6 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
XLogPrefetcherComputeStats(prefetcher);

/*
* The caller is about to replay this record, so we can now report that
* all IO initiated because of early WAL must be finished. This may
* trigger more readahead.
*/
lrq_complete_lsn(prefetcher->streaming_read, record->lsn);

Assert(record == prefetcher->reader->record);

return &record->header;
Expand Down
23 changes: 19 additions & 4 deletions src/backend/access/transam/xlogreader.c
Expand Up @@ -275,22 +275,24 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
}

/*
* See if we can release the last record that was returned by
* XLogNextRecord(), if any, to free up space.
* Release the last record that was returned by XLogNextRecord(), if any, to
* free up space. Returns the LSN past the end of the record.
*/
void
XLogRecPtr
XLogReleasePreviousRecord(XLogReaderState *state)
{
DecodedXLogRecord *record;
XLogRecPtr next_lsn;

if (!state->record)
return;
return InvalidXLogRecPtr;

/*
* Remove it from the decoded record queue. It must be the oldest item
* decoded, decode_queue_head.
*/
record = state->record;
next_lsn = record->next_lsn;
Assert(record == state->decode_queue_head);
state->record = NULL;
state->decode_queue_head = record->next;
Expand Down Expand Up @@ -336,6 +338,8 @@ XLogReleasePreviousRecord(XLogReaderState *state)
state->decode_buffer_tail = state->decode_buffer;
}
}

return next_lsn;
}

/*
Expand Down Expand Up @@ -907,6 +911,17 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
*/
state->abortedRecPtr = RecPtr;
state->missingContrecPtr = targetPagePtr;

/*
* If we got here without reporting an error, report one now so that
* XLogPrefetcherReadRecord() doesn't bring us back a second time and
* clobber the above state. Otherwise, the existing error takes
* precedence.
*/
if (!state->errormsg_buf[0])
report_invalid_record(state,
"missing contrecord at %X/%X",
LSN_FORMAT_ARGS(RecPtr));
}

if (decoded && decoded->oversized)
Expand Down
2 changes: 1 addition & 1 deletion src/include/access/xlogreader.h
Expand Up @@ -363,7 +363,7 @@ extern DecodedXLogRecord *XLogNextRecord(XLogReaderState *state,
char **errormsg);

/* Release the previously returned record, if necessary. */
extern void XLogReleasePreviousRecord(XLogReaderState *state);
extern XLogRecPtr XLogReleasePreviousRecord(XLogReaderState *state);

/* Try to read ahead, if there is data and space. */
extern DecodedXLogRecord *XLogReadAhead(XLogReaderState *state,
Expand Down

0 comments on commit adb4661

Please sign in to comment.