Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 3 additions & 40 deletions src/backend/access/heap/rewriteheap.c
Original file line number Diff line number Diff line change
Expand Up @@ -752,41 +752,6 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
* ------------------------------------------------------------------------
*/

/*
* NEON: we need to persist mapping file in WAL
*/
static void
wallog_mapping_file(char const* path, int fd)
{
char prefix[MAXPGPATH];

/* Do not wallog AUX file at replica */
if (!XLogInsertAllowed())
return;

snprintf(prefix, sizeof(prefix), "neon-file:%s", path);
if (fd < 0)
{
elog(DEBUG1, "neon: deleting contents of rewrite file %s", path);
/* unlink file */
LogLogicalMessage(prefix, NULL, 0, false, false);
}
else
{
off_t size = lseek(fd, 0, SEEK_END);
char* buf;
elog(DEBUG1, "neon: writing contents of rewrite file %s, size %ld", path, (long)size);
if (size < 0)
elog(ERROR, "Failed to get size of mapping file: %m");
buf = palloc((size_t)size);
lseek(fd, 0, SEEK_SET);
if (read(fd, buf, (size_t)size) != size)
elog(ERROR, "Failed to read mapping file: %m");
LogLogicalMessage(prefix, buf, (size_t)size, false, false);
pfree(buf);
}
}

/*
* Do preparations for logging logical mappings during a rewrite if
* necessary. If we detect that we don't need to log anything we'll prevent
Expand Down Expand Up @@ -922,7 +887,7 @@ logical_heap_rewrite_flush_mappings(RewriteState state)
errmsg("could not write to file \"%s\", wrote %d of %d: %m", src->path,
written, len)));
src->off += len;
wallog_mapping_file(src->path, FileGetRawDesc(src->vfd));
wallog_file_descriptor(src->path, FileGetRawDesc(src->vfd), PG_UINT64_MAX);

XLogBeginInsert();
XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
Expand Down Expand Up @@ -1173,8 +1138,6 @@ heap_xlog_logical_rewrite(XLogReaderState *r)
errmsg("could not fsync file \"%s\": %m", path)));
pgstat_report_wait_end();

wallog_mapping_file(path, fd);

if (CloseTransientFile(fd) != 0)
ereport(ERROR,
(errcode_for_file_access(),
Expand Down Expand Up @@ -1252,7 +1215,7 @@ CheckPointLogicalRewriteHeap(void)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not remove file \"%s\": %m", path)));
wallog_mapping_file(path, -1);
wallog_file_removal(path);
}
else
{
Expand Down Expand Up @@ -1281,7 +1244,7 @@ CheckPointLogicalRewriteHeap(void)
errmsg("could not fsync file \"%s\": %m", path)));
pgstat_report_wait_end();

wallog_mapping_file(path, fd);
wallog_file_descriptor(path, fd, PG_UINT64_MAX);

if (CloseTransientFile(fd) != 0)
ereport(ERROR,
Expand Down
10 changes: 9 additions & 1 deletion src/backend/access/transam/xlog.c
Original file line number Diff line number Diff line change
Expand Up @@ -7473,7 +7473,7 @@ CreateCheckPoint(int flags)
SyncPreCheckpoint();

/*
* NEON: perform checkpiont action requiring write to the WAL before we determine the REDO pointer.
* NEON: perform checkpoint action requiring write to the WAL before we determine the REDO pointer.
*/
PreCheckPointGuts(flags);

Expand Down Expand Up @@ -8063,7 +8063,15 @@ static void
PreCheckPointGuts(int flags)
{
if (flags & CHECKPOINT_IS_SHUTDOWN)
{
CheckPointReplicationState(flags);
/*
* pgstat_write_statsfile will be called later by before_shmem_exit() hook, but by then it's too late
* to write WAL records. In Neon, pgstat_write_statsfile() writes the pgstats file to the WAL, so we have
* to call it earlier. (The call that happens later is useless, but it doesn't do any harm either)
*/
pgstat_write_statsfile();
}
}

/*
Expand Down
87 changes: 87 additions & 0 deletions src/backend/replication/logical/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,14 @@

#include "postgres.h"

#include <unistd.h>
#include <sys/stat.h>

#include "access/xact.h"
#include "access/xloginsert.h"
#include "miscadmin.h"
#include "replication/message.h"
#include "storage/fd.h"

/*
* Write logical decoding message into XLog.
Expand Down Expand Up @@ -93,3 +97,86 @@ logicalmsg_redo(XLogReaderState *record)

/* This is only interesting for logical decoding, see decode.c. */
}

/*
* NEON: remove AUX object
*/
void
wallog_file_removal(char const* path)
{
char prefix[MAXPGPATH];

/* Do not wallog AUX file at replica */
if (!XLogInsertAllowed())
return;

snprintf(prefix, sizeof(prefix), "neon-file:%s", path);
elog(DEBUG1, "neon: deleting contents of file %s", path);

/* unlink file */
LogLogicalMessage(prefix, NULL, 0, false, true);
}

