Skip to content

Commit

Permalink
Refactor XLogReadRecord(), adding XLogBeginRead() function.
Browse files Browse the repository at this point in the history
The signature of XLogReadRecord() required the caller to pass the starting
WAL position as argument, or InvalidXLogRecPtr to continue reading at the
end of previous record. That's slightly awkward to the callers, as most
of them don't want to randomly jump around in the WAL stream, but start
reading at one position and then read everything from that point onwards.
Remove the 'RecPtr' argument and add a new function XLogBeginRead() to
specify the starting position instead. That's more convenient for the
callers. Also, xlogreader holds state that is reset when you change the
starting position, so having a separate function for doing that feels like
a more natural fit.

This changes XLogFindNextRecord() function so that it doesn't reset the
xlogreader's state to what it was before the call anymore. Instead, it
positions the xlogreader to the found record, like XLogBeginRead().

Reviewed-by: Kyotaro Horiguchi, Alvaro Herrera
Discussion: https://www.postgresql.org/message-id/5382a7a3-debe-be31-c860-cb810c08f366%40iki.fi
  • Loading branch information
hlinnaka committed Jan 26, 2020
1 parent 1001368 commit 38a9573
Show file tree
Hide file tree
Showing 10 changed files with 102 additions and 97 deletions.
3 changes: 2 additions & 1 deletion src/backend/access/transam/twophase.c
Original file line number Diff line number Diff line change
Expand Up @@ -1338,7 +1338,8 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
errmsg("out of memory"),
errdetail("Failed while allocating a WAL reading processor.")));

