diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c index c438ae14eed..073190754f8 100644 --- a/src/backend/access/heap/rewriteheap.c +++ b/src/backend/access/heap/rewriteheap.c @@ -752,41 +752,6 @@ raw_heap_insert(RewriteState state, HeapTuple tup) * ------------------------------------------------------------------------ */ -/* - * NEON: we need to persist mapping file in WAL - */ -static void -wallog_mapping_file(char const* path, int fd) -{ - char prefix[MAXPGPATH]; - - /* Do not wallog AUX file at replica */ - if (!XLogInsertAllowed()) - return; - - snprintf(prefix, sizeof(prefix), "neon-file:%s", path); - if (fd < 0) - { - elog(DEBUG1, "neon: deleting contents of rewrite file %s", path); - /* unlink file */ - LogLogicalMessage(prefix, NULL, 0, false, false); - } - else - { - off_t size = lseek(fd, 0, SEEK_END); - char* buf; - elog(DEBUG1, "neon: writing contents of rewrite file %s, size %ld", path, (long)size); - if (size < 0) - elog(ERROR, "Failed to get size of mapping file: %m"); - buf = palloc((size_t)size); - lseek(fd, 0, SEEK_SET); - if (read(fd, buf, (size_t)size) != size) - elog(ERROR, "Failed to read mapping file: %m"); - LogLogicalMessage(prefix, buf, (size_t)size, false, false); - pfree(buf); - } -} - /* * Do preparations for logging logical mappings during a rewrite if * necessary. If we detect that we don't need to log anything we'll prevent @@ -922,7 +887,7 @@ logical_heap_rewrite_flush_mappings(RewriteState state) errmsg("could not write to file \"%s\", wrote %d of %d: %m", src->path, written, len))); src->off += len; - wallog_mapping_file(src->path, FileGetRawDesc(src->vfd)); + wallog_file_descriptor(src->path, FileGetRawDesc(src->vfd), PG_UINT64_MAX); XLogBeginInsert(); XLogRegisterData((char *) (&xlrec), sizeof(xlrec)); @@ -1173,8 +1138,6 @@ heap_xlog_logical_rewrite(XLogReaderState *r) errmsg("could not fsync file \"%s\": %m", path))); pgstat_report_wait_end(); - wallog_mapping_file(path, fd); - if (CloseTransientFile(fd) != 0) ereport(ERROR, (errcode_for_file_access(), @@ -1252,7 +1215,7 @@ CheckPointLogicalRewriteHeap(void) ereport(ERROR, (errcode_for_file_access(), errmsg("could not remove file \"%s\": %m", path))); - wallog_mapping_file(path, -1); + wallog_file_removal(path); } else { @@ -1281,7 +1244,7 @@ CheckPointLogicalRewriteHeap(void) errmsg("could not fsync file \"%s\": %m", path))); pgstat_report_wait_end(); - wallog_mapping_file(path, fd); + wallog_file_descriptor(path, fd, PG_UINT64_MAX); if (CloseTransientFile(fd) != 0) ereport(ERROR, diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index f3ec6db34a3..6356b23df33 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7473,7 +7473,7 @@ CreateCheckPoint(int flags) SyncPreCheckpoint(); /* - * NEON: perform checkpiont action requiring write to the WAL before we determine the REDO pointer. + * NEON: perform checkpoint action requiring write to the WAL before we determine the REDO pointer. */ PreCheckPointGuts(flags); @@ -8063,7 +8063,15 @@ static void PreCheckPointGuts(int flags) { if (flags & CHECKPOINT_IS_SHUTDOWN) + { CheckPointReplicationState(flags); + /* + * pgstat_write_statsfile will be called later by before_shmem_exit() hook, but by then it's too late + * to write WAL records. In Neon, pgstat_write_statsfile() writes the pgstats file to the WAL, so we have + * to call it earlier. (The call that happens later is useless, but it doesn't do any harm either) + */ + pgstat_write_statsfile(); + } } /* diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c index 9e41aac2813..394c482e65a 100644 --- a/src/backend/replication/logical/message.c +++ b/src/backend/replication/logical/message.c @@ -31,10 +31,14 @@ #include "postgres.h" +#include +#include + #include "access/xact.h" #include "access/xloginsert.h" #include "miscadmin.h" #include "replication/message.h" +#include "storage/fd.h" /* * Write logical decoding message into XLog. @@ -93,3 +97,86 @@ logicalmsg_redo(XLogReaderState *record) /* This is only interesting for logical decoding, see decode.c. */ } + +/* + * NEON: remove AUX object + */ +void +wallog_file_removal(char const* path) +{ + char prefix[MAXPGPATH]; + + /* Do not wallog AUX file at replica */ + if (!XLogInsertAllowed()) + return; + + snprintf(prefix, sizeof(prefix), "neon-file:%s", path); + elog(DEBUG1, "neon: deleting contents of file %s", path); + + /* unlink file */ + LogLogicalMessage(prefix, NULL, 0, false, true); +} + +/* + * NEON: persist file in WAL to save it in persistent storage. + */ +void +wallog_file_descriptor(char const* path, int fd, uint64_t limit) +{ + char prefix[MAXPGPATH]; + off_t size; + struct stat stat; + + Assert(fd >= 0); + + /* Do not wallog AUX file at replica */ + if (!XLogInsertAllowed()) + return; + + if (fstat(fd, &stat)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not stat file \"%s\": %m", path))); + size = stat.st_size; + + if (size < 0) + elog(ERROR, "Failed to get size of file %s: %m", path); + elog(DEBUG1, "neon: writing contents of file %s, size %ld", path, (long)size); + + if ((uint64_t)size > limit) + { + elog(WARNING, "Size of file %s %ld is larger than limit %ld", path, (long)size, (long)limit); + wallog_file_removal(path); + } + else + { + char* buf = palloc((size_t)size); /* palloc is not able to allocate more than MaxAllocSize (1Gb) but we don't expect such large AUX files */ + size_t offs = 0; + while (offs < size) { + ssize_t rc = pread(fd, buf + offs, (size_t)size - offs, offs); + if (rc <= 0) + elog(ERROR, "Failed to read file %s: %m", path); + offs += rc; + } + snprintf(prefix, sizeof(prefix), "neon-file:%s", path); + LogLogicalMessage(prefix, buf, (size_t)size, false, true); + pfree(buf); + } +} + +void +wallog_file(char const* path, uint64_t limit) +{ + int fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); + if (fd < 0) + { + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", path))); + } + else + { + wallog_file_descriptor(path, fd, limit); + CloseTransientFile(fd); + } +} diff --git a/src/backend/utils/activity/pgstat.c b/src/backend/utils/activity/pgstat.c index 4dec9395c0b..95993c4495e 100644 --- a/src/backend/utils/activity/pgstat.c +++ b/src/backend/utils/activity/pgstat.c @@ -97,6 +97,7 @@ #include "lib/dshash.h" #include "pgstat.h" #include "port/atomics.h" +#include "replication/message.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/lwlock.h" @@ -162,7 +163,6 @@ typedef struct PgStat_SnapshotEntry * ---------- */ -static void pgstat_write_statsfile(void); static void pgstat_read_statsfile(void); static void pgstat_reset_after_failure(void); @@ -183,7 +183,7 @@ static inline bool pgstat_is_kind_valid(int ikind); bool pgstat_track_counts = false; int pgstat_fetch_consistency = PGSTAT_FETCH_CONSISTENCY_CACHE; - +int neon_pgstat_file_size_limit; /* ---------- * state shared with pgstat_*.c @@ -1307,7 +1307,7 @@ write_chunk(FILE *fpout, void *ptr, size_t len) * This function is called in the last process that is accessing the shared * stats so locking is not required. */ -static void +void pgstat_write_statsfile(void) { FILE *fpout; @@ -1473,6 +1473,10 @@ pgstat_write_statsfile(void) tmpfile, statfile))); unlink(tmpfile); } + else if (XLogInsertAllowed() && neon_pgstat_file_size_limit != 0) + { + wallog_file(statfile, (uint64_t)neon_pgstat_file_size_limit * 1024); + } } /* helpers for pgstat_read_statsfile() */ diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 743be6fcc3e..77bbccacf5d 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -3650,6 +3650,17 @@ struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"neon_pgstat_file_size_limit", PGC_SIGHUP, STATS_CUMULATIVE, + gettext_noop("Maximal size of pg_stat file saved in Neon storage."), + NULL, + GUC_UNIT_KB + }, + &neon_pgstat_file_size_limit, + 0, 0, 1000000, /* disabled by default */ + NULL, NULL, NULL + }, + { {"gin_pending_list_limit", PGC_USERSET, CLIENT_CONN_STATEMENT, gettext_noop("Sets the maximum size of the pending list for GIN index."), diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 2136239710e..e01e58d38bd 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -469,6 +469,7 @@ extern void StatsShmemInit(void); extern void pgstat_restore_stats(void); extern void pgstat_discard_stats(void); extern void pgstat_before_server_shutdown(int code, Datum arg); +extern void pgstat_write_statsfile(void); /* Functions for backend initialization */ extern void pgstat_initialize(void); @@ -729,7 +730,7 @@ extern PgStat_WalStats *pgstat_fetch_stat_wal(void); extern PGDLLIMPORT bool pgstat_track_counts; extern PGDLLIMPORT int pgstat_track_functions; extern PGDLLIMPORT int pgstat_fetch_consistency; - +extern PGDLLIMPORT int neon_pgstat_file_size_limit; /* * Variables in pgstat_bgwriter.c diff --git a/src/include/replication/message.h b/src/include/replication/message.h index d5fb2fe0172..cd7f8163964 100644 --- a/src/include/replication/message.h +++ b/src/include/replication/message.h @@ -33,6 +33,10 @@ extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message, size_t size, bool transactional, bool flush); +extern void wallog_file(char const* path, uint64_t limit); +extern void wallog_file_descriptor(char const* path, int fd, uint64_t limit); +extern void wallog_file_removal(char const* path); + /* RMGR API */ #define XLOG_LOGICAL_MESSAGE 0x00 extern void logicalmsg_redo(XLogReaderState *record);