/*
* NEON: persist file in WAL to save it in persistent storage.
*/
void
wallog_file_descriptor(char const* path, int fd, uint64_t limit)
{
char prefix[MAXPGPATH];
off_t size;
struct stat stat;

Assert(fd >= 0);

/* Do not wallog AUX file at replica */
if (!XLogInsertAllowed())
return;

if (fstat(fd, &stat))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not stat file \"%s\": %m", path)));
size = stat.st_size;

if (size < 0)
elog(ERROR, "Failed to get size of file %s: %m", path);
elog(DEBUG1, "neon: writing contents of file %s, size %ld", path, (long)size);

if ((uint64_t)size > limit)
{
elog(WARNING, "Size of file %s %ld is larger than limit %ld", path, (long)size, (long)limit);
wallog_file_removal(path);
}
else
{
char* buf = palloc((size_t)size); /* palloc is not able to allocate more than MaxAllocSize (1Gb) but we don't expect such large AUX files */
size_t offs = 0;
while (offs < size) {
ssize_t rc = pread(fd, buf + offs, (size_t)size - offs, offs);
if (rc <= 0)
elog(ERROR, "Failed to read file %s: %m", path);
offs += rc;
}
snprintf(prefix, sizeof(prefix), "neon-file:%s", path);
LogLogicalMessage(prefix, buf, (size_t)size, false, true);
pfree(buf);
}
}

void
wallog_file(char const* path, uint64_t limit)
{
int fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
if (fd < 0)
{
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not open file \"%s\": %m", path)));
}
else
{
wallog_file_descriptor(path, fd, limit);
CloseTransientFile(fd);
}
}
10 changes: 7 additions & 3 deletions src/backend/utils/activity/pgstat.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
#include "lib/dshash.h"
#include "pgstat.h"
#include "port/atomics.h"
#include "replication/message.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
Expand Down Expand Up @@ -162,7 +163,6 @@ typedef struct PgStat_SnapshotEntry
* ----------
*/

static void pgstat_write_statsfile(void);
static void pgstat_read_statsfile(void);

static void pgstat_reset_after_failure(void);
Expand All @@ -183,7 +183,7 @@ static inline bool pgstat_is_kind_valid(int ikind);

bool pgstat_track_counts = false;
int pgstat_fetch_consistency = PGSTAT_FETCH_CONSISTENCY_CACHE;

int neon_pgstat_file_size_limit;

/* ----------
* state shared with pgstat_*.c
Expand Down Expand Up @@ -1307,7 +1307,7 @@ write_chunk(FILE *fpout, void *ptr, size_t len)
* This function is called in the last process that is accessing the shared
* stats so locking is not required.
*/
static void
void
pgstat_write_statsfile(void)
{
FILE *fpout;
Expand Down Expand Up @@ -1473,6 +1473,10 @@ pgstat_write_statsfile(void)
tmpfile, statfile)));
unlink(tmpfile);
}
else if (XLogInsertAllowed() && neon_pgstat_file_size_limit != 0)
{
wallog_file(statfile, (uint64_t)neon_pgstat_file_size_limit * 1024);
}
}

/* helpers for pgstat_read_statsfile() */
Expand Down
11 changes: 11 additions & 0 deletions src/backend/utils/misc/guc_tables.c
Original file line number Diff line number Diff line change
Expand Up @@ -3650,6 +3650,17 @@ struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},

{
{"neon_pgstat_file_size_limit", PGC_SIGHUP, STATS_CUMULATIVE,
gettext_noop("Maximal size of pg_stat file saved in Neon storage."),
NULL,
GUC_UNIT_KB
},
&neon_pgstat_file_size_limit,
0, 0, 1000000, /* disabled by default */
NULL, NULL, NULL
},

{
{"gin_pending_list_limit", PGC_USERSET, CLIENT_CONN_STATEMENT,
gettext_noop("Sets the maximum size of the pending list for GIN index."),
Expand Down
3 changes: 2 additions & 1 deletion src/include/pgstat.h
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ extern void StatsShmemInit(void);
extern void pgstat_restore_stats(void);
extern void pgstat_discard_stats(void);
extern void pgstat_before_server_shutdown(int code, Datum arg);
extern void pgstat_write_statsfile(void);

/* Functions for backend initialization */
extern void pgstat_initialize(void);
Expand Down Expand Up @@ -729,7 +730,7 @@ extern PgStat_WalStats *pgstat_fetch_stat_wal(void);
extern PGDLLIMPORT bool pgstat_track_counts;
extern PGDLLIMPORT int pgstat_track_functions;
extern PGDLLIMPORT int pgstat_fetch_consistency;

extern PGDLLIMPORT int neon_pgstat_file_size_limit;

/*
* Variables in pgstat_bgwriter.c
Expand Down
4 changes: 4 additions & 0 deletions src/include/replication/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message,
size_t size, bool transactional,
bool flush);

extern void wallog_file(char const* path, uint64_t limit);
extern void wallog_file_descriptor(char const* path, int fd, uint64_t limit);
extern void wallog_file_removal(char const* path);

/* RMGR API */
#define XLOG_LOGICAL_MESSAGE 0x00
extern void logicalmsg_redo(XLogReaderState *record);
Expand Down