Skip to content

Commit

Permalink
Use larger segment file names for pg_notify
Browse files Browse the repository at this point in the history
This avoids the wraparound in async.c and removes the corresponding code
complexity. The maximum number of SLRU pages allocated for the NOTIFY / LISTEN
queue is now determined by the max_notify_queue_pages GUC. The default
value is 1048576. With 8 KB pages, this allows the queue to consume up to
8 GB of disk space, which is exactly the limit we had previously.

Author: Maxim Orlov, Aleksander Alekseev, Alexander Korotkov, Teodor Sigaev
Author: Nikita Glukhov, Pavel Borisov, Yura Sokolov
Reviewed-by: Jacob Champion, Heikki Linnakangas, Alexander Korotkov
Reviewed-by: Japin Li, Pavel Borisov, Tom Lane, Peter Eisentraut, Andres Freund
Reviewed-by: Andrey Borodin, Dilip Kumar, Aleksander Alekseev
Discussion: https://postgr.es/m/CACG%3DezZe1NQSCnfHOr78AtAZxJZeCvxrts0ygrxYwe%3DpyyjVWA%40mail.gmail.com
Discussion: https://postgr.es/m/CAJ7c6TPDOYBYrnCAeyndkBktO0WG2xSdYduTF0nxq%2BvfkmTF5Q%40mail.gmail.com
  • Loading branch information
akorotkov committed Nov 28, 2023
1 parent 4ed8f09 commit 2cdf131
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 92 deletions.
16 changes: 16 additions & 0 deletions doc/src/sgml/config.sgml
Expand Up @@ -2151,6 +2151,22 @@ include_dir 'conf.d'
</listitem>
</varlistentry>

<varlistentry id="guc-max-notify-queue-pages" xreflabel="max_notify_queue_pages">
<term><varname>max_notify_queue_pages</varname> (<type>integer</type>)
<indexterm>
<primary><varname>max_notify_queue_pages</varname> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Specifies the maximum number of pages that can be allocated for the
<xref linkend="sql-notify"/> / <xref linkend="sql-listen"/> queue.
The default value is 1048576. With 8 KB pages, this allows the queue
to consume up to 8 GB of disk space.
</para>
</listitem>
</varlistentry>

</variablelist>
</sect2>

Expand Down
1 change: 1 addition & 0 deletions doc/src/sgml/ref/listen.sgml
Expand Up @@ -148,6 +148,7 @@ Asynchronous notification "virtual" received from server process with PID 8448.
<simplelist type="inline">
<member><xref linkend="sql-notify"/></member>
<member><xref linkend="sql-unlisten"/></member>
<member><xref linkend="guc-max-notify-queue-pages"/></member>
</simplelist>
</refsect1>
</refentry>
1 change: 1 addition & 0 deletions doc/src/sgml/ref/notify.sgml
Expand Up @@ -228,6 +228,7 @@ Asynchronous notification "foo" with payload "payload" received from server proc
<simplelist type="inline">
<member><xref linkend="sql-listen"/></member>
<member><xref linkend="sql-unlisten"/></member>
<member><xref linkend="guc-max-notify-queue-pages"/></member>
</simplelist>
</refsect1>
</refentry>
122 changes: 30 additions & 92 deletions src/backend/commands/async.c
Expand Up @@ -103,12 +103,11 @@
* until we reach either a notification from an uncommitted transaction or
* the head pointer's position.
*
* 6. To avoid SLRU wraparound and limit disk space consumption, the tail
* pointer needs to be advanced so that old pages can be truncated.
* This is relatively expensive (notably, it requires an exclusive lock),
* so we don't want to do it often. We make sending backends do this work
* if they advanced the queue head into a new page, but only once every
* QUEUE_CLEANUP_DELAY pages.
* 6. To limit disk space consumption, the tail pointer needs to be advanced
* so that old pages can be truncated. This is relatively expensive
* (notably, it requires an exclusive lock), so we don't want to do it
* often. We make sending backends do this work if they advanced the queue
* head into a new page, but only once every QUEUE_CLEANUP_DELAY pages.
*
* An application that listens on the same channel it notifies will get
* NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
Expand All @@ -120,7 +119,7 @@
* The amount of shared memory used for notify management (NUM_NOTIFY_BUFFERS)
* can be varied without affecting anything but performance. The maximum
* amount of notification data that can be queued at one time is determined
* by slru.c's wraparound limit; see QUEUE_MAX_PAGE below.
* by the max_notify_queue_pages GUC.
*-------------------------------------------------------------------------
*/

