Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/backend/access/transam/xlogreader.c
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,9 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
SKIP_INVALID_RECORD(RecPtr);
}

if (state->force_record_reassemble)
memcpy(state->readRecordBuf, record, total_len);

state->NextRecPtr = RecPtr + MAXALIGN(total_len);

state->DecodeRecPtr = RecPtr;
Expand Down
76 changes: 75 additions & 1 deletion src/bin/pg_waldump/pg_waldump.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ static volatile sig_atomic_t time_to_stop = false;

static const RelFileLocator emptyRelFileLocator = {0, 0, 0};

static FILE* save_records_file;

typedef struct XLogDumpPrivate
{
TimeLineID timeline;
Expand Down Expand Up @@ -85,6 +87,7 @@ typedef struct XLogDumpConfig

/* save options */
char *save_fullpage_path;
char *save_records_file_path;
} XLogDumpConfig;


Expand Down Expand Up @@ -864,11 +867,35 @@ usage(void)
printf(_(" -z, --stats[=record] show statistics instead of records\n"
" (optionally, show per-record statistics)\n"));
printf(_(" --save-fullpage=DIR save full page images to DIR\n"));
printf(_(" --save-records=FILE save selected WAL records to the file\n"));
printf(_(" -?, --help show this help, then exit\n"));
printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
}


static void
write_pq_int32(FILE* f, int32 val)
{
val = pg_hton32(val);
fwrite(&val, sizeof(val), 1, f);
}

static void
write_pq_int64(FILE* f, int64 val)
{
val = pg_hton64(val);
fwrite(&val, sizeof(val), 1, f);
}

static void
write_pq_message(FILE* f, char tag, uint32 len)
{
fputc(tag, f);
write_pq_int32(f, len + 4);
}


int
main(int argc, char **argv)
{
Expand Down Expand Up @@ -908,6 +935,7 @@ main(int argc, char **argv)
{"version", no_argument, NULL, 'V'},
{"stats", optional_argument, NULL, 'z'},
{"save-fullpage", required_argument, NULL, 1},
{"save-records", required_argument, NULL, 2},
{NULL, 0, NULL, 0}
};

Expand Down Expand Up @@ -961,6 +989,7 @@ main(int argc, char **argv)
config.filter_by_relation_forknum = InvalidForkNumber;
config.filter_by_fpw = false;
config.save_fullpage_path = NULL;
config.save_records_file_path = NULL;
config.stats = false;
config.stats_per_record = false;
config.ignore_format_errors = false;
Expand Down Expand Up @@ -1179,6 +1208,9 @@ main(int argc, char **argv)
case 1:
config.save_fullpage_path = pg_strdup(optarg);
break;
case 2:
config.save_records_file_path = pg_strdup(optarg);
break;
default:
goto bad_argument;
}
Expand Down Expand Up @@ -1279,6 +1311,9 @@ main(int argc, char **argv)
if (config.save_fullpage_path != NULL)
create_fullpage_directory(config.save_fullpage_path);

if (config.save_records_file_path)
save_records_file = fopen(config.save_records_file_path, "wb");

/* parse files as start/end boundaries, extract path if not specified */
if (optind < argc)
{
Expand Down Expand Up @@ -1387,6 +1422,27 @@ main(int argc, char **argv)
if (!xlogreader_state)
pg_fatal("out of memory while allocating a WAL reading processor");

if (save_records_file)
{
/*
* NEON: We dump records in the format recognized by walredo process.
* one character tag + 4 bytes length.
* If relation and block number was specified, then BeginRedoForBlock ('B') record is first
* written, containing relation info and block number. If fork is not specified, then main fork is assumed.
* Then it is followed by ApplyRecord ('A') records which specify record LSN and assembled WAL record raw data.
* Finally GetPage ('G') is written to make walredo to return image of the reconstructed page.
*/
if (config.filter_by_relation_enabled && config.filter_by_relation_block_enabled)
{
write_pq_message(save_records_file, 'B', 17);
fputc(config.filter_by_relation_forknum == InvalidForkNumber ? MAIN_FORKNUM : config.filter_by_relation_forknum, save_records_file);
write_pq_int32(save_records_file, config.filter_by_relation.spcOid);
write_pq_int32(save_records_file, config.filter_by_relation.dbOid);
write_pq_int32(save_records_file, config.filter_by_relation.relNumber);
write_pq_int32(save_records_file, config.filter_by_relation_block);
}
xlogreader_state->force_record_reassemble = true;
}
if(single_file)
{
if(config.ignore_format_errors)
Expand Down Expand Up @@ -1489,7 +1545,12 @@ main(int argc, char **argv)
else
XLogDumpDisplayRecord(&config, xlogreader_state);
}

if (save_records_file)
{
write_pq_message(save_records_file, 'A', record->xl_tot_len + sizeof(XLogRecPtr));
write_pq_int64(save_records_file, xlogreader_state->ReadRecPtr);
fwrite(xlogreader_state->readRecordBuf, record->xl_tot_len, 1, save_records_file);
}
/* save full pages if requested */
if (config.save_fullpage_path != NULL)
XLogRecordSaveFPWs(xlogreader_state, config.save_fullpage_path);
Expand All @@ -1501,6 +1562,19 @@ main(int argc, char **argv)
break;
}

if (save_records_file)
{
if (config.filter_by_relation_enabled && config.filter_by_relation_block_enabled)
{
write_pq_message(save_records_file, 'G', 17);
fputc(config.filter_by_relation_forknum == InvalidForkNumber ? MAIN_FORKNUM : config.filter_by_relation_forknum, save_records_file);
write_pq_int32(save_records_file, config.filter_by_relation.spcOid);
write_pq_int32(save_records_file, config.filter_by_relation.dbOid);
write_pq_int32(save_records_file, config.filter_by_relation.relNumber);
write_pq_int32(save_records_file, config.filter_by_relation_block);
}
fclose(save_records_file);
}
if (config.stats == true && !config.quiet)
XLogDumpDisplayStats(&config, &stats);

Expand Down
1 change: 1 addition & 0 deletions src/include/access/xlogreader.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ struct XLogReaderState
bool skip_page_validation;
bool skip_invalid_records;
bool skip_lsn_checks;
bool force_record_reassemble;

/* ----------------------------------------
* Decoded representation of current record
Expand Down