
Add parallel pg_dump option.

New infrastructure is added which creates a set number of workers
(threads on Windows, forked processes on Unix). Jobs are then
handed out to these workers by the master process as needed.
pg_restore is adjusted to use this new infrastructure in place of the
old setup which created a new worker for each step on the fly. Parallel
dumps acquire a snapshot clone in order to stay consistent, if
available.

The parallel option is selected by the -j / --jobs command line
parameter of pg_dump.

Joachim Wieland, lightly editorialized by Andrew Dunstan.
1 parent 3b91fe1 · commit 9e257a181cc1dc5e19eb5d770ce09cc98f470f5f · adunstan committed Mar 24, 2013
18 doc/src/sgml/backup.sgml
@@ -310,6 +310,24 @@ pg_restore -d <replaceable class="parameter">dbname</replaceable> <replaceable c
with one of the other two approaches.
</para>
+ <formalpara>
+ <title>Use <application>pg_dump</>'s parallel dump feature.</title>
+ <para>
+ To speed up the dump of a large database, you can use
+ <application>pg_dump</application>'s parallel mode. This will dump
+ multiple tables at the same time. You can control the degree of
+ parallelism with the <command>-j</command> parameter. Parallel dumps
+ are only supported for the "directory" archive format.
+
+<programlisting>
+pg_dump -j <replaceable class="parameter">num</replaceable> -F d -f <replaceable class="parameter">out.dir</replaceable> <replaceable class="parameter">dbname</replaceable>
+</programlisting>
+
+ You can use <command>pg_restore -j</command> to restore a dump in parallel.
+ This will work for any archive in either the "custom" or the "directory"
+ archive format, whether or not it has been created with <command>pg_dump -j</command>.
+ </para>
+ </formalpara>
</sect2>
</sect1>
9 doc/src/sgml/perform.sgml
@@ -1435,6 +1435,15 @@ SELECT * FROM x, y, a, b, c WHERE something AND somethingelse;
</listitem>
<listitem>
<para>
+ Experiment with the parallel dump and restore modes of both
+ <application>pg_dump</> and <application>pg_restore</> and find the
+ optimal number of concurrent jobs to use. Dumping and restoring in
+ parallel by means of the <option>-j</> option should give you significantly
+ higher performance than the serial mode.
+ </para>
+ </listitem>
+ <listitem>
+ <para>
Consider whether the whole dump should be restored as a single
transaction. To do that, pass the <option>-1</> or
<option>--single-transaction</> command-line option to
89 doc/src/sgml/ref/pg_dump.sgml
@@ -73,10 +73,12 @@ PostgreSQL documentation
transfer mechanism. <application>pg_dump</application> can be used to
backup an entire database, then <application>pg_restore</application>
can be used to examine the archive and/or select which parts of the
- database are to be restored. The most flexible output file format is
- the <quote>custom</quote> format (<option>-Fc</option>). It allows
- for selection and reordering of all archived items, and is compressed
- by default.
+ database are to be restored. The most flexible output file formats are
+ the <quote>custom</quote> format (<option>-Fc</option>) and the
+ <quote>directory</quote> format (<option>-Fd</option>). They allow
+ for selection and reordering of all archived items, support parallel
+ restoration, and are compressed by default. The <quote>directory</quote>
+ format is the only format that supports parallel dumps.
</para>
<para>
@@ -251,7 +253,8 @@ PostgreSQL documentation
can read. A directory format archive can be manipulated with
standard Unix tools; for example, files in an uncompressed archive
can be compressed with the <application>gzip</application> tool.
- This format is compressed by default.
+ This format is compressed by default and also supports parallel
+ dumps.
</para>
</listitem>
</varlistentry>
@@ -286,6 +289,62 @@ PostgreSQL documentation
</varlistentry>
<varlistentry>
+ <term><option>-j <replaceable class="parameter">njobs</replaceable></></term>
+ <term><option>--jobs=<replaceable class="parameter">njobs</replaceable></></term>
+ <listitem>
+ <para>
+ Run the dump in parallel by dumping <replaceable class="parameter">njobs</replaceable>
+ tables simultaneously. This option reduces the time of the dump but it also
+ increases the load on the database server. You can only use this option with the
+ directory output format because this is the only output format where multiple processes
+ can write their data at the same time.
+ </para>
+ <para>
+ <application>pg_dump</> will open <replaceable class="parameter">njobs</replaceable>
+ + 1 connections to the database, so make sure your <xref linkend="guc-max-connections">
+ setting is high enough to accommodate all connections.
+ </para>
+ <para>
+ Requesting exclusive locks on database objects while running a parallel dump could
+ cause the dump to fail. The reason is that the <application>pg_dump</> master process
+ requests shared locks on the objects that the worker processes are going to dump later,
+ to make sure that nobody drops them while the dump is running.
+ If another client then requests an exclusive lock on a table, that lock will not be
+ granted but will be queued waiting for the shared lock of the master process to be
+ released. Consequently, any other access to the table will not be granted either and
+ will queue behind the exclusive lock request. This includes the worker process trying
+ to dump the table. Without any precautions this would be a classic deadlock situation.
+ To detect this conflict, the <application>pg_dump</> worker process requests another
+ shared lock using the <literal>NOWAIT</> option. If the worker process is not granted
+ this shared lock, somebody else must have requested an exclusive lock in the meantime,
+ and there is no way to continue with the dump, so <application>pg_dump</> has no choice
+ but to abort the dump.
+ </para>
+ <para>
+ For a consistent backup, the database server needs to support synchronized snapshots,
+ a feature that was introduced in <productname>PostgreSQL</productname> 9.2. With this
+ feature, database clients can ensure they see the same dataset even though they use
+ different connections. <command>pg_dump -j</command> uses multiple database
+ connections; it connects to the database once with the master process and
+ once again for each worker job. Without the synchronized snapshot feature, the
+ different worker jobs wouldn't be guaranteed to see the same data in each connection,
+ which could lead to an inconsistent backup.
+ </para>
+ <para>
+ If you want to run a parallel dump of a pre-9.2 server, you need to make sure that the
+ database content doesn't change between the time the master connects to the
+ database until the last worker job has connected to the database. The easiest way to
+ do this is to halt any data modifying processes (DDL and DML) accessing the database
+ before starting the backup. You also need to specify the
+ <option>--no-synchronized-snapshots</option> parameter when running
+ <command>pg_dump -j</command> against a pre-9.2 <productname>PostgreSQL</productname>
+ server.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
<term><option>-n <replaceable class="parameter">schema</replaceable></option></term>
<term><option>--schema=<replaceable class="parameter">schema</replaceable></option></term>
<listitem>
@@ -691,6 +750,17 @@ PostgreSQL documentation
</varlistentry>
<varlistentry>
+ <term><option>--no-synchronized-snapshots</></term>
+ <listitem>
+ <para>
+ This option allows running <command>pg_dump -j</> against a pre-9.2
+ server; see the documentation of the <option>-j</option> parameter
+ for more details.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
<term><option>--no-tablespaces</option></term>
<listitem>
<para>
@@ -1083,6 +1153,15 @@ CREATE DATABASE foo WITH TEMPLATE template0;
</para>
<para>
+ To dump a database into a directory-format archive in parallel with
+ 5 worker jobs:
+
+<screen>
+<prompt>$</prompt> <userinput>pg_dump -Fd mydb -j 5 -f dumpdir</userinput>
+</screen>
+ </para>
+
+ <para>
To reload an archive file into a (freshly created) database named
<literal>newdb</>:
2 src/bin/pg_dump/Makefile
@@ -19,7 +19,7 @@ include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
OBJS= pg_backup_archiver.o pg_backup_db.o pg_backup_custom.o \
- pg_backup_null.o pg_backup_tar.o \
+ pg_backup_null.o pg_backup_tar.o parallel.o \
pg_backup_directory.o dumputils.o compress_io.o $(WIN32RES)
KEYWRDOBJS = keywords.o kwlookup.o
10 src/bin/pg_dump/compress_io.c
@@ -54,6 +54,7 @@
#include "compress_io.h"
#include "dumputils.h"
+#include "parallel.h"
/*----------------------
* Compressor API
@@ -182,6 +183,9 @@ size_t
WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs,
const void *data, size_t dLen)
{
+ /* Are we aborting? */
+ checkAborting(AH);
+
switch (cs->comprAlg)
{
case COMPR_ALG_LIBZ:
@@ -351,6 +355,9 @@ ReadDataFromArchiveZlib(ArchiveHandle *AH, ReadFunc readF)
/* no minimal chunk size for zlib */
while ((cnt = readF(AH, &buf, &buflen)))
{
+ /* Are we aborting? */
+ checkAborting(AH);
+
zp->next_in = (void *) buf;
zp->avail_in = cnt;
@@ -411,6 +418,9 @@ ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF)
while ((cnt = readF(AH, &buf, &buflen)))
{
+ /* Are we aborting? */
+ checkAborting(AH);
+
ahwrite(buf, 1, cnt, AH);
}
86 src/bin/pg_dump/dumputils.c
@@ -38,6 +38,7 @@ static struct
} on_exit_nicely_list[MAX_ON_EXIT_NICELY];
static int on_exit_nicely_index;
+void (*on_exit_msg_func) (const char *modulename, const char *fmt, va_list ap) = vwrite_msg;
#define supports_grant_options(version) ((version) >= 70400)
@@ -48,11 +49,21 @@ static bool parseAclItem(const char *item, const char *type,
static char *copyAclUserName(PQExpBuffer output, char *input);
static void AddAcl(PQExpBuffer aclbuf, const char *keyword,
const char *subname);
+static PQExpBuffer getThreadLocalPQExpBuffer(void);
#ifdef WIN32
+static void shutdown_parallel_dump_utils(int code, void *unused);
static bool parallel_init_done = false;
static DWORD tls_index;
static DWORD mainThreadId;
+
+static void
+shutdown_parallel_dump_utils(int code, void *unused)
+{
+ /* Call the cleanup function only from the main thread */
+ if (mainThreadId == GetCurrentThreadId())
+ WSACleanup();
+}
#endif
void
@@ -61,23 +72,29 @@ init_parallel_dump_utils(void)
#ifdef WIN32
if (!parallel_init_done)
{
+ WSADATA wsaData;
+ int err;
+
tls_index = TlsAlloc();
- parallel_init_done = true;
mainThreadId = GetCurrentThreadId();
+ err = WSAStartup(MAKEWORD(2, 2), &wsaData);
+ if (err != 0)
+ {
+ fprintf(stderr, _("WSAStartup failed: %d\n"), err);
+ exit_nicely(1);
+ }
+ on_exit_nicely(shutdown_parallel_dump_utils, NULL);
+ parallel_init_done = true;
}
#endif
}
/*
- * Quotes input string if it's not a legitimate SQL identifier as-is.
- *
- * Note that the returned string must be used before calling fmtId again,
- * since we re-use the same return buffer each time. Non-reentrant but
- * reduces memory leakage. (On Windows the memory leakage will be one buffer
- * per thread, which is at least better than one per call).
+ * Non-reentrant but reduces memory leakage. (On Windows the memory leakage
+ * will be one buffer per thread, which is at least better than one per call).
*/
-const char *
-fmtId(const char *rawid)
+static PQExpBuffer
+getThreadLocalPQExpBuffer(void)
{
/*
* The Tls code goes awry if we use a static var, so we provide for both
@@ -86,9 +103,6 @@ fmtId(const char *rawid)
static PQExpBuffer s_id_return = NULL;
PQExpBuffer id_return;
- const char *cp;
- bool need_quotes = false;
-
#ifdef WIN32
if (parallel_init_done)
id_return = (PQExpBuffer) TlsGetValue(tls_index); /* 0 when not set */
@@ -118,6 +132,23 @@ fmtId(const char *rawid)
}
+ return id_return;
+}
+
+/*
+ * Quotes input string if it's not a legitimate SQL identifier as-is.
+ *
+ * Note that the returned string must be used before calling fmtId again,
+ * since we re-use the same return buffer each time.
+ */
+const char *
+fmtId(const char *rawid)
+{
+ PQExpBuffer id_return = getThreadLocalPQExpBuffer();
+
+ const char *cp;
+ bool need_quotes = false;
+
/*
* These checks need to match the identifier production in scan.l. Don't
* use islower() etc.
@@ -185,6 +216,35 @@ fmtId(const char *rawid)
return id_return->data;
}
+/*
+ * fmtQualifiedId - convert a qualified name to the proper format for
+ * the source database.
+ *
+ * Like fmtId, use the result before calling again.
+ *
+ * Since we call fmtId and it also uses getThreadLocalPQExpBuffer() we cannot
+ * use it until we're finished with calling fmtId().
+ */
+const char *
+fmtQualifiedId(int remoteVersion, const char *schema, const char *id)
+{
+ PQExpBuffer id_return;
+ PQExpBuffer lcl_pqexp = createPQExpBuffer();
+
+ /* Suppress schema name if fetching from pre-7.3 DB */
+ if (remoteVersion >= 70300 && schema && *schema)
+ {
+ appendPQExpBuffer(lcl_pqexp, "%s.", fmtId(schema));
+ }
+ appendPQExpBuffer(lcl_pqexp, "%s", fmtId(id));
+
+ id_return = getThreadLocalPQExpBuffer();
+
+ appendPQExpBuffer(id_return, "%s", lcl_pqexp->data);
+ destroyPQExpBuffer(lcl_pqexp);
+
+ return id_return->data;
+}
/*
* Convert a string value to an SQL string literal and append it to
@@ -1315,7 +1375,7 @@ exit_horribly(const char *modulename, const char *fmt,...)
va_list ap;
va_start(ap, fmt);
- vwrite_msg(modulename, fmt, ap);
+ on_exit_msg_func(modulename, fmt, ap);
va_end(ap);
exit_nicely(1);
13 src/bin/pg_dump/dumputils.h
@@ -29,14 +29,14 @@ typedef enum /* bits returned by set_dump_section */
typedef struct SimpleStringListCell
{
- struct SimpleStringListCell *next;
- char val[1]; /* VARIABLE LENGTH FIELD */
+ struct SimpleStringListCell *next;
+ char val[1]; /* VARIABLE LENGTH FIELD */
} SimpleStringListCell;
typedef struct SimpleStringList
{
- SimpleStringListCell *head;
- SimpleStringListCell *tail;
+ SimpleStringListCell *head;
+ SimpleStringListCell *tail;
} SimpleStringList;
@@ -47,6 +47,8 @@ extern const char *progname;
extern void init_parallel_dump_utils(void);
extern const char *fmtId(const char *identifier);
+extern const char *fmtQualifiedId(int remoteVersion,
+ const char *schema, const char *id);
extern void appendStringLiteral(PQExpBuffer buf, const char *str,
int encoding, bool std_strings);
extern void appendStringLiteralConn(PQExpBuffer buf, const char *str,
@@ -85,11 +87,12 @@ __attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 0)));
extern void
exit_horribly(const char *modulename, const char *fmt,...)
__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3), noreturn));
+extern void (*on_exit_msg_func) (const char *modulename, const char *fmt, va_list ap)
+ __attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 0)));
extern void on_exit_nicely(on_exit_nicely_callback function, void *arg);
extern void exit_nicely(int code) __attribute__((noreturn));
extern void simple_string_list_append(SimpleStringList *list, const char *val);
extern bool simple_string_list_member(SimpleStringList *list, const char *val);
-
#endif /* DUMPUTILS_H */
1,293 src/bin/pg_dump/parallel.c
@@ -0,0 +1,1293 @@
+/*-------------------------------------------------------------------------
+ *
+ * parallel.c
+ *
+ * Parallel support for the pg_dump archiver
+ *
+ * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * The author is not responsible for loss or damages that may
+ * result from its use.
+ *
+ * IDENTIFICATION
+ * src/bin/pg_dump/parallel.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "pg_backup_db.h"
+
+#include "dumputils.h"
+#include "parallel.h"
+
+#ifndef WIN32
+#include <sys/types.h>
+#include <sys/wait.h>
+#include "signal.h"
+#include <unistd.h>
+#include <fcntl.h>
+#endif
+
+#define PIPE_READ 0
+#define PIPE_WRITE 1
+
+/* file-scope variables */
+#ifdef WIN32
+static unsigned int tMasterThreadId = 0;
+static HANDLE termEvent = INVALID_HANDLE_VALUE;
+static int pgpipe(int handles[2]);
+static int piperead(int s, char *buf, int len);
+
+/*
+ * Structure to hold info passed by _beginthreadex() to the function it calls
+ * via its single allowed argument.
+ */
+typedef struct
+{
+ ArchiveHandle *AH;
+ RestoreOptions *ropt;
+ int worker;
+ int pipeRead;
+ int pipeWrite;
+} WorkerInfo;
+
+#define pipewrite(a,b,c) send(a,b,c,0)
+#else
+/*
+ * aborting is only ever used in the master, the workers are fine with just
+ * wantAbort.
+ */
+static bool aborting = false;
+static volatile sig_atomic_t wantAbort = 0;
+
+#define pgpipe(a) pipe(a)
+#define piperead(a,b,c) read(a,b,c)
+#define pipewrite(a,b,c) write(a,b,c)
+#endif
+
+typedef struct ShutdownInformation
+{
+ ParallelState *pstate;
+ Archive *AHX;
+} ShutdownInformation;
+
+static ShutdownInformation shutdown_info;
+
+static const char *modulename = gettext_noop("parallel archiver");
+
+static ParallelSlot *GetMyPSlot(ParallelState *pstate);
+static void
+parallel_exit_msg_func(const char *modulename,
+ const char *fmt, va_list ap)
+__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 0)));
+static void
+parallel_msg_master(ParallelSlot *slot, const char *modulename,
+ const char *fmt, va_list ap)
+__attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 0)));
+static void archive_close_connection(int code, void *arg);
+static void ShutdownWorkersHard(ParallelState *pstate);
+static void WaitForTerminatingWorkers(ParallelState *pstate);
+
+#ifndef WIN32
+static void sigTermHandler(int signum);
+#endif
+static void SetupWorker(ArchiveHandle *AH, int pipefd[2], int worker,
+ RestoreOptions *ropt);
+static bool HasEveryWorkerTerminated(ParallelState *pstate);
+
+static void lockTableNoWait(ArchiveHandle *AH, TocEntry *te);
+static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
+static char *getMessageFromMaster(int pipefd[2]);
+static void sendMessageToMaster(int pipefd[2], const char *str);
+static int select_loop(int maxFd, fd_set *workerset);
+static char *getMessageFromWorker(ParallelState *pstate,
+ bool do_wait, int *worker);
+static void sendMessageToWorker(ParallelState *pstate,
+ int worker, const char *str);
+static char *readMessageFromPipe(int fd);
+
+#define messageStartsWith(msg, prefix) \
+ (strncmp(msg, prefix, strlen(prefix)) == 0)
+#define messageEquals(msg, pattern) \
+ (strcmp(msg, pattern) == 0)
+
+static ParallelSlot *
+GetMyPSlot(ParallelState *pstate)
+{
+ int i;
+
+ for (i = 0; i < pstate->numWorkers; i++)
+#ifdef WIN32
+ if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
+#else
+ if (pstate->parallelSlot[i].pid == getpid())
+#endif
+ return &(pstate->parallelSlot[i]);
+
+ return NULL;
+}
+
+/*
+ * This is the function that will be called from exit_horribly() to print the
+ * error message. If the worker process does exit_horribly(), we forward its
+ * last words to the master process. The master process then does
+ * exit_horribly() with this error message itself and prints it normally.
+ * After printing the message, exit_horribly() on the master will shut down
+ * the remaining worker processes.
+ */
+static void
+parallel_exit_msg_func(const char *modulename, const char *fmt, va_list ap)
+{
+ ParallelState *pstate = shutdown_info.pstate;
+ ParallelSlot *slot;
+
+ Assert(pstate);
+
+ slot = GetMyPSlot(pstate);
+
+ if (!slot)
+ /* We're the parent, just write the message out */
+ vwrite_msg(modulename, fmt, ap);
+ else
+ /* If we're a worker process, send the msg to the master process */
+ parallel_msg_master(slot, modulename, fmt, ap);
+}
+
+/* Sends the error message from the worker to the master process */
+static void
+parallel_msg_master(ParallelSlot *slot, const char *modulename,
+ const char *fmt, va_list ap)
+{
+ char buf[512];
+ int pipefd[2];
+
+ pipefd[PIPE_READ] = slot->pipeRevRead;
+ pipefd[PIPE_WRITE] = slot->pipeRevWrite;
+
+ strcpy(buf, "ERROR ");
+ vsnprintf(buf + strlen("ERROR "),
+ sizeof(buf) - strlen("ERROR "), fmt, ap);
+
+ sendMessageToMaster(pipefd, buf);
+}
+
+/*
+ * pg_dump and pg_restore register the Archive pointer for the exit handler
+ * (called from exit_horribly). This function mainly exists so that we can
+ * keep shutdown_info in file scope only.
+ */
+void
+on_exit_close_archive(Archive *AHX)
+{
+ shutdown_info.AHX = AHX;
+ on_exit_nicely(archive_close_connection, &shutdown_info);
+}
+
+/*
+ * This function can close archives in both the parallel and non-parallel
+ * case.
+ */
+static void
+archive_close_connection(int code, void *arg)
+{
+ ShutdownInformation *si = (ShutdownInformation *) arg;
+
+ if (si->pstate)
+ {
+ ParallelSlot *slot = GetMyPSlot(si->pstate);
+
+ if (!slot)
+ {
+ /*
+ * We're the master: We have already printed out the message
+ * passed to exit_horribly() either from the master itself or from
+ * a worker process. Now we need to close our own database
+ * connection (only open during parallel dump but not restore) and
+ * shut down the remaining workers.
+ */
+ DisconnectDatabase(si->AHX);
+#ifndef WIN32
+
+ /*
+ * Setting aborting to true switches to best-effort-mode
+ * (send/receive but ignore errors) in communicating with our
+ * workers.
+ */
+ aborting = true;
+#endif
+ ShutdownWorkersHard(si->pstate);
+ }
+ else if (slot->args->AH)
+ DisconnectDatabase(&(slot->args->AH->public));
+ }
+ else if (si->AHX)
+ DisconnectDatabase(si->AHX);
+}
+
+/*
+ * If we have one worker that terminates for some reason, we'd like the other
+ * threads to terminate as well (and not finish with their 70 GB table dump
+ * first...). Now in UNIX we can just kill these processes, and let the signal
+ * handler set wantAbort to 1. In Windows we set a termEvent and this serves
+ * as the signal for everyone to terminate.
+ */
+void
+checkAborting(ArchiveHandle *AH)
+{
+#ifdef WIN32
+ if (WaitForSingleObject(termEvent, 0) == WAIT_OBJECT_0)
+#else
+ if (wantAbort)
+#endif
+ exit_horribly(modulename, "worker is terminating\n");
+}
+
+/*
+ * Shut down any remaining workers, this has an implicit do_wait == true.
+ *
+ * The fastest way we can make the workers terminate gracefully is when
+ * they are listening for new commands and we just tell them to terminate.
+ */
+static void
+ShutdownWorkersHard(ParallelState *pstate)
+{
+#ifndef WIN32
+ int i;
+
+ signal(SIGPIPE, SIG_IGN);
+
+ /*
+ * Close our write end of the sockets so that the workers know they can
+ * exit.
+ */
+ for (i = 0; i < pstate->numWorkers; i++)
+ closesocket(pstate->parallelSlot[i].pipeWrite);
+
+ for (i = 0; i < pstate->numWorkers; i++)
+ kill(pstate->parallelSlot[i].pid, SIGTERM);
+#else
+ /* The workers monitor this event via checkAborting(). */
+ SetEvent(termEvent);
+#endif
+
+ WaitForTerminatingWorkers(pstate);
+}
+
+/*
+ * Wait for the termination of the processes using the OS-specific method.
+ */
+static void
+WaitForTerminatingWorkers(ParallelState *pstate)
+{
+ while (!HasEveryWorkerTerminated(pstate))
+ {
+ ParallelSlot *slot = NULL;
+ int j;
+
+#ifndef WIN32
+ int status;
+ pid_t pid = wait(&status);
+
+ for (j = 0; j < pstate->numWorkers; j++)
+ if (pstate->parallelSlot[j].pid == pid)
+ slot = &(pstate->parallelSlot[j]);
+#else
+ uintptr_t hThread;
+ DWORD ret;
+ uintptr_t *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
+ int nrun = 0;
+
+ for (j = 0; j < pstate->numWorkers; j++)
+ if (pstate->parallelSlot[j].workerStatus != WRKR_TERMINATED)
+ {
+ lpHandles[nrun] = pstate->parallelSlot[j].hThread;
+ nrun++;
+ }
+ ret = WaitForMultipleObjects(nrun, (HANDLE *) lpHandles, false, INFINITE);
+ Assert(ret != WAIT_FAILED);
+ hThread = lpHandles[ret - WAIT_OBJECT_0];
+
+ for (j = 0; j < pstate->numWorkers; j++)
+ if (pstate->parallelSlot[j].hThread == hThread)
+ slot = &(pstate->parallelSlot[j]);
+
+ free(lpHandles);
+#endif
+ Assert(slot);
+
+ slot->workerStatus = WRKR_TERMINATED;
+ }
+ Assert(HasEveryWorkerTerminated(pstate));
+}
+
+#ifndef WIN32
+/* Signal handling (UNIX only) */
+static void
+sigTermHandler(int signum)
+{
+ wantAbort = 1;
+}
+#endif
+
+/*
+ * This function is called by both UNIX and Windows variants to set up a
+ * worker process.
+ */
+static void
+SetupWorker(ArchiveHandle *AH, int pipefd[2], int worker,
+ RestoreOptions *ropt)
+{
+ /*
+ * Call the setup worker function that's defined in the ArchiveHandle.
+ *
+ * We get the raw connection only so that we can close it properly
+ * when we shut down; that only happens this way when the connection
+ * is brought down because of an error.
+ */
+ (AH->SetupWorkerPtr) ((Archive *) AH, ropt);
+
+ Assert(AH->connection != NULL);
+
+ WaitForCommands(AH, pipefd);
+
+ closesocket(pipefd[PIPE_READ]);
+ closesocket(pipefd[PIPE_WRITE]);
+}
+
+#ifdef WIN32
+static unsigned __stdcall
+init_spawned_worker_win32(WorkerInfo *wi)
+{
+ ArchiveHandle *AH;
+ int pipefd[2] = {wi->pipeRead, wi->pipeWrite};
+ int worker = wi->worker;
+ RestoreOptions *ropt = wi->ropt;
+
+ AH = CloneArchive(wi->AH);
+
+ free(wi);
+ SetupWorker(AH, pipefd, worker, ropt);
+
+ DeCloneArchive(AH);
+ _endthreadex(0);
+ return 0;
+}
+#endif
+
+/*
+ * This function starts the parallel dump or restore by spawning off the
+ * worker processes in both Unix and Windows. For Windows, it creates a number
+ * of threads while it does a fork() on Unix.
+ */
+ParallelState *
+ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt)
+{
+ ParallelState *pstate;
+ int i;
+ const size_t slotSize = AH->public.numWorkers * sizeof(ParallelSlot);
+
+ Assert(AH->public.numWorkers > 0);
+
+ /* Ensure stdio state is quiesced before forking */
+ fflush(NULL);
+
+ pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
+
+ pstate->numWorkers = AH->public.numWorkers;
+ pstate->parallelSlot = NULL;
+
+ if (AH->public.numWorkers == 1)
+ return pstate;
+
+ pstate->parallelSlot = (ParallelSlot *) pg_malloc(slotSize);
+ memset((void *) pstate->parallelSlot, 0, slotSize);
+
+ /*
+ * Set the pstate in the shutdown_info. The exit handler uses pstate if
+ * set and falls back to AHX otherwise.
+ */
+ shutdown_info.pstate = pstate;
+ on_exit_msg_func = parallel_exit_msg_func;
+
+#ifdef WIN32
+ tMasterThreadId = GetCurrentThreadId();
+ termEvent = CreateEvent(NULL, true, false, "Terminate");
+#else
+ signal(SIGTERM, sigTermHandler);
+ signal(SIGINT, sigTermHandler);
+ signal(SIGQUIT, sigTermHandler);
+#endif
+
+ for (i = 0; i < pstate->numWorkers; i++)
+ {
+#ifdef WIN32
+ WorkerInfo *wi;
+ uintptr_t handle;
+#else
+ pid_t pid;
+#endif
+ int pipeMW[2],
+ pipeWM[2];
+
+ if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
+ exit_horribly(modulename,
+ "Cannot create communication channels: %s\n",
+ strerror(errno));
+
+ pstate->parallelSlot[i].workerStatus = WRKR_IDLE;
+ pstate->parallelSlot[i].args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs));
+ pstate->parallelSlot[i].args->AH = NULL;
+ pstate->parallelSlot[i].args->te = NULL;
+#ifdef WIN32
+ /* Allocate a new structure for every worker */
+ wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
+
+ wi->ropt = ropt;
+ wi->worker = i;
+ wi->AH = AH;
+ wi->pipeRead = pstate->parallelSlot[i].pipeRevRead = pipeMW[PIPE_READ];
+ wi->pipeWrite = pstate->parallelSlot[i].pipeRevWrite = pipeWM[PIPE_WRITE];
+
+ handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
+ wi, 0, &(pstate->parallelSlot[i].threadId));
+ pstate->parallelSlot[i].hThread = handle;
+#else
+ pid = fork();
+ if (pid == 0)
+ {
+ /* we are the worker */
+ int j;
+ int pipefd[2] = {pipeMW[PIPE_READ], pipeWM[PIPE_WRITE]};
+
+ /*
+ * Store the fds for the reverse communication in pstate. Actually
+ * we only use this in case of an error and don't use pstate
+ * otherwise in the worker process. On Windows we write to the
+ * global pstate, in Unix we write to our process-local copy but
+ * that's also where we'd retrieve this information back from.
+ */
+ pstate->parallelSlot[i].pipeRevRead = pipefd[PIPE_READ];
+ pstate->parallelSlot[i].pipeRevWrite = pipefd[PIPE_WRITE];
+ pstate->parallelSlot[i].pid = getpid();
+
+ /*
+ * Call CloneArchive on Unix as well even though technically we
+ * don't need to because fork() gives us a copy in our own address
+ * space already. But CloneArchive resets the state information
+ * and also clones the database connection (for parallel dump)
+ * which both seem kinda helpful.
+ */
+ pstate->parallelSlot[i].args->AH = CloneArchive(AH);
+
+ /* close read end of Worker -> Master */
+ closesocket(pipeWM[PIPE_READ]);
+ /* close write end of Master -> Worker */
+ closesocket(pipeMW[PIPE_WRITE]);
+
+ /*
+ * Close all inherited fds for communication of the master with
+ * the other workers.
+ */
+ for (j = 0; j < i; j++)
+ {
+ closesocket(pstate->parallelSlot[j].pipeRead);
+ closesocket(pstate->parallelSlot[j].pipeWrite);
+ }
+
+ SetupWorker(pstate->parallelSlot[i].args->AH, pipefd, i, ropt);
+
+ exit(0);
+ }
+ else if (pid < 0)
+ /* fork failed */
+ exit_horribly(modulename,
+ "could not create worker process: %s\n",
+ strerror(errno));
+
+ /* we are the Master, pid > 0 here */
+ Assert(pid > 0);
+
+ /* close read end of Master -> Worker */
+ closesocket(pipeMW[PIPE_READ]);
+ /* close write end of Worker -> Master */
+ closesocket(pipeWM[PIPE_WRITE]);
+
+ pstate->parallelSlot[i].pid = pid;
+#endif
+
+ pstate->parallelSlot[i].pipeRead = pipeWM[PIPE_READ];
+ pstate->parallelSlot[i].pipeWrite = pipeMW[PIPE_WRITE];
+ }
+
+ return pstate;
+}
+
+/*
+ * Tell all of our workers to terminate.
+ *
+ * Pretty straightforward routine, first we tell everyone to terminate, then
+ * we listen to the workers' replies and finally close the sockets that we
+ * have used for communication.
+ */
+void
+ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
+{
+ int i;
+
+ if (pstate->numWorkers == 1)
+ return;
+
+ Assert(IsEveryWorkerIdle(pstate));
+
+ /* close the sockets so that the workers know they can exit */
+ for (i = 0; i < pstate->numWorkers; i++)
+ {
+ closesocket(pstate->parallelSlot[i].pipeRead);
+ closesocket(pstate->parallelSlot[i].pipeWrite);
+ }
+ WaitForTerminatingWorkers(pstate);
+
+ /*
+ * Remove the pstate again, so the exit handler in the parent will now
+ * again fall back to closing AH->connection (if connected).
+ */
+ shutdown_info.pstate = NULL;
+
+ free(pstate->parallelSlot);
+ free(pstate);
+}
+
+
+/*
+ * The sequence is the following (for dump, similar for restore):
+ *
+ * The master process starts the parallel backup in ParallelBackupStart(); this
+ * forks the worker processes, which enter WaitForCommands().
+ *
+ * The master process dispatches an individual work item to one of the worker
+ * processes in DispatchJobForTocEntry(). It calls
+ * AH->MasterStartParallelItemPtr, a routine of the output format. This
+ * function's arguments are the parent's archive handle AH (containing the full
+ * catalog information), the TocEntry that the worker should work on and a
+ * T_Action act indicating whether this is a backup or a restore item. The
+ * function then converts the TocEntry assignment into a string that is then
+ * sent over to the worker process. In the simplest case that would be
+ * something like "DUMP 1234", with 1234 being the TocEntry id.
+ *
+ * The worker receives the message in the routine pointed to by
+ * WorkerJobDumpPtr or WorkerJobRestorePtr. These are also pointers to
+ * corresponding routines of the respective output format, e.g.
+ * _WorkerJobDumpDirectory().
+ *
+ * Remember that we have forked off the workers only after we have read in the
+ * catalog. That's why our worker processes can also access the catalog
+ * information. Now they re-translate the textual representation to a TocEntry
+ * on their side and do the required action (restore or dump).
+ *
+ * The result is again a textual string that is sent back to the master and is
+ * interpreted by AH->MasterEndParallelItemPtr. This function can update state
+ * or catalog information on the master's side, depending on the reply from
+ * the worker process. In the end it returns status which is 0 for successful
+ * execution.
+ *
+ * ---------------------------------------------------------------------
+ * Master Worker
+ *
+ * enters WaitForCommands()
+ * DispatchJobForTocEntry(...te...)
+ *
+ * [ Worker is IDLE ]
+ *
+ * arg = (MasterStartParallelItemPtr)()
+ * send: DUMP arg
+ * receive: DUMP arg
+ * str = (WorkerJobDumpPtr)(arg)
+ * [ Worker is WORKING ] ... gets te from arg ...
+ * ... dump te ...
+ * send: OK DUMP info
+ *
+ * In ListenToWorkers():
+ *
+ * [ Worker is FINISHED ]
+ * receive: OK DUMP info
+ * status = (MasterEndParallelItemPtr)(info)
+ *
+ * In ReapWorkerStatus(&ptr):
+ * *ptr = status;
+ * [ Worker is IDLE ]
+ * ---------------------------------------------------------------------
+ */
+void
+DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te,
+ T_Action act)
+{
+ int worker;
+ char *arg;
+
+	/* our caller makes sure that at least one worker is idle */
+	worker = GetIdleWorker(pstate);
+	Assert(worker != NO_SLOT);
+
+ arg = (AH->MasterStartParallelItemPtr) (AH, te, act);
+
+ sendMessageToWorker(pstate, worker, arg);
+
+ pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
+ pstate->parallelSlot[worker].args->te = te;
+}
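The round-trip described above rides on tiny NUL-terminated text commands such as "DUMP 1234". As a sketch only — a hypothetical stand-alone helper, not part of the patch — the validation that WaitForCommands() performs with sscanf("%d%n") can be isolated like this:

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical stand-alone version of the check WaitForCommands() does:
 * extract the dump id from a "DUMP <n>" command and verify that the number
 * accounts for the whole remainder of the message.
 */
static int
parse_dump_command(const char *command, int *dumpId)
{
	int			nBytes = 0;

	if (strncmp(command, "DUMP ", strlen("DUMP ")) != 0)
		return 0;
	if (sscanf(command + strlen("DUMP "), "%d%n", dumpId, &nBytes) != 1)
		return 0;
	/* reject trailing garbage such as "DUMP 12x" */
	return nBytes == (int) (strlen(command) - strlen("DUMP "));
}
```

The %n conversion records how many characters the number consumed, which is what lets the worker assert that nothing follows the id.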
+
+/*
+ * Find the first free parallel slot (if any).
+ */
+int
+GetIdleWorker(ParallelState *pstate)
+{
+ int i;
+
+ for (i = 0; i < pstate->numWorkers; i++)
+ if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
+ return i;
+ return NO_SLOT;
+}
+
+/*
+ * Return true iff every worker process is in the WRKR_TERMINATED state.
+ */
+static bool
+HasEveryWorkerTerminated(ParallelState *pstate)
+{
+ int i;
+
+ for (i = 0; i < pstate->numWorkers; i++)
+ if (pstate->parallelSlot[i].workerStatus != WRKR_TERMINATED)
+ return false;
+ return true;
+}
+
+/*
+ * Return true iff every worker is in the WRKR_IDLE state.
+ */
+bool
+IsEveryWorkerIdle(ParallelState *pstate)
+{
+ int i;
+
+ for (i = 0; i < pstate->numWorkers; i++)
+ if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
+ return false;
+ return true;
+}
+
+/*
+ * ---------------------------------------------------------------------
+ * One danger of the parallel backup is a possible deadlock:
+ *
+ * 1) Master dumps the schema and locks all tables in ACCESS SHARE mode.
+ * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
+ * because the master holds a conflicting ACCESS SHARE lock).
+ * 3) The worker process also requests an ACCESS SHARE lock to read the table.
+ * The worker's not granted that lock but is enqueued behind the ACCESS
+ * EXCLUSIVE lock request.
+ * ---------------------------------------------------------------------
+ *
+ * What we do here is request the lock in ACCESS SHARE mode but with NOWAIT
+ * in the worker prior to touching the table. If we don't get the lock, we
+ * know that somebody else has requested an ACCESS EXCLUSIVE lock, and we can
+ * simply fail the whole backup because we have detected a deadlock.
+ */
+static void
+lockTableNoWait(ArchiveHandle *AH, TocEntry *te)
+{
+ Archive *AHX = (Archive *) AH;
+ const char *qualId;
+ PQExpBuffer query = createPQExpBuffer();
+ PGresult *res;
+
+ Assert(AH->format == archDirectory);
+ Assert(strcmp(te->desc, "BLOBS") != 0);
+
+ appendPQExpBuffer(query,
+ "SELECT pg_namespace.nspname,"
+ " pg_class.relname "
+ " FROM pg_class "
+ " JOIN pg_namespace on pg_namespace.oid = relnamespace "
+					  " WHERE pg_class.oid = %u", te->catalogId.oid);
+
+ res = PQexec(AH->connection, query->data);
+
+ if (!res || PQresultStatus(res) != PGRES_TUPLES_OK)
+ exit_horribly(modulename,
+					  "could not get relation name for OID %u: %s\n",
+ te->catalogId.oid, PQerrorMessage(AH->connection));
+
+ resetPQExpBuffer(query);
+
+ qualId = fmtQualifiedId(AHX->remoteVersion,
+ PQgetvalue(res, 0, 0),
+ PQgetvalue(res, 0, 1));
+
+ appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
+ qualId);
+ PQclear(res);
+
+ res = PQexec(AH->connection, query->data);
+
+ if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
+ exit_horribly(modulename,
+ "could not obtain lock on relation \"%s\". This "
+ "usually means that someone requested an ACCESS EXCLUSIVE lock "
+ "on the table after the pg_dump parent process has gotten the "
+ "initial ACCESS SHARE lock on the table.\n", qualId);
+
+ PQclear(res);
+ destroyPQExpBuffer(query);
+}
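The statement that lockTableNoWait() sends is an ordinary LOCK TABLE ... NOWAIT. Purely as an illustration — the real code resolves the properly quoted qualified name via fmtQualifiedId() from a catalog lookup, which this hypothetical helper does not attempt — the text of the statement can be assembled like this:

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Illustrative only: join schema and relation with a plain dot instead of
 * using fmtQualifiedId(), so identifier quoting is NOT handled here.
 */
static void
build_lock_query(char *buf, size_t buflen,
				 const char *nspname, const char *relname)
{
	snprintf(buf, buflen,
			 "LOCK TABLE %s.%s IN ACCESS SHARE MODE NOWAIT",
			 nspname, relname);
}
```

With NOWAIT, the server returns an error instead of queueing behind a pending ACCESS EXCLUSIVE request, which is exactly the deadlock-detection trick described above.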
+
+/*
+ * This is the main routine for a worker process. On startup the worker
+ * enters this routine and waits for commands from the master process. After
+ * processing a command it comes back here to wait for the next one. Finally
+ * it will receive a TERMINATE command and exit.
+ */
+static void
+WaitForCommands(ArchiveHandle *AH, int pipefd[2])
+{
+ char *command;
+ DumpId dumpId;
+ int nBytes;
+ char *str = NULL;
+ TocEntry *te;
+
+ for (;;)
+ {
+ if (!(command = getMessageFromMaster(pipefd)))
+ {
+ PQfinish(AH->connection);
+ AH->connection = NULL;
+ return;
+ }
+
+ if (messageStartsWith(command, "DUMP "))
+ {
+ Assert(AH->format == archDirectory);
+ sscanf(command + strlen("DUMP "), "%d%n", &dumpId, &nBytes);
+ Assert(nBytes == strlen(command) - strlen("DUMP "));
+
+ te = getTocEntryByDumpId(AH, dumpId);
+ Assert(te != NULL);
+
+ /*
+ * Lock the table but with NOWAIT. Note that the parent is already
+ * holding a lock. If we cannot acquire another ACCESS SHARE MODE
+ * lock, then somebody else has requested an exclusive lock in the
+ * meantime. lockTableNoWait dies in this case to prevent a
+ * deadlock.
+ */
+ if (strcmp(te->desc, "BLOBS") != 0)
+ lockTableNoWait(AH, te);
+
+ /*
+ * The message we return here has been pg_malloc()ed and we are
+ * responsible for free()ing it.
+ */
+ str = (AH->WorkerJobDumpPtr) (AH, te);
+ Assert(AH->connection != NULL);
+ sendMessageToMaster(pipefd, str);
+ free(str);
+ }
+ else if (messageStartsWith(command, "RESTORE "))
+ {
+ Assert(AH->format == archDirectory || AH->format == archCustom);
+ Assert(AH->connection != NULL);
+
+ sscanf(command + strlen("RESTORE "), "%d%n", &dumpId, &nBytes);
+ Assert(nBytes == strlen(command) - strlen("RESTORE "));
+
+ te = getTocEntryByDumpId(AH, dumpId);
+ Assert(te != NULL);
+
+ /*
+ * The message we return here has been pg_malloc()ed and we are
+ * responsible for free()ing it.
+ */
+ str = (AH->WorkerJobRestorePtr) (AH, te);
+ Assert(AH->connection != NULL);
+ sendMessageToMaster(pipefd, str);
+ free(str);
+ }
+ else
+ exit_horribly(modulename,
+						  "unknown command on communication channel: %s\n",
+ command);
+ }
+}
+
+/*
+ * ---------------------------------------------------------------------
+ * Note the status change:
+ *
+ * DispatchJobForTocEntry WRKR_IDLE -> WRKR_WORKING
+ * ListenToWorkers WRKR_WORKING -> WRKR_FINISHED / WRKR_TERMINATED
+ * ReapWorkerStatus WRKR_FINISHED -> WRKR_IDLE
+ * ---------------------------------------------------------------------
+ *
+ * Calling ReapWorkerStatus() when all workers are working might or might not
+ * give you an idle worker: you need to call ListenToWorkers() first and only
+ * thereafter ReapWorkerStatus(). ListenToWorkers() is what receives and
+ * records the status (i.e. the result) of the worker's execution.
+ */
+void
+ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
+{
+ int worker;
+ char *msg;
+
+ msg = getMessageFromWorker(pstate, do_wait, &worker);
+
+ if (!msg)
+ {
+ if (do_wait)
+			exit_horribly(modulename, "a worker process died unexpectedly\n");
+ return;
+ }
+
+ if (messageStartsWith(msg, "OK "))
+ {
+ char *statusString;
+ TocEntry *te;
+
+ pstate->parallelSlot[worker].workerStatus = WRKR_FINISHED;
+ te = pstate->parallelSlot[worker].args->te;
+ if (messageStartsWith(msg, "OK RESTORE "))
+ {
+ statusString = msg + strlen("OK RESTORE ");
+ pstate->parallelSlot[worker].status =
+ (AH->MasterEndParallelItemPtr)
+ (AH, te, statusString, ACT_RESTORE);
+ }
+ else if (messageStartsWith(msg, "OK DUMP "))
+ {
+ statusString = msg + strlen("OK DUMP ");
+ pstate->parallelSlot[worker].status =
+ (AH->MasterEndParallelItemPtr)
+ (AH, te, statusString, ACT_DUMP);
+ }
+ else
+ exit_horribly(modulename,
+						  "invalid message received from worker: %s\n", msg);
+ }
+ else if (messageStartsWith(msg, "ERROR "))
+ {
+ Assert(AH->format == archDirectory || AH->format == archCustom);
+ pstate->parallelSlot[worker].workerStatus = WRKR_TERMINATED;
+ exit_horribly(modulename, "%s", msg + strlen("ERROR "));
+ }
+ else
+		exit_horribly(modulename, "invalid message received from worker: %s\n", msg);
+
+ /* both Unix and Win32 return pg_malloc()ed space, so we free it */
+ free(msg);
+}
+
+/*
+ * This function is executed in the master process.
+ *
+ * It retrieves the status of a worker that has finished its work item. If
+ * one is found, its status is stored in *status and the id of the worker is
+ * returned; otherwise NO_SLOT is returned.
+ */
+int
+ReapWorkerStatus(ParallelState *pstate, int *status)
+{
+ int i;
+
+ for (i = 0; i < pstate->numWorkers; i++)
+ {
+ if (pstate->parallelSlot[i].workerStatus == WRKR_FINISHED)
+ {
+ *status = pstate->parallelSlot[i].status;
+ pstate->parallelSlot[i].status = 0;
+ pstate->parallelSlot[i].workerStatus = WRKR_IDLE;
+ return i;
+ }
+ }
+ return NO_SLOT;
+}
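The status transitions documented before ListenToWorkers() — WRKR_IDLE to WRKR_WORKING on dispatch, to WRKR_FINISHED when the reply arrives, back to WRKR_IDLE on reaping — form a small state machine. A hypothetical miniature of that lifecycle, with invented names (Slot, dispatch, listen_result, reap) standing in for the real functions:

```c
#include <assert.h>

/* Hypothetical miniature of the worker-status lifecycle described above. */
typedef enum {IDLE, WORKING, FINISHED} WState;

typedef struct
{
	WState		state;
	int			status;
} Slot;

static void
dispatch(Slot *s)					/* DispatchJobForTocEntry: IDLE -> WORKING */
{
	assert(s->state == IDLE);
	s->state = WORKING;
}

static void
listen_result(Slot *s, int status)	/* ListenToWorkers: WORKING -> FINISHED */
{
	assert(s->state == WORKING);
	s->state = FINISHED;
	s->status = status;
}

static int
reap(Slot *s)						/* ReapWorkerStatus: FINISHED -> IDLE */
{
	assert(s->state == FINISHED);
	s->state = IDLE;
	return s->status;
}
```

The asserts encode the rule from the comment: reaping is only legal after listening has recorded a result.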
+
+/*
+ * This function is executed in the master process.
+ *
+ * It looks for an idle worker process and only returns if there is one.
+ */
+void
+EnsureIdleWorker(ArchiveHandle *AH, ParallelState *pstate)
+{
+ int ret_worker;
+ int work_status;
+
+ for (;;)
+ {
+ int nTerm = 0;
+
+ while ((ret_worker = ReapWorkerStatus(pstate, &work_status)) != NO_SLOT)
+ {
+ if (work_status != 0)
+				exit_horribly(modulename, "error processing a parallel work item\n");
+
+ nTerm++;
+ }
+
+ /*
+ * We need to make sure that we have an idle worker before dispatching
+ * the next item. If nTerm > 0 we already have that (quick check).
+ */
+ if (nTerm > 0)
+ return;
+
+ /* explicit check for an idle worker */
+ if (GetIdleWorker(pstate) != NO_SLOT)
+ return;
+
+ /*
+ * If we have no idle worker, read the result of one or more workers
+		 * and loop back to call ReapWorkerStatus() on them.
+ */
+ ListenToWorkers(AH, pstate, true);
+ }
+}
+
+/*
+ * This function is executed in the master process.
+ *
+ * It waits for all workers to terminate.
+ */
+void
+EnsureWorkersFinished(ArchiveHandle *AH, ParallelState *pstate)
+{
+ int work_status;
+
+ if (!pstate || pstate->numWorkers == 1)
+ return;
+
+	/* Wait for the remaining worker processes to finish */
+ while (!IsEveryWorkerIdle(pstate))
+ {
+ if (ReapWorkerStatus(pstate, &work_status) == NO_SLOT)
+ ListenToWorkers(AH, pstate, true);
+ else if (work_status != 0)
+ exit_horribly(modulename,
+						  "error processing a parallel work item\n");
+ }
+}
+
+/*
+ * This function is executed in the worker process.
+ *
+ * It returns the next message on the communication channel, blocking until it
+ * becomes available.
+ */
+static char *
+getMessageFromMaster(int pipefd[2])
+{
+ return readMessageFromPipe(pipefd[PIPE_READ]);
+}
+
+/*
+ * This function is executed in the worker process.
+ *
+ * It sends a message to the master on the communication channel.
+ */
+static void
+sendMessageToMaster(int pipefd[2], const char *str)
+{
+ int len = strlen(str) + 1;
+
+ if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
+ exit_horribly(modulename,
+					  "could not write to the communication channel: %s\n",
+ strerror(errno));
+}
+
+/*
+ * A select loop that calls select() repeatedly until a descriptor in the
+ * read set becomes readable. On Windows we have to check for the termination
+ * event from time to time; on Unix we can just block indefinitely.
+ */
+static int
+select_loop(int maxFd, fd_set *workerset)
+{
+ int i;
+ fd_set saveSet = *workerset;
+
+#ifdef WIN32
+ /* should always be the master */
+ Assert(tMasterThreadId == GetCurrentThreadId());
+
+ for (;;)
+ {
+ /*
+ * sleep a quarter of a second before checking if we should terminate.
+ */
+ struct timeval tv = {0, 250000};
+
+ *workerset = saveSet;
+ i = select(maxFd + 1, workerset, NULL, NULL, &tv);
+
+ if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
+ continue;
+ if (i)
+ break;
+ }
+
+#else /* UNIX */
+
+ for (;;)
+ {
+ *workerset = saveSet;
+ i = select(maxFd + 1, workerset, NULL, NULL, NULL);
+
+ /*
+		 * If the master process is interrupted by Ctrl-C, it's likely that
+		 * the interrupt hits this select(). The signal handler will have set
+		 * wantAbort to true and the shutdown sequence starts here. Note that
+		 * we'll come back here later when we tell all workers to terminate
+		 * and read their responses, but by then aborting is set to true.
+ */
+ if (wantAbort && !aborting)
+ exit_horribly(modulename, "terminated by user\n");
+
+ if (i < 0 && errno == EINTR)
+ continue;
+ break;
+ }
+
+#endif
+
+ return i;
+}
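Two details of this loop are easy to get wrong: select() clobbers its fd_set argument, so the set must be re-initialized from a saved copy on every iteration, and an EINTR result has to be retried. A minimal Unix-only sketch of the same pattern (a hypothetical single-descriptor helper, not part of the patch):

```c
#include <assert.h>
#include <errno.h>
#include <sys/select.h>
#include <unistd.h>

/* Block until fd is readable; retry on EINTR like select_loop() does. */
static int
wait_readable(int fd)
{
	fd_set		saveSet;

	FD_ZERO(&saveSet);
	FD_SET(fd, &saveSet);

	for (;;)
	{
		fd_set		workerset = saveSet;	/* select() modifies its argument */
		int			i = select(fd + 1, &workerset, NULL, NULL, NULL);

		if (i < 0 && errno == EINTR)
			continue;
		return i;
	}
}
```

Copying the saved set by struct assignment before each select() call mirrors the `*workerset = saveSet` line in select_loop().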
+
+
+/*
+ * This function is executed in the master process.
+ *
+ * It returns the next message from the worker on the communication channel,
+ * optionally blocking (do_wait) until it becomes available.
+ *
+ * The id of the worker is returned in *worker.
+ */
+static char *
+getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
+{
+ int i;
+ fd_set workerset;
+ int maxFd = -1;
+ struct timeval nowait = {0, 0};
+
+ FD_ZERO(&workerset);
+
+ for (i = 0; i < pstate->numWorkers; i++)
+ {
+ if (pstate->parallelSlot[i].workerStatus == WRKR_TERMINATED)
+ continue;
+ FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
+ /* actually WIN32 ignores the first parameter to select()... */
+ if (pstate->parallelSlot[i].pipeRead > maxFd)
+ maxFd = pstate->parallelSlot[i].pipeRead;
+ }
+
+ if (do_wait)
+ {
+ i = select_loop(maxFd, &workerset);
+ Assert(i != 0);
+ }
+ else
+ {
+ if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
+ return NULL;
+ }
+
+ if (i < 0)
+		exit_horribly(modulename, "error in ListenToWorkers(): %s\n", strerror(errno));
+
+ for (i = 0; i < pstate->numWorkers; i++)
+ {
+ char *msg;
+
+ if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
+ continue;
+
+ msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
+ *worker = i;
+ return msg;
+ }
+ Assert(false);
+ return NULL;
+}
+
+/*
+ * This function is executed in the master process.
+ *
+ * It sends a message to a certain worker on the communication channel.
+ */
+static void
+sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
+{
+ int len = strlen(str) + 1;
+
+ if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
+ {
+ /*
+ * If we're already aborting anyway, don't care if we succeed or not.
+ * The child might have gone already.
+ */
+#ifndef WIN32
+ if (!aborting)
+#endif
+ exit_horribly(modulename,
+						  "could not write to the communication channel: %s\n",
+ strerror(errno));
+ }
+}
+
+/*
+ * The underlying function to read a message from the communication channel
+ * (fd). It blocks until a complete (zero-terminated) message is available.
+ */
+static char *
+readMessageFromPipe(int fd)
+{
+ char *msg;
+ int msgsize,
+ bufsize;
+ int ret;
+
+ /*
+	 * The problem here is that we need to deal with several possibilities:
+	 * we could receive only a partial message, or several messages at once.
+	 * However, the caller expects us to return exactly one message.
+	 *
+	 * We could either read in as much as we can and keep track of what we
+	 * delivered back to the caller, or just read byte by byte. Once we see a
+	 * (char) 0, we know that it's the message's end. Reading byte by byte
+	 * would be quite inefficient for larger amounts of data, but since we
+	 * are reading only on the command channel, the performance loss does not
+	 * seem worth the trouble of keeping internal state for different file
+	 * descriptors.
+ */
+ bufsize = 64; /* could be any number */
+ msg = (char *) pg_malloc(bufsize);
+
+ msgsize = 0;
+ for (;;)
+ {
+ Assert(msgsize <= bufsize);
+ ret = piperead(fd, msg + msgsize, 1);
+
+		/* worker has closed the connection or another error happened */
+		if (ret <= 0)
+		{
+			free(msg);
+			return NULL;
+		}
+
+ Assert(ret == 1);
+
+ if (msg[msgsize] == '\0')
+ return msg;
+
+ msgsize++;
+ if (msgsize == bufsize)
+ {
+ /* could be any number */
+ bufsize += 16;
+			msg = (char *) pg_realloc(msg, bufsize);
+ }
+ }
+}
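A condensed version of the byte-at-a-time strategy described in the comment above — hypothetical, with error handling reduced to returning NULL, plain malloc/realloc in place of the pg_* wrappers, and a deliberately tiny initial buffer so the growth path is exercised:

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Read one NUL-terminated message from fd, growing the buffer as needed. */
static char *
read_message(int fd)
{
	int			bufsize = 4;	/* tiny on purpose; any number works */
	int			msgsize = 0;
	char	   *msg = malloc(bufsize);

	for (;;)
	{
		if (read(fd, msg + msgsize, 1) != 1)
		{
			free(msg);			/* EOF or error: no complete message */
			return NULL;
		}
		if (msg[msgsize] == '\0')
			return msg;
		if (++msgsize == bufsize)
			msg = realloc(msg, bufsize += 4);
	}
}
```

Because each message ends in a zero byte, stopping at '\0' guarantees the caller gets exactly one message even if several are queued in the pipe.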
+
+#ifdef WIN32
+/*
+ * This is a replacement version of pipe for Win32 which allows returned
+ * handles to be used in select(). Note that read/write calls must be replaced
+ * with recv/send.
+ */
+static int
+pgpipe(int handles[2])
+{
+ SOCKET s;
+ struct sockaddr_in serv_addr;
+ int len = sizeof(serv_addr);
+
+ handles[0] = handles[1] = INVALID_SOCKET;
+
+ if ((s = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
+ {
+		write_msg(modulename, "pgpipe could not create socket: %u",
+ WSAGetLastError());
+ return -1;
+ }
+
+ memset((void *) &serv_addr, 0, sizeof(serv_addr));
+ serv_addr.sin_family = AF_INET;
+ serv_addr.sin_port = htons(0);
+ serv_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+ if (bind(s, (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
+ {
+		write_msg(modulename, "pgpipe could not bind: %u",
+ WSAGetLastError());
+ closesocket(s);
+ return -1;
+ }
+ if (listen(s, 1) == SOCKET_ERROR)
+ {
+		write_msg(modulename, "pgpipe could not listen: %u",
+ WSAGetLastError());
+ closesocket(s);
+ return -1;
+ }
+ if (getsockname(s, (SOCKADDR *) & serv_addr, &len) == SOCKET_ERROR)
+ {
+		write_msg(modulename, "pgpipe could not getsockname: %u",
+ WSAGetLastError());
+ closesocket(s);
+ return -1;
+ }
+ if ((handles[1] = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
+ {
+		write_msg(modulename, "pgpipe could not create socket 2: %u",
+ WSAGetLastError());
+ closesocket(s);
+ return -1;
+ }
+
+ if (connect(handles[1], (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
+ {
+		write_msg(modulename, "pgpipe could not connect socket: %u",
+ WSAGetLastError());
+ closesocket(s);
+ return -1;
+ }
+ if ((handles[0] = accept(s, (SOCKADDR *) & serv_addr, &len)) == INVALID_SOCKET)
+ {
+		write_msg(modulename, "pgpipe could not accept socket: %u",
+ WSAGetLastError());
+ closesocket(handles[1]);
+ handles[1] = INVALID_SOCKET;
+ closesocket(s);
+ return -1;
+ }
+ closesocket(s);
+ return 0;
+}
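The same loopback-socket trick can be reproduced on Unix, which makes it easy to see what pgpipe() is doing without a Windows box: bind a listener to 127.0.0.1 on an ephemeral port, connect to it, and accept, yielding two connected descriptors that work with select(). A hypothetical sketch with error handling condensed into one branch (the real function reports and cleans up after each failure separately):

```c
#include <assert.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Unix re-implementation of pgpipe()'s loopback-socket technique. */
static int
socket_pipe(int handles[2])
{
	struct sockaddr_in addr;
	socklen_t	len = sizeof(addr);
	int			s;

	handles[0] = handles[1] = -1;
	if ((s = socket(AF_INET, SOCK_STREAM, 0)) < 0)
		return -1;

	memset(&addr, 0, sizeof(addr));
	addr.sin_family = AF_INET;
	addr.sin_port = htons(0);	/* let the kernel pick a port */
	addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);

	/* condensed error handling; descriptors may leak on failure */
	if (bind(s, (struct sockaddr *) &addr, len) < 0 ||
		listen(s, 1) < 0 ||
		getsockname(s, (struct sockaddr *) &addr, &len) < 0 ||
		(handles[1] = socket(AF_INET, SOCK_STREAM, 0)) < 0 ||
		connect(handles[1], (struct sockaddr *) &addr, len) < 0 ||
		(handles[0] = accept(s, NULL, NULL)) < 0)
	{
		close(s);
		return -1;
	}
	close(s);					/* the listener is no longer needed */
	return 0;
}
```

Ordinary pipe descriptors cannot be passed to select() on Windows, which is why the original goes through TCP sockets and why reads and writes must use recv/send.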
+
+static int
+piperead(int s, char *buf, int len)
+{
+ int ret = recv(s, buf, len, 0);
+
+ if (ret < 0 && WSAGetLastError() == WSAECONNRESET)
+ /* EOF on the pipe! (win32 socket based implementation) */
+ ret = 0;
+ return ret;
+}
+
+#endif
85 src/bin/pg_dump/parallel.h
@@ -0,0 +1,85 @@
+/*-------------------------------------------------------------------------
+ *
+ * parallel.h
+ *
+ * Parallel support header file for the pg_dump archiver
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * The author is not responsible for loss or damages that may
+ * result from its use.
+ *
+ * IDENTIFICATION
+ * src/bin/pg_dump/parallel.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "pg_backup_db.h"
+
+struct _archiveHandle;
+struct _tocEntry;
+
+typedef enum
+{
+ WRKR_TERMINATED = 0,
+ WRKR_IDLE,
+ WRKR_WORKING,
+ WRKR_FINISHED
+} T_WorkerStatus;
+
+typedef enum T_Action
+{
+ ACT_DUMP,
+ ACT_RESTORE,
+} T_Action;
+
+/* Arguments needed for a worker process */
+typedef struct ParallelArgs
+{
+ struct _archiveHandle *AH;
+ struct _tocEntry *te;
+} ParallelArgs;
+
+/* State for each parallel activity slot */
+typedef struct ParallelSlot
+{
+ ParallelArgs *args;
+ T_WorkerStatus workerStatus;
+ int status;
+ int pipeRead;
+ int pipeWrite;
+ int pipeRevRead;
+ int pipeRevWrite;
+#ifdef WIN32
+ uintptr_t hThread;
+ unsigned int threadId;
+#else
+ pid_t pid;
+#endif
+} ParallelSlot;
+
+#define NO_SLOT (-1)
+
+typedef struct ParallelState
+{
+ int numWorkers;
+ ParallelSlot *parallelSlot;
+} ParallelState;
+
+extern int GetIdleWorker(ParallelState *pstate);
+extern bool IsEveryWorkerIdle(ParallelState *pstate);
+extern void ListenToWorkers(struct _archiveHandle * AH, ParallelState *pstate, bool do_wait);
+extern int ReapWorkerStatus(ParallelState *pstate, int *status);
+extern void EnsureIdleWorker(struct _archiveHandle * AH, ParallelState *pstate);
+extern void EnsureWorkersFinished(struct _archiveHandle * AH, ParallelState *pstate);
+
+extern ParallelState *ParallelBackupStart(struct _archiveHandle * AH,
+ RestoreOptions *ropt);
+extern void DispatchJobForTocEntry(struct _archiveHandle * AH,
+ ParallelState *pstate,
+ struct _tocEntry * te, T_Action act);
+extern void ParallelBackupEnd(struct _archiveHandle * AH, ParallelState *pstate);
+
+extern void checkAborting(struct _archiveHandle * AH);
11 src/bin/pg_dump/pg_backup.h
@@ -82,9 +82,14 @@ struct Archive
int minRemoteVersion; /* allowable range */
int maxRemoteVersion;
+ int numWorkers; /* number of parallel processes */
+ char *sync_snapshot_id; /* sync snapshot id for parallel
+ * operation */
+
/* info needed for string escaping */
int encoding; /* libpq code for client_encoding */
bool std_strings; /* standard_conforming_strings */
+ char *use_role; /* Issue SET ROLE to this */
/* error handling */
bool exit_on_error; /* whether to exit on SQL errors... */
@@ -142,11 +147,12 @@ typedef struct _restoreOptions
int suppressDumpWarnings; /* Suppress output of WARNING entries
* to stderr */
bool single_txn;
- int number_of_jobs;
bool *idWanted; /* array showing which dump IDs to emit */
} RestoreOptions;
+typedef void (*SetupWorkerPtr) (Archive *AH, RestoreOptions *ropt);
+
/*
* Main archiver interface.
*/
@@ -189,7 +195,8 @@ extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt);
/* Create a new archive */
extern Archive *CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
- const int compression, ArchiveMode mode);
+ const int compression, ArchiveMode mode,
+ SetupWorkerPtr setupDumpWorker);
/* The --list option */
extern void PrintTOCSummary(Archive *AH, RestoreOptions *ropt);
735 src/bin/pg_dump/pg_backup_archiver.c
@@ -22,8 +22,10 @@
#include "pg_backup_db.h"
#include "dumputils.h"
+#include "parallel.h"
#include <ctype.h>
+#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/types.h>
@@ -35,72 +37,6 @@
#include "libpq/libpq-fs.h"
-/*
- * Special exit values from worker children. We reserve 0 for normal
- * success; 1 and other small values should be interpreted as crashes.
- */
-#define WORKER_CREATE_DONE 10
-#define WORKER_INHIBIT_DATA 11
-#define WORKER_IGNORED_ERRORS 12
-
-/*
- * Unix uses exit to return result from worker child, so function is void.
- * Windows thread result comes via function return.
- */
-#ifndef WIN32
-#define parallel_restore_result void
-#else
-#define parallel_restore_result DWORD
-#endif
-
-/* IDs for worker children are either PIDs or thread handles */
-#ifndef WIN32
-#define thandle pid_t
-#else
-#define thandle HANDLE
-#endif
-
-typedef struct ParallelStateEntry
-{
-#ifdef WIN32
- unsigned int threadId;
-#else
- pid_t pid;
-#endif
- ArchiveHandle *AH;
-} ParallelStateEntry;
-
-typedef struct ParallelState
-{
- int numWorkers;
- ParallelStateEntry *pse;
-} ParallelState;
-
-/* Arguments needed for a worker child */
-typedef struct _restore_args
-{
- ArchiveHandle *AH;
- TocEntry *te;
- ParallelStateEntry *pse;
-} RestoreArgs;
-
-/* State for each parallel activity slot */
-typedef struct _parallel_slot
-{
- thandle child_id;
- RestoreArgs *args;
-} ParallelSlot;
-
-typedef struct ShutdownInformation
-{
- ParallelState *pstate;
- Archive *AHX;
-} ShutdownInformation;
-
-static ShutdownInformation shutdown_info;
-
-#define NO_SLOT (-1)
-
#define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
#define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
@@ -116,7 +52,7 @@ static const char *modulename = gettext_noop("archiver");
static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
- const int compression, ArchiveMode mode);
+ const int compression, ArchiveMode mode, SetupWorkerPtr setupWorkerPtr);
static void _getObjectDescription(PQExpBuffer buf, TocEntry *te,
ArchiveHandle *AH);
static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isData, bool acl_pass);
@@ -136,7 +72,6 @@ static bool _tocEntryIsACL(TocEntry *te);
static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
static void buildTocEntryArrays(ArchiveHandle *AH);
-static TocEntry *getTocEntryByDumpId(ArchiveHandle *AH, DumpId id);
static void _moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te);
static int _discoverArchiveFormat(ArchiveHandle *AH);
@@ -149,21 +84,19 @@ static void RestoreOutput(ArchiveHandle *AH, OutputContext savedContext);
static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
RestoreOptions *ropt, bool is_parallel);
-static void restore_toc_entries_parallel(ArchiveHandle *AH);
-static thandle spawn_restore(RestoreArgs *args);
-static thandle reap_child(ParallelSlot *slots, int n_slots, int *work_status);
-static bool work_in_progress(ParallelSlot *slots, int n_slots);
-static int get_next_slot(ParallelSlot *slots, int n_slots);
+static void restore_toc_entries_prefork(ArchiveHandle *AH);
+static void restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
+ TocEntry *pending_list);
+static void restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list);
static void par_list_header_init(TocEntry *l);
static void par_list_append(TocEntry *l, TocEntry *te);
static void par_list_remove(TocEntry *te);
static TocEntry *get_next_work_item(ArchiveHandle *AH,
TocEntry *ready_list,
- ParallelSlot *slots, int n_slots);
-static parallel_restore_result parallel_restore(RestoreArgs *args);
+ ParallelState *pstate);
static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
- thandle worker, int status,
- ParallelSlot *slots, int n_slots);
+ int worker, int status,
+ ParallelState *pstate);
static void fix_dependencies(ArchiveHandle *AH);
static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
static void repoint_table_dependencies(ArchiveHandle *AH);
@@ -172,14 +105,6 @@ static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
TocEntry *ready_list);
static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
-static ArchiveHandle *CloneArchive(ArchiveHandle *AH);
-static void DeCloneArchive(ArchiveHandle *AH);
-
-static void setProcessIdentifier(ParallelStateEntry *pse, ArchiveHandle *AH);
-static void unsetProcessIdentifier(ParallelStateEntry *pse);
-static ParallelStateEntry *GetMyPSEntry(ParallelState *pstate);
-static void archive_close_connection(int code, void *arg);
-
/*
* Wrapper functions.
@@ -189,15 +114,28 @@ static void archive_close_connection(int code, void *arg);
*
*/
+/*
+ * The dump worker setup needs lots of knowledge of the internals of pg_dump,
+ * so it's defined in pg_dump.c and passed into CreateArchive(). The restore
+ * worker setup doesn't need to know anything much, so it's defined here.
+ */
+static void
+setupRestoreWorker(Archive *AHX, RestoreOptions *ropt)
+{
+ ArchiveHandle *AH = (ArchiveHandle *) AHX;
+
+ (AH->ReopenPtr) (AH);
+}
+
/* Create a new archive */
/* Public */
Archive *
CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
- const int compression, ArchiveMode mode)
+ const int compression, ArchiveMode mode, SetupWorkerPtr setupDumpWorker)
{
- ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression, mode);
+ ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression, mode, setupDumpWorker);
return (Archive *) AH;
}
@@ -207,7 +145,7 @@ CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
Archive *
OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
{
- ArchiveHandle *AH = _allocAH(FileSpec, fmt, 0, archModeRead);
+ ArchiveHandle *AH = _allocAH(FileSpec, fmt, 0, archModeRead, setupRestoreWorker);
return (Archive *) AH;
}
@@ -311,7 +249,7 @@ RestoreArchive(Archive *AHX)
/*
* If we're going to do parallel restore, there are some restrictions.
*/
- parallel_mode = (ropt->number_of_jobs > 1 && ropt->useDB);
+ parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB);
if (parallel_mode)
{
/* We haven't got round to making this work for all archive formats */
@@ -499,7 +437,25 @@ RestoreArchive(Archive *AHX)
* In parallel mode, turn control over to the parallel-restore logic.
*/
if (parallel_mode)
- restore_toc_entries_parallel(AH);
+ {
+ ParallelState *pstate;
+ TocEntry pending_list;
+
+ par_list_header_init(&pending_list);
+
+ /* This runs PRE_DATA items and then disconnects from the database */
+ restore_toc_entries_prefork(AH);
+ Assert(AH->connection == NULL);
+
+ /* ParallelBackupStart() will actually fork the processes */
+ pstate = ParallelBackupStart(AH, ropt);
+ restore_toc_entries_parallel(AH, pstate, &pending_list);
+ ParallelBackupEnd(AH, pstate);
+
+ /* reconnect the master and see if we missed something */
+ restore_toc_entries_postfork(AH, &pending_list);
+ Assert(AH->connection != NULL);
+ }
else
{
for (te = AH->toc->next; te != AH->toc; te = te->next)
@@ -558,7 +514,7 @@ static int
restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
RestoreOptions *ropt, bool is_parallel)
{
- int retval = 0;
+ int status = WORKER_OK;
teReqs reqs;
bool defnDumped;
@@ -611,7 +567,7 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
if (ropt->noDataForFailedTables)
{
if (is_parallel)
- retval = WORKER_INHIBIT_DATA;
+ status = WORKER_INHIBIT_DATA;
else
inhibit_data_for_failed_table(AH, te);
}
@@ -626,7 +582,7 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
* just set the return value.
*/
if (is_parallel)
- retval = WORKER_CREATE_DONE;
+ status = WORKER_CREATE_DONE;
else
mark_create_done(AH, te);
}
@@ -744,7 +700,10 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
}
}
- return retval;
+ if (AH->public.n_errors > 0 && status == WORKER_OK)
+ status = WORKER_IGNORED_ERRORS;
+
+ return status;
}
/*
@@ -1634,7 +1593,7 @@ buildTocEntryArrays(ArchiveHandle *AH)
}
}
-static TocEntry *
+TocEntry *
getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
{
/* build index arrays if we didn't already */
@@ -2018,7 +1977,7 @@ _discoverArchiveFormat(ArchiveHandle *AH)
*/
static ArchiveHandle *
_allocAH(const char *FileSpec, const ArchiveFormat fmt,
- const int compression, ArchiveMode mode)
+ const int compression, ArchiveMode mode, SetupWorkerPtr setupWorkerPtr)
{
ArchiveHandle *AH;
@@ -2100,6 +2059,8 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
}
#endif
+ AH->SetupWorkerPtr = setupWorkerPtr;
+
if (fmt == archUnknown)
AH->format = _discoverArchiveFormat(AH);
else
@@ -2132,50 +2093,66 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
return AH;
}
-
void
-WriteDataChunks(ArchiveHandle *AH)
+WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
{
TocEntry *te;
- StartDataPtr startPtr;
- EndDataPtr endPtr;
for (te = AH->toc->next; te != AH->toc; te = te->next)
{
- if (te->dataDumper != NULL && (te->reqs & REQ_DATA) != 0)
- {
- AH->currToc = te;
- /* printf("Writing data for %d (%x)\n", te->id, te); */
-
- if (strcmp(te->desc, "BLOBS") == 0)
- {
- startPtr = AH->StartBlobsPtr;
- endPtr = AH->EndBlobsPtr;
- }
- else
- {
- startPtr = AH->StartDataPtr;
- endPtr = AH->EndDataPtr;
- }
+ if (!te->dataDumper)
+ continue;
- if (startPtr != NULL)
- (*startPtr) (AH, te);
+ if ((te->reqs & REQ_DATA) == 0)
+ continue;
+ if (pstate && pstate->numWorkers > 1)
+ {
/*
- * printf("Dumper arg for %d is %x\n", te->id, te->dataDumperArg);
+ * If we are in a parallel backup, then we are always the master
+ * process.
*/
+ EnsureIdleWorker(AH, pstate);
+ Assert(GetIdleWorker(pstate) != NO_SLOT);
+ DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP);
+ }
+ else
+ WriteDataChunksForTocEntry(AH, te);
+ }
+ EnsureWorkersFinished(AH, pstate);
+}
- /*
- * The user-provided DataDumper routine needs to call
- * AH->WriteData
- */
- (*te->dataDumper) ((Archive *) AH, te->dataDumperArg);
+void
+WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
+{
+ StartDataPtr startPtr;
+ EndDataPtr endPtr;
- if (endPtr != NULL)
- (*endPtr) (AH, te);
- AH->currToc = NULL;
- }
+ AH->currToc = te;
+
+ if (strcmp(te->desc, "BLOBS") == 0)
+ {
+ startPtr = AH->StartBlobsPtr;
+ endPtr = AH->EndBlobsPtr;
}
+ else
+ {
+ startPtr = AH->StartDataPtr;
+ endPtr = AH->EndDataPtr;
+ }
+
+ if (startPtr != NULL)
+ (*startPtr) (AH, te);
+
+ /*
+ * The user-provided DataDumper routine needs to call AH->WriteData
+ */
+ (*te->dataDumper) ((Archive *) AH, te->dataDumperArg);
+
+ if (endPtr != NULL)
+ (*endPtr) (AH, te);
+
+ AH->currToc = NULL;
}
void
@@ -2911,7 +2888,7 @@ _getObjectDescription(PQExpBuffer buf, TocEntry *te, ArchiveHandle *AH)
const char *type = te->desc;
/* Use ALTER TABLE for views and sequences */
- if (strcmp(type, "VIEW") == 0 || strcmp(type, "SEQUENCE") == 0||
+ if (strcmp(type, "VIEW") == 0 || strcmp(type, "SEQUENCE") == 0 ||
strcmp(type, "MATERIALIZED VIEW") == 0)
type = "TABLE";
@@ -3404,67 +3381,6 @@ dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
ahprintf(AH, "-- %s %s\n\n", msg, buf);
}
-static void
-setProcessIdentifier(ParallelStateEntry *pse, ArchiveHandle *AH)
-{
-#ifdef WIN32
- pse->threadId = GetCurrentThreadId();
-#else
- pse->pid = getpid();
-#endif
- pse->AH = AH;
-}
-
-static void
-unsetProcessIdentifier(ParallelStateEntry *pse)
-{
-#ifdef WIN32
- pse->threadId = 0;
-#else
- pse->pid = 0;
-#endif
- pse->AH = NULL;
-}
-
-static ParallelStateEntry *
-GetMyPSEntry(ParallelState *pstate)
-{
- int i;
-
- for (i = 0; i < pstate->numWorkers; i++)
-#ifdef WIN32
- if (pstate->pse[i].threadId == GetCurrentThreadId())
-#else
- if (pstate->pse[i].pid == getpid())
-#endif
- return &(pstate->pse[i]);
-
- return NULL;
-}
-
-static void
-archive_close_connection(int code, void *arg)
-{
- ShutdownInformation *si = (ShutdownInformation *) arg;
-
- if (si->pstate)
- {
- ParallelStateEntry *entry = GetMyPSEntry(si->pstate);
-
- if (entry != NULL && entry->AH)
- DisconnectDatabase(&(entry->AH->public));
- }
- else if (si->AHX)
- DisconnectDatabase(si->AHX);
-}
-
-void
-on_exit_close_archive(Archive *AHX)
-{
- shutdown_info.AHX = AHX;
- on_exit_nicely(archive_close_connection, &shutdown_info);
-}
-
/*
* Main engine for parallel restore.
*
@@ -3477,30 +3393,13 @@ on_exit_close_archive(Archive *AHX)
* RestoreArchive).
*/
static void
-restore_toc_entries_parallel(ArchiveHandle *AH)
+restore_toc_entries_prefork(ArchiveHandle *AH)
{
RestoreOptions *ropt = AH->ropt;
- int n_slots = ropt->number_of_jobs;
- ParallelSlot *slots;
- int work_status;
- int next_slot;
bool skipped_some;
- TocEntry pending_list;
- TocEntry ready_list;
TocEntry *next_work_item;
- thandle ret_child;
- TocEntry *te;
- ParallelState *pstate;
- int i;
-
- ahlog(AH, 2, "entering restore_toc_entries_parallel\n");
- slots = (ParallelSlot *) pg_malloc0(n_slots * sizeof(ParallelSlot));
- pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
- pstate->pse = (ParallelStateEntry *) pg_malloc0(n_slots * sizeof(ParallelStateEntry));
- pstate->numWorkers = ropt->number_of_jobs;
- for (i = 0; i < pstate->numWorkers; i++)
- unsetProcessIdentifier(&(pstate->pse[i]));
+ ahlog(AH, 2, "entering restore_toc_entries_prefork\n");
/* Adjust dependency information */
fix_dependencies(AH);
@@ -3509,7 +3408,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
* Do all the early stuff in a single connection in the parent. There's no
* great point in running it in parallel, in fact it will actually run
* faster in a single connection because we avoid all the connection and
- * setup overhead. Also, pre-9.2 pg_dump versions were not very good
+ * setup overhead. Also, pre-9.2 pg_dump versions were not very good
* about showing all the dependencies of SECTION_PRE_DATA items, so we do
* not risk trying to process them out-of-order.
*
@@ -3561,12 +3460,6 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
*/
DisconnectDatabase(&AH->public);
- /*
- * Set the pstate in the shutdown_info. The exit handler uses pstate if
- * set and falls back to AHX otherwise.
- */
- shutdown_info.pstate = pstate;
-
/* blow away any transient state from the old connection */
if (AH->currUser)
free(AH->currUser);
@@ -3578,17 +3471,42 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
free(AH->currTablespace);
AH->currTablespace = NULL;
AH->currWithOids = -1;
+}
+
+/*
+ * Main engine for parallel restore.
+ *
+ * Work is done in three phases.
+ * First we process all SECTION_PRE_DATA tocEntries, in a single connection,
+ * just as for a standard restore. This is done in restore_toc_entries_prefork().
+ * Second we process the remaining non-ACL steps in parallel worker children
+ * (threads on Windows, processes on Unix), which fork off and set up their
+ * connections before we call restore_toc_entries_parallel_forked.
+ * Finally we process all the ACL entries in a single connection (that happens
+ * back in RestoreArchive).
+ */
+static void
+restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
+ TocEntry *pending_list)
+{
+ int work_status;
+ bool skipped_some;