Skip to content
This repository has been archived by the owner on Nov 6, 2019. It is now read-only.

Commit

Permalink
Introduce latches. A latch is a boolean variable, with the capability to
Browse files Browse the repository at this point in the history
wait until it is set. Latches can be used to reliably wait until a signal
arrives, which is hard otherwise because signals don't interrupt select()
on some platforms, and even when they do, there's race conditions.

On Unix, latches use the so called self-pipe trick under the covers to
implement the sleep until the latch is set, without race conditions. On
Windows, Windows events are used.

Use the new latch abstraction to sleep in walsender, so that as soon as
a transaction finishes, walsender is woken up to immediately send the WAL
to the standby. This reduces the latency between master and standby, which
is good.

Preliminary work by Fujii Masao. The latch implementation is by me, with
helpful comments from many people.
  • Loading branch information
Heikki Linnakangas committed Sep 11, 2010
1 parent 91d8007 commit 3799b67
Show file tree
Hide file tree
Showing 13 changed files with 919 additions and 30 deletions.
10 changes: 9 additions & 1 deletion configure
Expand Up @@ -27773,6 +27773,13 @@ _ACEOF
SHMEM_IMPLEMENTATION="src/backend/port/win32_shmem.c" SHMEM_IMPLEMENTATION="src/backend/port/win32_shmem.c"
fi fi


# Select latch implementation type.
if test "$PORTNAME" != "win32"; then
LATCH_IMPLEMENTATION="src/backend/port/unix_latch.c"
else
LATCH_IMPLEMENTATION="src/backend/port/win32_latch.c"
fi

# If not set in template file, set bytes to use libc memset() # If not set in template file, set bytes to use libc memset()
if test x"$MEMSET_LOOP_LIMIT" = x"" ; then if test x"$MEMSET_LOOP_LIMIT" = x"" ; then
MEMSET_LOOP_LIMIT=1024 MEMSET_LOOP_LIMIT=1024
Expand Down Expand Up @@ -29098,7 +29105,7 @@ fi
ac_config_files="$ac_config_files GNUmakefile src/Makefile.global" ac_config_files="$ac_config_files GNUmakefile src/Makefile.global"




ac_config_links="$ac_config_links src/backend/port/dynloader.c:src/backend/port/dynloader/${template}.c src/backend/port/pg_sema.c:${SEMA_IMPLEMENTATION} src/backend/port/pg_shmem.c:${SHMEM_IMPLEMENTATION} src/include/dynloader.h:src/backend/port/dynloader/${template}.h src/include/pg_config_os.h:src/include/port/${template}.h src/Makefile.port:src/makefiles/Makefile.${template}" ac_config_links="$ac_config_links src/backend/port/dynloader.c:src/backend/port/dynloader/${template}.c src/backend/port/pg_sema.c:${SEMA_IMPLEMENTATION} src/backend/port/pg_shmem.c:${SHMEM_IMPLEMENTATION} src/backend/port/pg_latch.c:${LATCH_IMPLEMENTATION} src/include/dynloader.h:src/backend/port/dynloader/${template}.h src/include/pg_config_os.h:src/include/port/${template}.h src/Makefile.port:src/makefiles/Makefile.${template}"




