Skip to content

Commit

Permalink
Avoid passing function pointers across process boundaries.
Browse files Browse the repository at this point in the history
This back-patches commit 3247082
into 9.6, primarily to make buildfarm member culicidae happy.
Unlike the HEAD patch, avoid changing the existing API of
CreateParallelContext; instead we just switch to using
CreateParallelContextForExternalFunction, even for core functions.

Petr Jelinek, with a bunch of basically-cosmetic adjustments by me

Discussion: https://postgr.es/m/548f9c1d-eafa-e3fa-9da8-f0cc2f654e60@2ndquadrant.com
  • Loading branch information
tglsfdc committed Apr 15, 2017
1 parent d512794 commit 9c225ac
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 92 deletions.
2 changes: 1 addition & 1 deletion src/backend/access/transam/README.parallel
Expand Up @@ -198,7 +198,7 @@ pattern looks like this:

EnterParallelMode(); /* prohibit unsafe state changes */

pcxt = CreateParallelContext(entrypoint, nworkers);
pcxt = CreateParallelContextForExternalFunction("library_name", "function_name", nworkers);

/* Allow space for application-specific data here. */
shm_toc_estimate_chunk(&pcxt->estimator, size);
Expand Down
145 changes: 104 additions & 41 deletions src/backend/access/transam/parallel.c
Expand Up @@ -19,6 +19,7 @@
#include "access/xlog.h"
#include "catalog/namespace.h"
#include "commands/async.h"
#include "executor/execParallel.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "libpq/pqmq.h"
Expand Down Expand Up @@ -60,7 +61,7 @@
#define PARALLEL_KEY_TRANSACTION_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0006)
#define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007)
#define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008)
#define PARALLEL_KEY_EXTENSION_TRAMPOLINE UINT64CONST(0xFFFFFFFFFFFF0009)
#define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009)

/* Fixed-size parallel state. */
typedef struct FixedParallelState
Expand All @@ -76,7 +77,7 @@ typedef struct FixedParallelState
pid_t parallel_master_pid;
BackendId parallel_master_backend_id;

/* Entrypoint for parallel workers. */
/* Entrypoint for parallel workers (deprecated)! */
parallel_worker_main_type entrypoint;

/* Mutex protects remaining fields. */
Expand Down Expand Up @@ -106,16 +107,36 @@ static FixedParallelState *MyFixedParallelState;
/* List of active parallel contexts. */
static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);

/*
 * Name-to-address table for parallel worker entry points that live in the
 * core backend.  Each row pairs the string that identifies the function
 * with the function's address in the current process.
 * LookupParallelWorkerFunction() searches this table; see the comments
 * there for why entry points are identified by name.
 */
static const struct
{
	const char *fn_name;		/* name used to identify the entry point */
	parallel_worker_main_type fn_addr;	/* its address in this process */
}	InternalParallelWorkers[] =
{
	{"ParallelQueryMain", ParallelQueryMain}
};

/* Private functions. */
static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc);
static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
static parallel_worker_main_type LookupParallelWorkerFunction(char *libraryname, char *funcname);


/*
* Establish a new parallel context. This should be done after entering
* parallel mode, and (unless there is an error) the context should be
* destroyed before exiting the current subtransaction.
*
* NB: specifying the entrypoint as a function address is unportable.
* This will go away in Postgres 10, in favor of the API provided by
* CreateParallelContextForExternalFunction; in the meantime use that.
*/
ParallelContext *
CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers)
Expand Down Expand Up @@ -163,9 +184,9 @@ CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers)
}

/*
* Establish a new parallel context that calls a function provided by an
* extension. This works around the fact that the library might get mapped
* at a different address in each backend.
* Establish a new parallel context that calls a function specified by name.
* Unlike CreateParallelContext, this is robust against possible differences
* in address space layout between different processes.
*/
ParallelContext *
CreateParallelContextForExternalFunction(char *library_name,
Expand All @@ -179,7 +200,7 @@ CreateParallelContextForExternalFunction(char *library_name,
oldcontext = MemoryContextSwitchTo(TopTransactionContext);

/* Create the context. */
pcxt = CreateParallelContext(ParallelExtensionTrampoline, nworkers);
pcxt = CreateParallelContext(NULL, nworkers);
pcxt->library_name = pstrdup(library_name);
pcxt->function_name = pstrdup(function_name);

Expand Down Expand Up @@ -248,10 +269,9 @@ InitializeParallelDSM(ParallelContext *pcxt)
pcxt->nworkers));
shm_toc_estimate_keys(&pcxt->estimator, 1);

/* Estimate how much we'll need for extension entrypoint info. */
/* Estimate how much we'll need for entrypoint info. */
if (pcxt->library_name != NULL)
{
Assert(pcxt->entrypoint == ParallelExtensionTrampoline);
Assert(pcxt->function_name != NULL);
shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name)
+ strlen(pcxt->function_name) + 2);
Expand Down Expand Up @@ -367,7 +387,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
}
shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);

/* Serialize extension entrypoint information. */
/* Serialize entrypoint information. */
if (pcxt->library_name != NULL)
{
Size lnamelen = strlen(pcxt->library_name);
Expand All @@ -377,7 +397,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
+ strlen(pcxt->function_name) + 2);
strcpy(extensionstate, pcxt->library_name);
strcpy(extensionstate + lnamelen + 1, pcxt->function_name);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE,
shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT,
extensionstate);
}
}
Expand Down Expand Up @@ -669,6 +689,10 @@ DestroyParallelContext(ParallelContext *pcxt)
}

/* Free memory. */
if (pcxt->library_name)
pfree(pcxt->library_name);
if (pcxt->function_name)
pfree(pcxt->function_name);
pfree(pcxt);
}