Expand Down Expand Up @@ -312,23 +311,8 @@ static SlruCtlData NotifyCtlData;

#define NotifyCtl (&NotifyCtlData)
#define QUEUE_PAGESIZE BLCKSZ
#define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */

/*
* Use segments 0000 through FFFF. Each contains SLRU_PAGES_PER_SEGMENT pages
* which gives us the pages from 0 to SLRU_PAGES_PER_SEGMENT * 0x10000 - 1.
* We could use as many segments as SlruScanDirectory() allows, but this gives
* us so much space already that it doesn't seem worth the trouble.
*
* The most data we can have in the queue at a time is QUEUE_MAX_PAGE/2
* pages, because more than that would confuse slru.c into thinking there
* was a wraparound condition. With the default BLCKSZ this means there
* can be up to 8GB of queued-and-not-read data.
*
* Note: it's possible to redefine QUEUE_MAX_PAGE with a smaller multiple of
* SLRU_PAGES_PER_SEGMENT, for easier testing of queue-full behaviour.
*/
#define QUEUE_MAX_PAGE (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)
#define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */

/*
* listenChannels identifies the channels we are actually listening to
Expand Down Expand Up @@ -439,12 +423,15 @@ static bool amRegisteredListener = false;
/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
static bool tryAdvanceTail = false;

/* GUC parameter */
/* GUC parameters */
bool Trace_notify = false;

/* For 8 KB pages this gives 8 GB of disk space */
int max_notify_queue_pages = 1048576;

/* local function prototypes */
static int64 asyncQueuePageDiff(int64 p, int64 q);
static bool asyncQueuePagePrecedes(int64 p, int64 q);
static inline int64 asyncQueuePageDiff(int64 p, int64 q);
static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
static void queue_listen(ListenActionKind action, const char *channel);
static void Async_UnlistenOnExit(int code, Datum arg);
static void Exec_ListenPreCommit(void);
Expand Down Expand Up @@ -474,39 +461,23 @@ static int notification_match(const void *key1, const void *key2, Size keysize);
static void ClearPendingActionsAndNotifies(void);

/*
* Compute the difference between two queue page numbers (i.e., p - q),
* accounting for wraparound.
* Compute the difference between two queue page numbers (i.e., p - q).
* Previously this function had to account for wraparound.
*/
static int64
static inline int64
asyncQueuePageDiff(int64 p, int64 q)
{
int64 diff;

/*
* We have to compare modulo (QUEUE_MAX_PAGE+1)/2. Both inputs should be
* in the range 0..QUEUE_MAX_PAGE.
*/
Assert(p >= 0 && p <= QUEUE_MAX_PAGE);
Assert(q >= 0 && q <= QUEUE_MAX_PAGE);

diff = p - q;
if (diff >= ((QUEUE_MAX_PAGE + 1) / 2))
diff -= QUEUE_MAX_PAGE + 1;
else if (diff < -((QUEUE_MAX_PAGE + 1) / 2))
diff += QUEUE_MAX_PAGE + 1;
return diff;
return p - q;
}

/*
* Is p < q, accounting for wraparound?
*
* Since asyncQueueIsFull() blocks creation of a page that could precede any
* extant page, we need not assess entries within a page.
* Determine whether page p precedes page q (i.e., p < q).
* Previously this function had to account for wraparound.
*/
static bool
static inline bool
asyncQueuePagePrecedes(int64 p, int64 q)
{
return asyncQueuePageDiff(p, q) < 0;
return p < q;
}

/*
Expand Down Expand Up @@ -566,12 +537,13 @@ AsyncShmemInit(void)
}

/*
* Set up SLRU management of the pg_notify data.
* Set up SLRU management of the pg_notify data. Note that long segment
* names are used in order to avoid wraparound.
*/
NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
SimpleLruInit(NotifyCtl, "Notify", NUM_NOTIFY_BUFFERS, 0,
NotifySLRULock, "pg_notify", LWTRANCHE_NOTIFY_BUFFER,
SYNC_HANDLER_NONE, false);
SYNC_HANDLER_NONE, true);