if test "$PORTNAME" = "win32"; then if test "$PORTNAME" = "win32"; then
Expand Down Expand Up @@ -29722,6 +29729,7 @@ do
"src/backend/port/dynloader.c") CONFIG_LINKS="$CONFIG_LINKS src/backend/port/dynloader.c:src/backend/port/dynloader/${template}.c" ;; "src/backend/port/dynloader.c") CONFIG_LINKS="$CONFIG_LINKS src/backend/port/dynloader.c:src/backend/port/dynloader/${template}.c" ;;
"src/backend/port/pg_sema.c") CONFIG_LINKS="$CONFIG_LINKS src/backend/port/pg_sema.c:${SEMA_IMPLEMENTATION}" ;; "src/backend/port/pg_sema.c") CONFIG_LINKS="$CONFIG_LINKS src/backend/port/pg_sema.c:${SEMA_IMPLEMENTATION}" ;;
"src/backend/port/pg_shmem.c") CONFIG_LINKS="$CONFIG_LINKS src/backend/port/pg_shmem.c:${SHMEM_IMPLEMENTATION}" ;; "src/backend/port/pg_shmem.c") CONFIG_LINKS="$CONFIG_LINKS src/backend/port/pg_shmem.c:${SHMEM_IMPLEMENTATION}" ;;
"src/backend/port/pg_latch.c") CONFIG_LINKS="$CONFIG_LINKS src/backend/port/pg_latch.c:${LATCH_IMPLEMENTATION}" ;;
"src/include/dynloader.h") CONFIG_LINKS="$CONFIG_LINKS src/include/dynloader.h:src/backend/port/dynloader/${template}.h" ;; "src/include/dynloader.h") CONFIG_LINKS="$CONFIG_LINKS src/include/dynloader.h:src/backend/port/dynloader/${template}.h" ;;
"src/include/pg_config_os.h") CONFIG_LINKS="$CONFIG_LINKS src/include/pg_config_os.h:src/include/port/${template}.h" ;; "src/include/pg_config_os.h") CONFIG_LINKS="$CONFIG_LINKS src/include/pg_config_os.h:src/include/port/${template}.h" ;;
"src/Makefile.port") CONFIG_LINKS="$CONFIG_LINKS src/Makefile.port:src/makefiles/Makefile.${template}" ;; "src/Makefile.port") CONFIG_LINKS="$CONFIG_LINKS src/Makefile.port:src/makefiles/Makefile.${template}" ;;
Expand Down
8 changes: 8 additions & 0 deletions configure.in
Expand Up @@ -1700,6 +1700,13 @@ else
SHMEM_IMPLEMENTATION="src/backend/port/win32_shmem.c" SHMEM_IMPLEMENTATION="src/backend/port/win32_shmem.c"
fi fi


# Select latch implementation type.
if test "$PORTNAME" != "win32"; then
LATCH_IMPLEMENTATION="src/backend/port/unix_latch.c"
else
LATCH_IMPLEMENTATION="src/backend/port/win32_latch.c"
fi