Expand Down Expand Up @@ -939,6 +963,8 @@ ParallelWorkerMain(Datum main_arg)
shm_mq *mq;
shm_mq_handle *mqh;
char *libraryspace;
char *entrypointstate;
parallel_worker_main_type entrypt;
char *gucspace;
char *combocidspace;
char *tsnapspace;
Expand Down Expand Up @@ -1038,6 +1064,25 @@ ParallelWorkerMain(Datum main_arg)
Assert(libraryspace != NULL);
RestoreLibraryState(libraryspace);

/*
* Identify the entry point to be called. In theory this could result in
* loading an additional library, though most likely the entry point is in
* the core backend or in a library we just loaded.
*/
entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT);
if (entrypointstate != NULL)
{
char *library_name;
char *function_name;

library_name = entrypointstate;
function_name = entrypointstate + strlen(library_name) + 1;

entrypt = LookupParallelWorkerFunction(library_name, function_name);
}
else
entrypt = fps->entrypoint;

/* Restore database connection. */
BackgroundWorkerInitializeConnectionByOid(fps->database_id,
fps->authenticated_user_id);
Expand Down Expand Up @@ -1101,10 +1146,11 @@ ParallelWorkerMain(Datum main_arg)
/*
* Time to do the real work: invoke the caller-supplied code.
*
* If you get a crash at this line, see the comments for
* ParallelExtensionTrampoline.
* If you get a crash at this line, try using
* CreateParallelContextForExternalFunction instead of
* CreateParallelContext.
*/
fps->entrypoint(seg, toc);
entrypt(seg, toc);

/* Must exit parallel mode to pop active snapshot. */
ExitParallelMode();
Expand All @@ -1119,33 +1165,6 @@ ParallelWorkerMain(Datum main_arg)
pq_putmessage('X', NULL, 0);
}

/*
 * Entry-point shim for worker functions that live in loadable modules.
 *
 * ParallelWorkerMain cannot safely call such a function through a raw
 * address: the module might not be loaded in every process, or might be
 * loaded at a different address.  CreateParallelContextForExternalFunction()
 * therefore registers this function as the entrypoint instead.  Here we
 * fetch the library and function names stored in the shm_toc, resolve the
 * real entry point in this process, and call it.
 */
static void
ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc)
{
	char	   *names;
	char	   *libname;
	char	   *fnname;
	parallel_worker_main_type worker_fn;

	names = shm_toc_lookup(toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE);
	Assert(names != NULL);

	/*
	 * The chunk holds two NUL-terminated strings back to back: the library
	 * name first, then the function name.
	 */
	libname = names;
	fnname = names + strlen(libname) + 1;

	worker_fn = (parallel_worker_main_type)
		load_external_function(libname, fnname, true, NULL);
	worker_fn(seg, toc);
}

/*
* Update shared memory with the ending location of the last WAL record we
* wrote, if it's greater than the value already stored there.
Expand All @@ -1161,3 +1180,47 @@ ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
fps->last_xlog_end = last_xlog_end;
SpinLockRelease(&fps->mutex);
}

/*
 * Resolve a parallel worker entry point from its (library, function) names,
 * loading the library if that turns out to be necessary.
 *
 * Library name "postgres" denotes the core code; such functions are found by
 * searching the InternalParallelWorkers array.  Any other library name is
 * handed to load_external_function().
 *
 * Entry points are identified by name because a function address is not
 * meaningful across process boundaries: the function may sit at a different
 * address in a different process.  That hazard is obvious for loadable
 * libraries, but it also exists for core functions on platforms using
 * EXEC_BACKEND (e.g., Windows).
 *
 * InternalParallelWorkers[] could someday be dropped in favor of using
 * load_external_function() for core functions as well, but that raises
 * portability issues that are not worth addressing now.
 */
static parallel_worker_main_type
LookupParallelWorkerFunction(char *libraryname, char *funcname)
{
	/* Core functions are resolved through the built-in table. */
	if (strcmp(libraryname, "postgres") == 0)
	{
		int			idx = 0;

		while (idx < lengthof(InternalParallelWorkers))
		{
			if (strcmp(InternalParallelWorkers[idx].fn_name, funcname) == 0)
				return InternalParallelWorkers[idx].fn_addr;
			idx++;
		}

		/* No table entry matched; only a programming error gets us here. */
		elog(ERROR, "internal function \"%s\" not found", funcname);
	}

	/* Everything else comes from an external library. */
	return (parallel_worker_main_type)
		load_external_function(libraryname, funcname, true, NULL);
}
7 changes: 3 additions & 4 deletions src/backend/executor/execParallel.c
Expand Up @@ -105,8 +105,7 @@ static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
SharedExecutorInstrumentation *instrumentation);

/* Helper functions that run in the parallel worker. */
static void ParallelQueryMain(dsm_segment *seg, shm_toc *toc);
/* Helper function that runs in the parallel worker. */
static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);

/*
Expand Down Expand Up @@ -355,7 +354,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
pstmt_data = ExecSerializePlan(planstate->plan, estate);

/* Create a parallel context. */
pcxt = CreateParallelContext(ParallelQueryMain, nworkers);
pcxt = CreateParallelContextForExternalFunction("postgres", "ParallelQueryMain", nworkers);
pei->pcxt = pcxt;

/*
Expand Down Expand Up @@ -720,7 +719,7 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
* to do this are also stored in the dsm_segment and can be accessed through
* the shm_toc.
*/
static void
void
ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
{
BufferUsage *buffer_usage;
Expand Down

0 comments on commit 9c225ac

Please sign in to comment.