Skip to content
Permalink
Browse files
Improve and cleanup ProcArrayAdd(), ProcArrayRemove().
941697c changed ProcArrayAdd()/Remove() substantially. As reported by
zhanyi, I missed that due to the introduction of the PGPROC->pgxactoff
ProcArrayRemove() does not need to search for the position in
procArray->pgprocnos anymore - that's pgxactoff. Remove the search loop.

The removal of the search loop reduces assertion coverage a bit - but we can
easily do better than before by adding assertions to other loops over
pgprocnos elements.

Also do a bit of cleanup, mostly by reducing line lengths by introducing local
helper variables and adding newlines.

Author: zhanyi <w@hidva.com>
Author: Andres Freund <andres@anarazel.de>
Discussion: https://postgr.es/m/tencent_5624AA3B116B3D1C31CA9744@qq.com
  • Loading branch information
anarazel committed Jun 12, 2021
1 parent d120e66 commit d8e950d3ae7b33a2064a4fb39b7829334b0b47bc
Showing 1 changed file with 78 additions and 55 deletions.
@@ -446,6 +446,7 @@ ProcArrayAdd(PGPROC *proc)
{
ProcArrayStruct *arrayP = procArray;
int index;
int movecount;

/* See ProcGlobal comment explaining why both locks are held */
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
@@ -474,33 +475,48 @@ ProcArrayAdd(PGPROC *proc)
*/
for (index = 0; index < arrayP->numProcs; index++)
{
/*
* If we are the first PGPROC or if we have found our right position
* in the array, break
*/
if ((arrayP->pgprocnos[index] == -1) || (arrayP->pgprocnos[index] > proc->pgprocno))
int procno PG_USED_FOR_ASSERTS_ONLY = arrayP->pgprocnos[index];

Assert(procno >= 0 && procno < (arrayP->maxProcs + NUM_AUXILIARY_PROCS));
Assert(allProcs[procno].pgxactoff == index);

/* If we have found our right position in the array, break */
if (arrayP->pgprocnos[index] > proc->pgprocno)
break;
}

memmove(&arrayP->pgprocnos[index + 1], &arrayP->pgprocnos[index],
(arrayP->numProcs - index) * sizeof(*arrayP->pgprocnos));
memmove(&ProcGlobal->xids[index + 1], &ProcGlobal->xids[index],
(arrayP->numProcs - index) * sizeof(*ProcGlobal->xids));
memmove(&ProcGlobal->subxidStates[index + 1], &ProcGlobal->subxidStates[index],
(arrayP->numProcs - index) * sizeof(*ProcGlobal->subxidStates));
memmove(&ProcGlobal->statusFlags[index + 1], &ProcGlobal->statusFlags[index],
(arrayP->numProcs - index) * sizeof(*ProcGlobal->statusFlags));
movecount = arrayP->numProcs - index;
memmove(&arrayP->pgprocnos[index + 1],
&arrayP->pgprocnos[index],
movecount * sizeof(*arrayP->pgprocnos));
memmove(&ProcGlobal->xids[index + 1],
&ProcGlobal->xids[index],
movecount * sizeof(*ProcGlobal->xids));
memmove(&ProcGlobal->subxidStates[index + 1],
&ProcGlobal->subxidStates[index],
movecount * sizeof(*ProcGlobal->subxidStates));
memmove(&ProcGlobal->statusFlags[index + 1],
&ProcGlobal->statusFlags[index],
movecount * sizeof(*ProcGlobal->statusFlags));

arrayP->pgprocnos[index] = proc->pgprocno;
proc->pgxactoff = index;
ProcGlobal->xids[index] = proc->xid;
ProcGlobal->subxidStates[index] = proc->subxidStatus;
ProcGlobal->statusFlags[index] = proc->statusFlags;

arrayP->numProcs++;

/* adjust pgxactoff for all following PGPROCs */
index++;
for (; index < arrayP->numProcs; index++)
{
allProcs[arrayP->pgprocnos[index]].pgxactoff = index;
int procno = arrayP->pgprocnos[index];

Assert(procno >= 0 && procno < (arrayP->maxProcs + NUM_AUXILIARY_PROCS));
Assert(allProcs[procno].pgxactoff == index - 1);

allProcs[procno].pgxactoff = index;
}

/*
@@ -525,7 +541,8 @@ void
ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
{
ProcArrayStruct *arrayP = procArray;
int index;
int myoff;
int movecount;

#ifdef XIDCACHE_DEBUG
/* dump stats at backend shutdown, but not prepared-xact end */
@@ -537,69 +554,75 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);

Assert(ProcGlobal->allProcs[arrayP->pgprocnos[proc->pgxactoff]].pgxactoff == proc->pgxactoff);
myoff = proc->pgxactoff;

Assert(myoff >= 0 && myoff < arrayP->numProcs);
Assert(ProcGlobal->allProcs[arrayP->pgprocnos[myoff]].pgxactoff == myoff);