# If not set in template file, set bytes to use libc memset() # If not set in template file, set bytes to use libc memset()
if test x"$MEMSET_LOOP_LIMIT" = x"" ; then if test x"$MEMSET_LOOP_LIMIT" = x"" ; then
MEMSET_LOOP_LIMIT=1024 MEMSET_LOOP_LIMIT=1024
Expand Down Expand Up @@ -1841,6 +1848,7 @@ AC_CONFIG_LINKS([
src/backend/port/dynloader.c:src/backend/port/dynloader/${template}.c src/backend/port/dynloader.c:src/backend/port/dynloader/${template}.c
src/backend/port/pg_sema.c:${SEMA_IMPLEMENTATION} src/backend/port/pg_sema.c:${SEMA_IMPLEMENTATION}
src/backend/port/pg_shmem.c:${SHMEM_IMPLEMENTATION} src/backend/port/pg_shmem.c:${SHMEM_IMPLEMENTATION}
src/backend/port/pg_latch.c:${LATCH_IMPLEMENTATION}
src/include/dynloader.h:src/backend/port/dynloader/${template}.h src/include/dynloader.h:src/backend/port/dynloader/${template}.h
src/include/pg_config_os.h:src/include/port/${template}.h src/include/pg_config_os.h:src/include/port/${template}.h
src/Makefile.port:src/makefiles/Makefile.${template} src/Makefile.port:src/makefiles/Makefile.${template}
Expand Down
22 changes: 22 additions & 0 deletions src/backend/access/transam/twophase.c
Expand Up @@ -55,6 +55,7 @@
#include "miscadmin.h" #include "miscadmin.h"
#include "pg_trace.h" #include "pg_trace.h"
#include "pgstat.h" #include "pgstat.h"
#include "replication/walsender.h"
#include "storage/fd.h" #include "storage/fd.h"
#include "storage/procarray.h" #include "storage/procarray.h"
#include "storage/sinvaladt.h" #include "storage/sinvaladt.h"
Expand Down Expand Up @@ -1025,6 +1026,13 @@ EndPrepare(GlobalTransaction gxact)


/* If we crash now, we have prepared: WAL replay will fix things */ /* If we crash now, we have prepared: WAL replay will fix things */


/*
* Wake up all walsenders to send WAL up to the PREPARE record
* immediately if replication is enabled
*/
if (max_wal_senders > 0)
WalSndWakeup();

/* write correct CRC and close file */ /* write correct CRC and close file */
if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32)) if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
{ {
Expand Down Expand Up @@ -2005,6 +2013,13 @@ RecordTransactionCommitPrepared(TransactionId xid,
/* Flush XLOG to disk */ /* Flush XLOG to disk */
XLogFlush(recptr); XLogFlush(recptr);


/*
* Wake up all walsenders to send WAL up to the COMMIT PREPARED record
* immediately if replication is enabled
*/
if (max_wal_senders > 0)
WalSndWakeup();

/* Mark the transaction committed in pg_clog */ /* Mark the transaction committed in pg_clog */
TransactionIdCommitTree(xid, nchildren, children); TransactionIdCommitTree(xid, nchildren, children);


Expand Down Expand Up @@ -2077,6 +2092,13 @@ RecordTransactionAbortPrepared(TransactionId xid,
/* Always flush, since we're about to remove the 2PC state file */ /* Always flush, since we're about to remove the 2PC state file */
XLogFlush(recptr); XLogFlush(recptr);


/*
* Wake up all walsenders to send WAL up to the ABORT PREPARED record
* immediately if replication is enabled
*/
if (max_wal_senders > 0)
WalSndWakeup();

/* /*
* Mark the transaction aborted in clog. This is not absolutely necessary * Mark the transaction aborted in clog. This is not absolutely necessary
* but we may as well do it while we are here. * but we may as well do it while we are here.
Expand Down
8 changes: 8 additions & 0 deletions src/backend/access/transam/xact.c
Expand Up @@ -36,6 +36,7 @@
#include "libpq/be-fsstubs.h" #include "libpq/be-fsstubs.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "pgstat.h" #include "pgstat.h"
#include "replication/walsender.h"
#include "storage/bufmgr.h" #include "storage/bufmgr.h"
#include "storage/fd.h" #include "storage/fd.h"
#include "storage/lmgr.h" #include "storage/lmgr.h"
Expand Down Expand Up @@ -1067,6 +1068,13 @@ RecordTransactionCommit(void)


XLogFlush(XactLastRecEnd); XLogFlush(XactLastRecEnd);


/*
* Wake up all walsenders to send WAL up to the COMMIT record
* immediately if replication is enabled
*/
if (max_wal_senders > 0)
WalSndWakeup();

/* /*
* Now we may update the CLOG, if we wrote a COMMIT record above * Now we may update the CLOG, if we wrote a COMMIT record above
*/ */
Expand Down
2 changes: 1 addition & 1 deletion src/backend/port/Makefile
Expand Up @@ -21,7 +21,7 @@ subdir = src/backend/port
top_builddir = ../../.. top_builddir = ../../..
include $(top_builddir)/src/Makefile.global include $(top_builddir)/src/Makefile.global


OBJS = dynloader.o pg_sema.o pg_shmem.o $(TAS) OBJS = dynloader.o pg_sema.o pg_shmem.o pg_latch.o $(TAS)


ifeq ($(PORTNAME), darwin) ifeq ($(PORTNAME), darwin)
SUBDIRS += darwin SUBDIRS += darwin
Expand Down

0 comments on commit 3799b67

Please sign in to comment.