if (!found)
{
Expand Down Expand Up @@ -1305,27 +1277,11 @@ asyncQueueUnregister(void)
static bool
asyncQueueIsFull(void)
{
int nexthead;
int boundary;
int headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
int occupied = headPage - tailPage;

/*
* The queue is full if creating a new head page would create a page that
* logically precedes the current global tail pointer, ie, the head
* pointer would wrap around compared to the tail. We cannot create such
* a head page for fear of confusing slru.c. For safety we round the tail
* pointer back to a segment boundary (truncation logic in
* asyncQueueAdvanceTail does not do this, so doing it here is optional).
*
* Note that this test is *not* dependent on how much space there is on
* the current head page. This is necessary because asyncQueueAddEntries
* might try to create the next head page in any case.
*/
nexthead = QUEUE_POS_PAGE(QUEUE_HEAD) + 1;
if (nexthead > QUEUE_MAX_PAGE)
nexthead = 0; /* wrap around */
boundary = QUEUE_STOP_PAGE;
boundary -= boundary % SLRU_PAGES_PER_SEGMENT;
return asyncQueuePagePrecedes(nexthead, boundary);
return occupied >= max_notify_queue_pages;
}

/*
Expand Down Expand Up @@ -1355,8 +1311,6 @@ asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE)
{
pageno++;
if (pageno > QUEUE_MAX_PAGE)
pageno = 0; /* wrap around */
offset = 0;
pageJump = true;
}
Expand Down Expand Up @@ -1433,9 +1387,6 @@ asyncQueueAddEntries(ListCell *nextNotify)
* If this is the first write since the postmaster started, we need to
* initialize the first page of the async SLRU. Otherwise, the current
* page should be initialized already, so just fetch it.
*
* (We could also take the first path when the SLRU position has just
* wrapped around, but re-zeroing the page is harmless in that case.)
*/
pageno = QUEUE_POS_PAGE(queue_head);
if (QUEUE_POS_IS_ZERO(queue_head))
Expand Down Expand Up @@ -1548,20 +1499,12 @@ asyncQueueUsage(void)
{
int headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
int occupied;

occupied = headPage - tailPage;
int occupied = headPage - tailPage;

if (occupied == 0)
return (double) 0; /* fast exit for common case */

if (occupied < 0)
{
/* head has wrapped around, tail not yet */
occupied += QUEUE_MAX_PAGE + 1;
}

return (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
return (double) occupied / (double) max_notify_queue_pages;
}

/*
Expand Down Expand Up @@ -2209,11 +2152,6 @@ asyncQueueAdvanceTail(void)
*/
SimpleLruTruncate(NotifyCtl, newtailpage);

/*
* Update QUEUE_STOP_PAGE. This changes asyncQueueIsFull()'s verdict
* for the segment immediately prior to the old tail, allowing fresh
* data into that segment.
*/
LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
QUEUE_STOP_PAGE = newtailpage;
LWLockRelease(NotifyQueueLock);
Expand Down
10 changes: 10 additions & 0 deletions src/backend/utils/misc/guc_tables.c
Expand Up @@ -2687,6 +2687,16 @@ struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},

{
{"max_notify_queue_pages", PGC_POSTMASTER, RESOURCES_DISK,
gettext_noop("Sets the maximum number of allocated pages for NOTIFY / LISTEN queue."),
NULL,
},
&max_notify_queue_pages,
1048576, 64, INT_MAX,
NULL, NULL, NULL
},

{
{"wal_decode_buffer_size", PGC_POSTMASTER, WAL_RECOVERY,
gettext_noop("Buffer size for reading ahead in the WAL during recovery."),
Expand Down
3 changes: 3 additions & 0 deletions src/backend/utils/misc/postgresql.conf.sample
Expand Up @@ -166,6 +166,9 @@
#temp_file_limit = -1 # limits per-process temp file space
# in kilobytes, or -1 for no limit

#max_notify_queue_pages = 1048576 # limits the number of SLRU pages allocated
# for NOTIFY / LISTEN queue

# - Kernel Resources -

#max_files_per_process = 1000 # min 64
Expand Down
1 change: 1 addition & 0 deletions src/include/commands/async.h
Expand Up @@ -21,6 +21,7 @@
#define NUM_NOTIFY_BUFFERS 8

extern PGDLLIMPORT bool Trace_notify;
extern PGDLLIMPORT int max_notify_queue_pages;
extern PGDLLIMPORT volatile sig_atomic_t notifyInterruptPending;

extern Size AsyncShmemSize(void);
Expand Down

0 comments on commit 2cdf131

Please sign in to comment.