From 023f1020ecb07af3bb0ddbf4622e1a3c3fa276a4 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Mon, 24 Jun 2024 12:20:29 +0300 Subject: [PATCH] Persist pgstat file --- src/backend/access/heap/rewriteheap.c | 43 +----------- src/backend/access/transam/xlog.c | 14 +++- src/backend/replication/logical/message.c | 86 +++++++++++++++++++++++ src/backend/utils/activity/pgstat.c | 10 ++- src/backend/utils/misc/guc.c | 11 +++ src/include/pgstat.h | 3 +- src/include/replication/message.h | 4 ++ 7 files changed, 124 insertions(+), 47 deletions(-) diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c index 4d3591af532..e14f1c64f64 100644 --- a/src/backend/access/heap/rewriteheap.c +++ b/src/backend/access/heap/rewriteheap.c @@ -786,41 +786,6 @@ raw_heap_insert(RewriteState state, HeapTuple tup) * ------------------------------------------------------------------------ */ -/* - * NEON: we need to persist mapping file in WAL - */ -static void -wallog_mapping_file(char const* path, int fd) -{ - char prefix[MAXPGPATH]; - - /* Do not wallog AUX file at replica */ - if (!XLogInsertAllowed()) - return; - - snprintf(prefix, sizeof(prefix), "neon-file:%s", path); - if (fd < 0) - { - elog(DEBUG1, "neon: deleting contents of rewrite file %s", path); - /* unlink file */ - LogLogicalMessage(prefix, NULL, 0, false); - } - else - { - off_t size = lseek(fd, 0, SEEK_END); - char* buf; - elog(DEBUG1, "neon: writing contents of rewrite file %s, size %ld", path, (long)size); - if (size < 0) - elog(ERROR, "Failed to get size of mapping file: %m"); - buf = palloc((size_t)size); - lseek(fd, 0, SEEK_SET); - if (read(fd, buf, (size_t)size) != size) - elog(ERROR, "Failed to read mapping file: %m"); - LogLogicalMessage(prefix, buf, (size_t)size, false); - pfree(buf); - } -} - /* * Do preparations for logging logical mappings during a rewrite if * necessary. If we detect that we don't need to log anything we'll prevent @@ -956,7 +921,7 @@ logical_heap_rewrite_flush_mappings(RewriteState state) errmsg("could not write to file \"%s\", wrote %d of %d: %m", src->path, written, len))); src->off += len; - wallog_mapping_file(src->path, FileGetRawDesc(src->vfd)); + wallog_file_descriptor(src->path, FileGetRawDesc(src->vfd), PG_UINT64_MAX); XLogBeginInsert(); XLogRegisterData((char *) (&xlrec), sizeof(xlrec)); @@ -1209,8 +1174,6 @@ heap_xlog_logical_rewrite(XLogReaderState *r) errmsg("could not fsync file \"%s\": %m", path))); pgstat_report_wait_end(); - wallog_mapping_file(path, fd); - if (CloseTransientFile(fd) != 0) ereport(ERROR, (errcode_for_file_access(), @@ -1286,7 +1249,7 @@ CheckPointLogicalRewriteHeap(void) ereport(ERROR, (errcode_for_file_access(), errmsg("could not remove file \"%s\": %m", path))); - wallog_mapping_file(path, -1); + wallog_file_removal(path); } else { @@ -1315,7 +1278,7 @@ CheckPointLogicalRewriteHeap(void) errmsg("could not fsync file \"%s\": %m", path))); pgstat_report_wait_end(); - wallog_mapping_file(path, fd); + wallog_file_descriptor(path, fd, PG_UINT64_MAX); if (CloseTransientFile(fd) != 0) ereport(ERROR, diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index e0ca8da06d9..958908fb49d 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -6716,7 +6716,7 @@ CreateCheckPoint(int flags) SyncPreCheckpoint(); /* - * NEON: perform checkpiont action requiring write to the WAL before we determine the REDO pointer. + * NEON: perform checkpoint action requiring write to the WAL before we determine the REDO pointer. */ PreCheckPointGuts(flags); @@ -7235,7 +7235,7 @@ CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn, XLogRecPtr pagePtr, } static void -CheckPointReplicationState(void) +CheckPointReplicationState() { CheckPointRelationMap(); CheckPointReplicationSlots(); @@ -7247,13 +7247,21 @@ CheckPointReplicationState(void) /* * NEON: we use logical records to persist information of about slots, origins, relation map... * If it is done inside shutdown checkpoint, then Postgres panics: "concurrent write-ahead log activity while database system is shutting down" - * So it before checkpoint REDO position is determined. + * So do it before checkpoint REDO position is determined. */ static void PreCheckPointGuts(int flags) { if (flags & CHECKPOINT_IS_SHUTDOWN) + { CheckPointReplicationState(); + /* + * pgstat_write_statsfile will be called later by before_shmem_exit() hook, but by then it's too late + * to write WAL records. In Neon, pgstat_write_statsfile() writes the pgstats file to the WAL, so we have + * to call it earlier. (The call that happens later is useless, but it doesn't do any harm either) + */ + pgstat_write_statsfile(); + } } /* diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c index 1c34912610e..5c999edff5c 100644 --- a/src/backend/replication/logical/message.c +++ b/src/backend/replication/logical/message.c @@ -31,6 +31,9 @@ #include "postgres.h" +#include +#include + #include "access/xact.h" #include "access/xloginsert.h" #include "miscadmin.h" @@ -87,3 +90,86 @@ logicalmsg_redo(XLogReaderState *record) /* This is only interesting for logical decoding, see decode.c. */ } + +/* + * NEON: remove AUX object + */ +void +wallog_file_removal(char const* path) +{ + char prefix[MAXPGPATH]; + + /* Do not wallog AUX file at replica */ + if (!XLogInsertAllowed()) + return; + + snprintf(prefix, sizeof(prefix), "neon-file:%s", path); + elog(DEBUG1, "neon: deleting contents of file %s", path); + + /* unlink file */ + XLogFlush(LogLogicalMessage(prefix, NULL, 0, false)); +} + +/* + * NEON: persist file in WAL to save it in persistent storage. + */ +void +wallog_file_descriptor(char const* path, int fd, uint64_t limit) +{ + char prefix[MAXPGPATH]; + off_t size; + struct stat stat; + + Assert(fd >= 0); + + /* Do not wallog AUX file at replica */ + if (!XLogInsertAllowed()) + return; + + if (fstat(fd, &stat)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not stat file \"%s\": %m", path))); + size = stat.st_size; + + if (size < 0) + elog(ERROR, "Failed to get size of file %s: %m", path); + elog(DEBUG1, "neon: writing contents of file %s, size %ld", path, (long)size); + + if ((uint64_t)size > limit) + { + elog(WARNING, "Size of file %s %ld is larger than limit %ld", path, (long)size, (long)limit); + wallog_file_removal(path); + } + else + { + char* buf = palloc((size_t)size); /* palloc is not able to allocate more than MaxAllocSize (1Gb) but we don't expect such large AUX files */ + size_t offs = 0; + while (offs < size) { + ssize_t rc = pread(fd, buf + offs, (size_t)size - offs, offs); + if (rc <= 0) + elog(ERROR, "Failed to read file %s: %m", path); + offs += rc; + } + snprintf(prefix, sizeof(prefix), "neon-file:%s", path); + XLogFlush(LogLogicalMessage(prefix, buf, (size_t)size, false)); + pfree(buf); + } +} + +void +wallog_file(char const* path, uint64_t limit) +{ + int fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); + if (fd < 0) + { + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", path))); + } + else + { + wallog_file_descriptor(path, fd, limit); + CloseTransientFile(fd); + } +} diff --git a/src/backend/utils/activity/pgstat.c b/src/backend/utils/activity/pgstat.c index cd83c48d1da..10ee77c66ed 100644 --- a/src/backend/utils/activity/pgstat.c +++ b/src/backend/utils/activity/pgstat.c @@ -97,6 +97,7 @@ #include "lib/dshash.h" #include "pgstat.h" #include "port/atomics.h" +#include "replication/message.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/lwlock.h" @@ -164,7 +165,6 @@ typedef struct PgStat_SnapshotEntry * ---------- */ -static void pgstat_write_statsfile(void); static void pgstat_read_statsfile(void); static void pgstat_reset_after_failure(void); @@ -185,7 +185,7 @@ static inline bool pgstat_is_kind_valid(int ikind); bool pgstat_track_counts = false; int pgstat_fetch_consistency = PGSTAT_FETCH_CONSISTENCY_CACHE; - +int neon_pgstat_file_size_limit; /* ---------- * state shared with pgstat_*.c @@ -1284,7 +1284,7 @@ write_chunk(FILE *fpout, void *ptr, size_t len) * This function is called in the last process that is accessing the shared * stats so locking is not required. */ -static void +void pgstat_write_statsfile(void) { FILE *fpout; @@ -1444,6 +1444,10 @@ pgstat_write_statsfile(void) tmpfile, statfile))); unlink(tmpfile); } + else if (XLogInsertAllowed() && neon_pgstat_file_size_limit != 0) + { + wallog_file(statfile, (uint64_t)neon_pgstat_file_size_limit * 1024); + } } /* helpers for pgstat_read_statsfile() */ diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 161bb35fa51..f18f6e11ba1 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -3662,6 +3662,17 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"neon_pgstat_file_size_limit", PGC_SIGHUP, STATS_CUMULATIVE, + gettext_noop("Maximal size of pg_stat file saved in Neon storage."), + NULL, + GUC_UNIT_KB + }, + &neon_pgstat_file_size_limit, + 0, 0, 1000000, /* disabled by default */ + NULL, NULL, NULL + }, + { {"gin_pending_list_limit", PGC_USERSET, CLIENT_CONN_STATEMENT, gettext_noop("Sets the maximum size of the pending list for GIN index."), diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 44279ea3d70..ab8bb47cf1e 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -412,6 +412,7 @@ extern void StatsShmemInit(void); extern void pgstat_restore_stats(void); extern void pgstat_discard_stats(void); extern void pgstat_before_server_shutdown(int code, Datum arg); +extern void pgstat_write_statsfile(void); /* Functions for backend initialization */ extern void pgstat_initialize(void); @@ -650,7 +651,7 @@ extern PgStat_WalStats *pgstat_fetch_stat_wal(void); extern PGDLLIMPORT bool pgstat_track_counts; extern PGDLLIMPORT int pgstat_track_functions; extern PGDLLIMPORT int pgstat_fetch_consistency; - +extern PGDLLIMPORT int neon_pgstat_file_size_limit; /* * Variables in pgstat_bgwriter.c diff --git a/src/include/replication/message.h b/src/include/replication/message.h index 0b396c56693..ed4db52dffd 100644 --- a/src/include/replication/message.h +++ b/src/include/replication/message.h @@ -32,6 +32,10 @@ typedef struct xl_logical_message extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message, size_t size, bool transactional); +extern void wallog_file(char const* path, uint64_t limit); +extern void wallog_file_descriptor(char const* path, int fd, uint64_t limit); +extern void wallog_file_removal(char const* path); + /* RMGR API */ #define XLOG_LOGICAL_MESSAGE 0x00 extern void logicalmsg_redo(XLogReaderState *record);