record = XLogReadRecord(xlogreader, lsn, &errormsg);
XLogBeginRead(xlogreader, lsn);
record = XLogReadRecord(xlogreader, &errormsg);
if (record == NULL)
ereport(ERROR,
(errcode_for_file_access(),
Expand Down
36 changes: 19 additions & 17 deletions src/backend/access/transam/xlog.c
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,7 @@ static void UpdateLastRemovedPtr(char *filename);
static void ValidateXLOGDirectoryStructure(void);
static void CleanupBackupHistory(void);
static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
static XLogRecord *ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
static XLogRecord *ReadRecord(XLogReaderState *xlogreader,
int emode, bool fetching_ckpt);
static void CheckRecoveryConsistency(void);
static XLogRecord *ReadCheckpointRecord(XLogReaderState *xlogreader,
Expand Down Expand Up @@ -4246,17 +4246,17 @@ CleanupBackupHistory(void)
}

/*
* Attempt to read an XLOG record.
* Attempt to read the next XLOG record.
*
* If RecPtr is valid, try to read a record at that position. Otherwise
* try to read a record just after the last one previously read.
* Before first call, the reader needs to be positioned to the first record
* by calling XLogBeginRead().
*
* If no valid record is available, returns NULL, or fails if emode is PANIC.
* (emode must be either PANIC, LOG). In standby mode, retries until a valid
* record is available.
*/
static XLogRecord *
ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
ReadRecord(XLogReaderState *xlogreader, int emode,
bool fetching_ckpt)
{
XLogRecord *record;
Expand All @@ -4265,7 +4265,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
/* Pass through parameters to XLogPageRead */
private->fetching_ckpt = fetching_ckpt;
private->emode = emode;
private->randAccess = (RecPtr != InvalidXLogRecPtr);
private->randAccess = (xlogreader->ReadRecPtr != InvalidXLogRecPtr);

/* This is the first attempt to read this page. */
lastSourceFailed = false;
Expand All @@ -4274,7 +4274,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
{
char *errormsg;

record = XLogReadRecord(xlogreader, RecPtr, &errormsg);
record = XLogReadRecord(xlogreader, &errormsg);
ReadRecPtr = xlogreader->ReadRecPtr;
EndRecPtr = xlogreader->EndRecPtr;
if (record == NULL)
Expand All @@ -4292,8 +4292,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
* shouldn't loop anymore in that case.
*/
if (errormsg)
ereport(emode_for_corrupt_record(emode,
RecPtr ? RecPtr : EndRecPtr),
ereport(emode_for_corrupt_record(emode, EndRecPtr),
(errmsg_internal("%s", errormsg) /* already translated */ ));
}

Expand All @@ -4311,8 +4310,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
wal_segment_size);
XLogFileName(fname, xlogreader->seg.ws_tli, segno,
wal_segment_size);
ereport(emode_for_corrupt_record(emode,
RecPtr ? RecPtr : EndRecPtr),
ereport(emode_for_corrupt_record(emode, EndRecPtr),
(errmsg("unexpected timeline ID %u in log segment %s, offset %u",
xlogreader->latestPageTLI,
fname,
Expand Down Expand Up @@ -6427,7 +6425,8 @@ StartupXLOG(void)
*/
if (checkPoint.redo < checkPointLoc)
{
if (!ReadRecord(xlogreader, checkPoint.redo, LOG, false))
XLogBeginRead(xlogreader, checkPoint.redo);
if (!ReadRecord(xlogreader, LOG, false))
ereport(FATAL,
(errmsg("could not find redo location referenced by checkpoint record"),
errhint("If you are restoring from a backup, touch \"%s/recovery.signal\" and add required recovery options.\n"
Expand Down Expand Up @@ -7034,12 +7033,13 @@ StartupXLOG(void)
if (checkPoint.redo < RecPtr)
{
/* back up to find the record */
record = ReadRecord(xlogreader, checkPoint.redo, PANIC, false);
XLogBeginRead(xlogreader, checkPoint.redo);
record = ReadRecord(xlogreader, PANIC, false);
}
else
{
/* just have to read next record after CheckPoint */
record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
record = ReadRecord(xlogreader, LOG, false);
}

if (record != NULL)
Expand Down Expand Up @@ -7263,7 +7263,7 @@ StartupXLOG(void)
}

/* Else, try to fetch the next WAL record */
record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
record = ReadRecord(xlogreader, LOG, false);
} while (record != NULL);

/*
Expand Down Expand Up @@ -7365,7 +7365,8 @@ StartupXLOG(void)
* Re-fetch the last valid or last applied record, so we can identify the
* exact endpoint of what we consider the valid portion of WAL.
*/
record = ReadRecord(xlogreader, LastRec, PANIC, false);
XLogBeginRead(xlogreader, LastRec);
record = ReadRecord(xlogreader, PANIC, false);
EndOfLog = EndRecPtr;

/*
Expand Down Expand Up @@ -8094,7 +8095,8 @@ ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
return NULL;
}

record = ReadRecord(xlogreader, RecPtr, LOG, true);
XLogBeginRead(xlogreader, RecPtr);
record = ReadRecord(xlogreader, LOG, true);

if (record == NULL)
{
Expand Down
79 changes: 50 additions & 29 deletions src/backend/access/transam/xlogreader.c
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,34 @@ WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
snprintf(segcxt->ws_dir, MAXPGPATH, "%s", waldir);
}

/*
* Begin reading WAL at 'RecPtr'.
*
* 'RecPtr' should point to the beginnning of a valid WAL record. Pointing at
* the beginning of a page is also OK, if there is a new record right after
* the page header, i.e. not a continuation.
*
* This does not make any attempt to read the WAL yet, and hence cannot fail.
* If the starting address is not correct, the first call to XLogReadRecord()
* will error out.
*/
void
XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
{
Assert(!XLogRecPtrIsInvalid(RecPtr));

ResetDecoder(state);

/* Begin at the passed-in record pointer. */
state->EndRecPtr = RecPtr;
state->ReadRecPtr = InvalidXLogRecPtr;
}

/*
* Attempt to read an XLOG record.
*
* If RecPtr is valid, try to read a record at that position. Otherwise
* try to read a record just after the last one previously read.
* XLogBeginRead() or XLogFindNextRecord() must be called before the first call
* to XLogReadRecord().
*
* If the read_page callback fails to read the requested data, NULL is
* returned. The callback is expected to have reported the error; errormsg
Expand All @@ -235,8 +258,9 @@ WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
* valid until the next call to XLogReadRecord.
*/
XLogRecord *
XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
XLogReadRecord(XLogReaderState *state, char **errormsg)
{
XLogRecPtr RecPtr;
XLogRecord *record;
XLogRecPtr targetPagePtr;
bool randAccess;
Expand All @@ -260,28 +284,26 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)

ResetDecoder(state);

if (RecPtr == InvalidXLogRecPtr)
{
/* No explicit start point; read the record after the one we just read */
RecPtr = state->EndRecPtr;
RecPtr = state->EndRecPtr;

if (state->ReadRecPtr == InvalidXLogRecPtr)
randAccess = true;
if (state->ReadRecPtr != InvalidXLogRecPtr)
{
/* read the record after the one we just read */

/*
* RecPtr is pointing to end+1 of the previous WAL record. If we're
* at a page boundary, no more records can fit on the current page. We
* must skip over the page header, but we can't do that until we've
* read in the page, since the header size is variable.
* EndRecPtr is pointing to end+1 of the previous WAL record. If
* we're at a page boundary, no more records can fit on the current
* page. We must skip over the page header, but we can't do that until
* we've read in the page, since the header size is variable.
*/
}
else
{
/*
* Caller supplied a position to start at.
*
* In this case, the passed-in record pointer should already be
* pointing to a valid record starting position.
* In this case, EndRecPtr should already be pointing to a valid
* record starting position.
*/
Assert(XRecOffIsValid(RecPtr));
randAccess = true;
Expand Down Expand Up @@ -899,14 +921,17 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
/*
* Find the first record with an lsn >= RecPtr.
*
* Useful for checking whether RecPtr is a valid xlog address for reading, and
* to find the first valid address after some address when dumping records for
* debugging purposes.
* This is different from XLogBeginRead() in that RecPtr doesn't need to point
* to a valid record boundary. Useful for checking whether RecPtr is a valid
* xlog address for reading, and to find the first valid address after some
* address when dumping records for debugging purposes.
*
* This positions the reader, like XLogBeginRead(), so that the next call to
* XLogReadRecord() will read the next valid record.
*/
XLogRecPtr
XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
{
XLogReaderState saved_state = *state;
XLogRecPtr tmpRecPtr;
XLogRecPtr found = InvalidXLogRecPtr;
XLogPageHeader header;
Expand Down Expand Up @@ -991,27 +1016,23 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
* because either we're at the first record after the beginning of a page
* or we just jumped over the remaining data of a continuation.
*/
while (XLogReadRecord(state, tmpRecPtr, &errormsg) != NULL)
XLogBeginRead(state, tmpRecPtr);
while (XLogReadRecord(state, &errormsg) != NULL)
{
/* continue after the record */
tmpRecPtr = InvalidXLogRecPtr;

/* past the record we've found, break out */
if (RecPtr <= state->ReadRecPtr)
{
/* Rewind the reader to the beginning of the last record. */
found = state->ReadRecPtr;
goto out;
XLogBeginRead(state, found);
return found;
}
}

err:
out:
/* Reset state to what we had before finding the record */
state->ReadRecPtr = saved_state.ReadRecPtr;
state->EndRecPtr = saved_state.EndRecPtr;
XLogReaderInvalReadState(state);

return found;
return InvalidXLogRecPtr;
}

#endif /* FRONTEND */
Expand Down
7 changes: 2 additions & 5 deletions src/backend/replication/logical/logical.c
Original file line number Diff line number Diff line change
Expand Up @@ -461,11 +461,10 @@ DecodingContextReady(LogicalDecodingContext *ctx)
void
DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
{
XLogRecPtr startptr;
ReplicationSlot *slot = ctx->slot;

/* Initialize from where to start reading WAL. */
startptr = slot->data.restart_lsn;
XLogBeginRead(ctx->reader, slot->data.restart_lsn);

elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
(uint32) (slot->data.restart_lsn >> 32),
Expand All @@ -478,14 +477,12 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
char *err = NULL;

/* the read_page callback waits for new WAL */
record = XLogReadRecord(ctx->reader, startptr, &err);
record = XLogReadRecord(ctx->reader, &err);
if (err)
elog(ERROR, "%s", err);
if (!record)
elog(ERROR, "no record found"); /* shouldn't happen */

startptr = InvalidXLogRecPtr;

LogicalDecodingProcessRecord(ctx, ctx->reader);

/* only continue till we found a consistent spot */
Expand Down
14 changes: 3 additions & 11 deletions src/backend/replication/logical/logicalfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
MemoryContext per_query_ctx;
MemoryContext oldcontext;
XLogRecPtr end_of_wal;
XLogRecPtr startptr;
LogicalDecodingContext *ctx;
ResourceOwner old_resowner = CurrentResourceOwner;
ArrayType *arr;
Expand Down Expand Up @@ -269,28 +268,21 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
* xacts that committed after the slot's confirmed_flush can be
* accumulated into reorder buffers.
*/
startptr = MyReplicationSlot->data.restart_lsn;
XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);

/* invalidate non-timetravel entries */
InvalidateSystemCaches();

/* Decode until we run out of records */
while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) ||
(ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal))
while (ctx->reader->EndRecPtr < end_of_wal)
{
XLogRecord *record;
char *errm = NULL;

record = XLogReadRecord(ctx->reader, startptr, &errm);
record = XLogReadRecord(ctx->reader, &errm);
if (errm)
elog(ERROR, "%s", errm);

/*
* Now that we've set up the xlog reader state, subsequent calls
* pass InvalidXLogRecPtr to say "continue from last record"
*/
startptr = InvalidXLogRecPtr;

/*
* The {begin_txn,change,commit_txn}_wrapper callbacks above will
* store the description into our tuplestore.
Expand Down
13 changes: 3 additions & 10 deletions src/backend/replication/slotfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,6 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
{
LogicalDecodingContext *ctx;
ResourceOwner old_resowner = CurrentResourceOwner;
XLogRecPtr startlsn;
XLogRecPtr retlsn;

PG_TRY();
Expand All @@ -411,7 +410,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
* Start reading at the slot's restart_lsn, which we know to point to
* a valid record.
*/
startlsn = MyReplicationSlot->data.restart_lsn;
XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);

/* Initialize our return value in case we don't do anything */
retlsn = MyReplicationSlot->data.confirmed_flush;
Expand All @@ -420,10 +419,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
InvalidateSystemCaches();

/* Decode at least one record, until we run out of records */
while ((!XLogRecPtrIsInvalid(startlsn) &&
startlsn < moveto) ||
(!XLogRecPtrIsInvalid(ctx->reader->EndRecPtr) &&
ctx->reader->EndRecPtr < moveto))
while (ctx->reader->EndRecPtr < moveto)
{
char *errm = NULL;
XLogRecord *record;
Expand All @@ -432,13 +428,10 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
* Read records. No changes are generated in fast_forward mode,
* but snapbuilder/slot statuses are updated properly.
*/
record = XLogReadRecord(ctx->reader, startlsn, &errm);
record = XLogReadRecord(ctx->reader, &errm);
if (errm)
elog(ERROR, "%s", errm);

/* Read sequentially from now on */
startlsn = InvalidXLogRecPtr;

/*
* Process the record. Storage-level changes are ignored in
* fast_forward mode, but other modules (such as snapbuilder)
Expand Down

0 comments on commit 38a9573

Please sign in to comment.