if (TransactionIdIsValid(latestXid))
{
Assert(TransactionIdIsValid(ProcGlobal->xids[proc->pgxactoff]));
Assert(TransactionIdIsValid(ProcGlobal->xids[myoff]));

/* Advance global latestCompletedXid while holding the lock */
MaintainLatestCompletedXid(latestXid);

/* Same with xactCompletionCount */
ShmemVariableCache->xactCompletionCount++;

ProcGlobal->xids[proc->pgxactoff] = 0;
ProcGlobal->subxidStates[proc->pgxactoff].overflowed = false;
ProcGlobal->subxidStates[proc->pgxactoff].count = 0;
ProcGlobal->xids[myoff] = InvalidTransactionId;
ProcGlobal->subxidStates[myoff].overflowed = false;
ProcGlobal->subxidStates[myoff].count = 0;
}
else
{
/* Shouldn't be trying to remove a live transaction here */
Assert(!TransactionIdIsValid(ProcGlobal->xids[proc->pgxactoff]));
Assert(!TransactionIdIsValid(ProcGlobal->xids[myoff]));
}

Assert(TransactionIdIsValid(ProcGlobal->xids[proc->pgxactoff] == 0));
Assert(TransactionIdIsValid(ProcGlobal->subxidStates[proc->pgxactoff].count == 0));
Assert(TransactionIdIsValid(ProcGlobal->subxidStates[proc->pgxactoff].overflowed == false));
ProcGlobal->statusFlags[proc->pgxactoff] = 0;
Assert(!TransactionIdIsValid(ProcGlobal->xids[myoff]));
Assert(ProcGlobal->subxidStates[myoff].count == 0);
Assert(ProcGlobal->subxidStates[myoff].overflowed == false);

ProcGlobal->statusFlags[myoff] = 0;

/* Keep the PGPROC array sorted. See notes above */
movecount = arrayP->numProcs - myoff - 1;
memmove(&arrayP->pgprocnos[myoff],
&arrayP->pgprocnos[myoff + 1],
movecount * sizeof(*arrayP->pgprocnos));
memmove(&ProcGlobal->xids[myoff],
&ProcGlobal->xids[myoff + 1],
movecount * sizeof(*ProcGlobal->xids));
memmove(&ProcGlobal->subxidStates[myoff],
&ProcGlobal->subxidStates[myoff + 1],
movecount * sizeof(*ProcGlobal->subxidStates));
memmove(&ProcGlobal->statusFlags[myoff],
&ProcGlobal->statusFlags[myoff + 1],
movecount * sizeof(*ProcGlobal->statusFlags));

arrayP->pgprocnos[arrayP->numProcs - 1] = -1; /* for debugging */
arrayP->numProcs--;

for (index = 0; index < arrayP->numProcs; index++)
/*
* Adjust pgxactoff of following procs for removed PGPROC (note that
* numProcs already has been decremented).
*/
for (int index = myoff; index < arrayP->numProcs; index++)
{
if (arrayP->pgprocnos[index] == proc->pgprocno)
{
/* Keep the PGPROC array sorted. See notes above */
memmove(&arrayP->pgprocnos[index], &arrayP->pgprocnos[index + 1],
(arrayP->numProcs - index - 1) * sizeof(*arrayP->pgprocnos));
memmove(&ProcGlobal->xids[index], &ProcGlobal->xids[index + 1],
(arrayP->numProcs - index - 1) * sizeof(*ProcGlobal->xids));
memmove(&ProcGlobal->subxidStates[index], &ProcGlobal->subxidStates[index + 1],
(arrayP->numProcs - index - 1) * sizeof(*ProcGlobal->subxidStates));
memmove(&ProcGlobal->statusFlags[index], &ProcGlobal->statusFlags[index + 1],
(arrayP->numProcs - index - 1) * sizeof(*ProcGlobal->statusFlags));

arrayP->pgprocnos[arrayP->numProcs - 1] = -1; /* for debugging */
arrayP->numProcs--;

/* adjust for removed PGPROC */
for (; index < arrayP->numProcs; index++)
allProcs[arrayP->pgprocnos[index]].pgxactoff--;
int procno = arrayP->pgprocnos[index];

/*
* Release in reversed acquisition order, to reduce frequency of
* having to wait for XidGenLock while holding ProcArrayLock.
*/
LWLockRelease(XidGenLock);
LWLockRelease(ProcArrayLock);
return;
}
Assert(procno >= 0 && procno < (arrayP->maxProcs + NUM_AUXILIARY_PROCS));
Assert(allProcs[procno].pgxactoff - 1 == index);

allProcs[procno].pgxactoff = index;
}

/* Oops */
/*
* Release in reversed acquisition order, to reduce frequency of having to
* wait for XidGenLock while holding ProcArrayLock.
*/
LWLockRelease(XidGenLock);
LWLockRelease(ProcArrayLock);

elog(LOG, "failed to find proc %p in ProcArray", proc);
}


0 comments on commit d8e950d

Please sign in to comment.