Skip to content

Commit 75e82b2

Browse files
committed
Optimize shared memory usage for WaitLSNProcInfo
We need separate pairing heaps for different WaitLSNType's, because there might be waiters for different LSN's at the same time. However, one process can wait only for one type of LSN at a time. So, no need for inHeap and heapNode fields to be arrays. Discussion: https://postgr.es/m/CAPpHfdsBR-7sDtXFJ1qpJtKiohfGoj%3DvqzKVjWxtWsWidx7G_A%40mail.gmail.com Author: Alexander Korotkov <aekorotkov@gmail.com> Reviewed-by: Xuneng Zhou <xunengzhou@gmail.com>
1 parent 694b4ab commit 75e82b2

File tree

2 files changed

+29
-27
lines changed

2 files changed

+29
-27
lines changed

src/backend/access/transam/xlogwait.c

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ WaitLSNShmemInit(void)
9090
for (i = 0; i < WAIT_LSN_TYPE_COUNT; i++)
9191
{
9292
pg_atomic_init_u64(&waitLSNState->minWaitedLSN[i], PG_UINT64_MAX);
93-
pairingheap_initialize(&waitLSNState->waitersHeap[i], waitlsn_cmp, (void *) (uintptr_t) i);
93+
pairingheap_initialize(&waitLSNState->waitersHeap[i], waitlsn_cmp, NULL);
9494
}
9595

9696
/* Initialize process info array */
@@ -106,9 +106,8 @@ WaitLSNShmemInit(void)
106106
static int
107107
waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
108108
{
109-
int i = (uintptr_t) arg;
110-
const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, heapNode[i], a);
111-
const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, heapNode[i], b);
109+
const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, heapNode, a);
110+
const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, heapNode, b);
112111

113112
if (aproc->waitLSN < bproc->waitLSN)
114113
return 1;
@@ -132,7 +131,7 @@ updateMinWaitedLSN(WaitLSNType lsnType)
132131
if (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
133132
{
134133
pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
135-
WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, heapNode[i], node);
134+
WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
136135

137136
minWaitedLSN = procInfo->waitLSN;
138137
}
@@ -154,10 +153,11 @@ addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
154153

155154
procInfo->procno = MyProcNumber;
156155
procInfo->waitLSN = lsn;
156+
procInfo->lsnType = lsnType;
157157

158-
Assert(!procInfo->inHeap[i]);
159-
pairingheap_add(&waitLSNState->waitersHeap[i], &procInfo->heapNode[i]);
160-
procInfo->inHeap[i] = true;
158+
Assert(!procInfo->inHeap);
159+
pairingheap_add(&waitLSNState->waitersHeap[i], &procInfo->heapNode);
160+
procInfo->inHeap = true;
161161
updateMinWaitedLSN(lsnType);
162162

163163
LWLockRelease(WaitLSNLock);
@@ -176,10 +176,12 @@ deleteLSNWaiter(WaitLSNType lsnType)
176176

177177
LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
178178

179-
if (procInfo->inHeap[i])
179+
Assert(procInfo->lsnType == lsnType);
180+
181+
if (procInfo->inHeap)
180182
{
181-
pairingheap_remove(&waitLSNState->waitersHeap[i], &procInfo->heapNode[i]);
182-
procInfo->inHeap[i] = false;
183+
pairingheap_remove(&waitLSNState->waitersHeap[i], &procInfo->heapNode);
184+
procInfo->inHeap = false;
183185
updateMinWaitedLSN(lsnType);
184186
}
185187

@@ -228,7 +230,7 @@ wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
228230
WaitLSNProcInfo *procInfo;
229231

230232
/* Get procInfo using appropriate heap node */
231-
procInfo = pairingheap_container(WaitLSNProcInfo, heapNode[i], node);
233+
procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
232234

233235
if (XLogRecPtrIsValid(currentLSN) && procInfo->waitLSN > currentLSN)
234236
break;
@@ -238,7 +240,7 @@ wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
238240
(void) pairingheap_remove_first(&waitLSNState->waitersHeap[i]);
239241

240242
/* Update appropriate flag */
241-
procInfo->inHeap[i] = false;
243+
procInfo->inHeap = false;
242244

243245
if (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE)
244246
break;
@@ -289,20 +291,14 @@ WaitLSNCleanup(void)
289291
{
290292
if (waitLSNState)
291293
{
292-
int i;
293-
294294
/*
295-
* We do a fast-path check of the heap flags without the lock. These
296-
* flags are set to true only by the process itself. So, it's only
295+
* We do a fast-path check of the inHeap flag without the lock. This
296+
* flag is set to true only by the process itself. So, it's only
297297
* possible to get a false positive. But that will be eliminated by a
298298
* recheck inside deleteLSNWaiter().
299299
*/
300-
301-
for (i = 0; i < (int) WAIT_LSN_TYPE_COUNT; i++)
302-
{
303-
if (waitLSNState->procInfos[MyProcNumber].inHeap[i])
304-
deleteLSNWaiter((WaitLSNType) i);
305-
}
300+
if (waitLSNState->procInfos[MyProcNumber].inHeap)
301+
deleteLSNWaiter(waitLSNState->procInfos[MyProcNumber].lsnType);
306302
}
307303
}
308304

src/include/access/xlogwait.h

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,20 @@ typedef struct WaitLSNProcInfo
5050
/* LSN, which this process is waiting for */
5151
XLogRecPtr waitLSN;
5252

53+
/* The type of LSN to wait */
54+
WaitLSNType lsnType;
55+
5356
/* Process to wake up once the waitLSN is reached */
5457
ProcNumber procno;
5558

56-
/* Heap membership flags for LSN types */
57-
bool inHeap[WAIT_LSN_TYPE_COUNT];
59+
/*
60+
* Heap membership flag. A process can wait for only one LSN type at a
61+
* time, so a single flag suffices (tracked by the lsnType field).
62+
*/
63+
bool inHeap;
5864

59-
/* Heap nodes for LSN types */
60-
pairingheap_node heapNode[WAIT_LSN_TYPE_COUNT];
65+
/* Pairing heap node for the waiters' heap (one per process) */
66+
pairingheap_node heapNode;
6167
} WaitLSNProcInfo;
6268

6369
/*

0 commit comments

Comments
 (0)