diff --git a/src/backend/access/transam/README.parallel b/src/backend/access/transam/README.parallel index db9ac3d504de7..61d54dc55064d 100644 --- a/src/backend/access/transam/README.parallel +++ b/src/backend/access/transam/README.parallel @@ -198,7 +198,7 @@ pattern looks like this: EnterParallelMode(); /* prohibit unsafe state changes */ - pcxt = CreateParallelContext(entrypoint, nworkers); + pcxt = CreateParallelContextForExternalFunction("library_name", "function_name", nworkers); /* Allow space for application-specific data here. */ shm_toc_estimate_chunk(&pcxt->estimator, size); diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index d2bed7270571d..867ccf855d925 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -19,6 +19,7 @@ #include "access/xlog.h" #include "catalog/namespace.h" #include "commands/async.h" +#include "executor/execParallel.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "libpq/pqmq.h" @@ -60,7 +61,7 @@ #define PARALLEL_KEY_TRANSACTION_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0006) #define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007) #define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008) -#define PARALLEL_KEY_EXTENSION_TRAMPOLINE UINT64CONST(0xFFFFFFFFFFFF0009) +#define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009) /* Fixed-size parallel state. */ typedef struct FixedParallelState @@ -76,7 +77,7 @@ typedef struct FixedParallelState pid_t parallel_master_pid; BackendId parallel_master_backend_id; - /* Entrypoint for parallel workers. */ + /* Entrypoint for parallel workers (deprecated; used if no name given). */ parallel_worker_main_type entrypoint; /* Mutex protects remaining fields. */ @@ -106,16 +107,36 @@ static FixedParallelState *MyFixedParallelState; /* List of active parallel contexts. */ static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list); +/* + * List of internal parallel worker entry points. 
We need this for + * reasons explained in LookupParallelWorkerFunction(), below. + */ +static const struct +{ + const char *fn_name; + parallel_worker_main_type fn_addr; +} InternalParallelWorkers[] = + +{ + { + "ParallelQueryMain", ParallelQueryMain + } +}; + /* Private functions. */ static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg); -static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc); static void WaitForParallelWorkersToExit(ParallelContext *pcxt); +static parallel_worker_main_type LookupParallelWorkerFunction(char *libraryname, char *funcname); /* * Establish a new parallel context. This should be done after entering * parallel mode, and (unless there is an error) the context should be * destroyed before exiting the current subtransaction. + * + * NB: specifying the entrypoint as a function address is unportable. + * This will go away in Postgres 10, in favor of the API provided by + * CreateParallelContextForExternalFunction; in the meantime use that. */ ParallelContext * CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers) @@ -163,9 +184,9 @@ CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers) } /* - * Establish a new parallel context that calls a function provided by an - * extension. This works around the fact that the library might get mapped - * at a different address in each backend. + * Establish a new parallel context that calls a function specified by name. + * Unlike CreateParallelContext, this is robust against possible differences + * in address space layout between different processes. */ ParallelContext * CreateParallelContextForExternalFunction(char *library_name, @@ -179,7 +200,7 @@ CreateParallelContextForExternalFunction(char *library_name, oldcontext = MemoryContextSwitchTo(TopTransactionContext); /* Create the context. 
*/ - pcxt = CreateParallelContext(ParallelExtensionTrampoline, nworkers); + pcxt = CreateParallelContext(NULL, nworkers); pcxt->library_name = pstrdup(library_name); pcxt->function_name = pstrdup(function_name); @@ -248,10 +269,9 @@ InitializeParallelDSM(ParallelContext *pcxt) pcxt->nworkers)); shm_toc_estimate_keys(&pcxt->estimator, 1); - /* Estimate how much we'll need for extension entrypoint info. */ + /* Estimate how much we'll need for entrypoint info. */ if (pcxt->library_name != NULL) { - Assert(pcxt->entrypoint == ParallelExtensionTrampoline); Assert(pcxt->function_name != NULL); shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) + strlen(pcxt->function_name) + 2); @@ -367,7 +387,7 @@ InitializeParallelDSM(ParallelContext *pcxt) } shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space); - /* Serialize extension entrypoint information. */ + /* Serialize entrypoint information. */ if (pcxt->library_name != NULL) { Size lnamelen = strlen(pcxt->library_name); @@ -377,7 +397,7 @@ InitializeParallelDSM(ParallelContext *pcxt) + strlen(pcxt->function_name) + 2); strcpy(extensionstate, pcxt->library_name); strcpy(extensionstate + lnamelen + 1, pcxt->function_name); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE, + shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, extensionstate); } } @@ -669,6 +689,10 @@ DestroyParallelContext(ParallelContext *pcxt) } /* Free memory. */ + if (pcxt->library_name) + pfree(pcxt->library_name); + if (pcxt->function_name) + pfree(pcxt->function_name); pfree(pcxt); } @@ -939,6 +963,8 @@ ParallelWorkerMain(Datum main_arg) shm_mq *mq; shm_mq_handle *mqh; char *libraryspace; + char *entrypointstate; + parallel_worker_main_type entrypt; char *gucspace; char *combocidspace; char *tsnapspace; @@ -1038,6 +1064,25 @@ ParallelWorkerMain(Datum main_arg) Assert(libraryspace != NULL); RestoreLibraryState(libraryspace); + /* + * Identify the entry point to be called. 
In theory this could result in + * loading an additional library, though most likely the entry point is in + * the core backend or in a library we just loaded. + */ + entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT); + if (entrypointstate != NULL) + { + char *library_name; + char *function_name; + + library_name = entrypointstate; + function_name = entrypointstate + strlen(library_name) + 1; + + entrypt = LookupParallelWorkerFunction(library_name, function_name); + } + else + entrypt = fps->entrypoint; + /* Restore database connection. */ BackgroundWorkerInitializeConnectionByOid(fps->database_id, fps->authenticated_user_id); @@ -1101,10 +1146,11 @@ ParallelWorkerMain(Datum main_arg) /* * Time to do the real work: invoke the caller-supplied code. * - * If you get a crash at this line, see the comments for - * ParallelExtensionTrampoline. + * If you get a crash at this line, try using + * CreateParallelContextForExternalFunction instead of + * CreateParallelContext. */ - fps->entrypoint(seg, toc); + entrypt(seg, toc); /* Must exit parallel mode to pop active snapshot. */ ExitParallelMode(); @@ -1119,33 +1165,6 @@ ParallelWorkerMain(Datum main_arg) pq_putmessage('X', NULL, 0); } -/* - * It's unsafe for the entrypoint invoked by ParallelWorkerMain to be a - * function living in a dynamically loaded module, because the module might - * not be loaded in every process, or might be loaded but not at the same - * address. To work around that problem, CreateParallelContextForExtension() - * arranges to call this function rather than calling the extension-provided - * function directly; and this function then looks up the real entrypoint and - * calls it. 
- */ -static void -ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc) -{ - char *extensionstate; - char *library_name; - char *function_name; - parallel_worker_main_type entrypt; - - extensionstate = shm_toc_lookup(toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE); - Assert(extensionstate != NULL); - library_name = extensionstate; - function_name = extensionstate + strlen(library_name) + 1; - - entrypt = (parallel_worker_main_type) - load_external_function(library_name, function_name, true, NULL); - entrypt(seg, toc); -} - /* * Update shared memory with the ending location of the last WAL record we * wrote, if it's greater than the value already stored there. @@ -1161,3 +1180,47 @@ ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end) fps->last_xlog_end = last_xlog_end; SpinLockRelease(&fps->mutex); } + +/* + * Look up (and possibly load) a parallel worker entry point function. + * + * For functions contained in the core code, we use library name "postgres" + * and consult the InternalParallelWorkers array. External functions are + * looked up, and loaded if necessary, using load_external_function(). + * + * The point of this is to pass function names as strings across process + * boundaries. We can't pass actual function addresses because of the + * possibility that the function has been loaded at a different address + * in a different process. This is obviously a hazard for functions in + * loadable libraries, but it can happen even for functions in the core code + * on platforms using EXEC_BACKEND (e.g., Windows). + * + * At some point it might be worthwhile to get rid of InternalParallelWorkers[] + * in favor of applying load_external_function() for core functions too; + * but that raises portability issues that are not worth addressing now. + */ +static parallel_worker_main_type +LookupParallelWorkerFunction(char *libraryname, char *funcname) +{ + /* + * If the function is to be loaded from postgres itself, search the + * InternalParallelWorkers array. 
+ */ + if (strcmp(libraryname, "postgres") == 0) + { + int i; + + for (i = 0; i < lengthof(InternalParallelWorkers); i++) + { + if (strcmp(InternalParallelWorkers[i].fn_name, funcname) == 0) + return InternalParallelWorkers[i].fn_addr; + } + + /* We can only reach this by programming error. */ + elog(ERROR, "internal function \"%s\" not found", funcname); + } + + /* Otherwise load from external library. */ + return (parallel_worker_main_type) + load_external_function(libraryname, funcname, true, NULL); +} diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 96e2ac06b8cce..4741aec46de1b 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -105,8 +105,7 @@ static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt, static bool ExecParallelRetrieveInstrumentation(PlanState *planstate, SharedExecutorInstrumentation *instrumentation); -/* Helper functions that run in the parallel worker. */ -static void ParallelQueryMain(dsm_segment *seg, shm_toc *toc); +/* Helper function that runs in the parallel worker. */ static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc); /* @@ -355,7 +354,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) pstmt_data = ExecSerializePlan(planstate->plan, estate); /* Create a parallel context. */ - pcxt = CreateParallelContext(ParallelQueryMain, nworkers); + pcxt = CreateParallelContextForExternalFunction("postgres", "ParallelQueryMain", nworkers); pei->pcxt = pcxt; /* @@ -720,7 +719,7 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc) * to do this are also stored in the dsm_segment and can be accessed through * the shm_toc. 
*/ -static void +void ParallelQueryMain(dsm_segment *seg, shm_toc *toc) { BufferUsage *buffer_usage; diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 5ea5abff8da20..ff266009fc255 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -95,23 +95,24 @@ struct BackgroundWorkerHandle static BackgroundWorkerArray *BackgroundWorkerData; /* - * List of internal background workers. These are used for mapping the - * function name to actual function when building with EXEC_BACKEND and also - * to allow these to be loaded outside of shared_preload_libraries. + * List of internal background worker entry points. We need this for + * reasons explained in LookupBackgroundWorkerFunction(), below. */ -typedef struct InternalBGWorkerMain +static const struct { - char *bgw_function_name; - bgworker_main_type bgw_main; -} InternalBGWorkerMain; - -static const InternalBGWorkerMain InternalBGWorkers[] = { - {"ParallelWorkerMain", ParallelWorkerMain}, - /* Dummy entry marking end of the array. */ - {NULL, NULL} + const char *fn_name; + bgworker_main_type fn_addr; +} InternalBGWorkers[] = + +{ + { + "ParallelWorkerMain", ParallelWorkerMain + } }; -static bgworker_main_type GetInternalBgWorkerMain(BackgroundWorker *worker); +/* Private functions. */ +static bgworker_main_type LookupBackgroundWorkerFunction(char *libraryname, char *funcname); + /* * Calculate shared memory needed. @@ -715,27 +716,18 @@ StartBackgroundWorker(void) #endif } - /* For internal workers set the entry point to known function address. */ - entrypt = GetInternalBgWorkerMain(worker); - /* - * Otherwise, if bgw_main is set, we use that value as the initial - * entrypoint. This does not work well EXEC_BACKEND outside Windows but - * we keep the logic for backwards compatibility. 
In other cases use - * the entry point specified by library name (which will be loaded, if - * necessary) and a function name (which will be looked up in the named - * library). + * If bgw_main is set, we use that value as the entry point function. + * Passing function addresses across processes is unreliable on many + * platforms, but we'll leave the option in place in 9.x for backwards + * compatibility. Otherwise, look up the entry point function, loading + * its library if necessary. */ - if (entrypt == NULL) - { - if (worker->bgw_main != NULL) - entrypt = worker->bgw_main; - else - entrypt = (bgworker_main_type) - load_external_function(worker->bgw_library_name, - worker->bgw_function_name, - true, NULL); - } + if (worker->bgw_main != NULL) + entrypt = worker->bgw_main; + else + entrypt = LookupBackgroundWorkerFunction(worker->bgw_library_name, + worker->bgw_function_name); /* * Note that in normal processes, we would call InitPostgres here. For a @@ -1077,26 +1069,45 @@ TerminateBackgroundWorker(BackgroundWorkerHandle *handle) } /* - * Search the known internal worker array and return its main function - * pointer if found. + * Look up (and possibly load) a bgworker entry point function. * - * Returns NULL if not known internal worker. + * For functions contained in the core code, we use library name "postgres" + * and consult the InternalBGWorkers array. External functions are + * looked up, and loaded if necessary, using load_external_function(). + * + * The point of this is to pass function names as strings across process + * boundaries. We can't pass actual function addresses because of the + * possibility that the function has been loaded at a different address + * in a different process. This is obviously a hazard for functions in + * loadable libraries, but it can happen even for functions in the core code + * on platforms using EXEC_BACKEND (e.g., Windows). 
+ * + * At some point it might be worthwhile to get rid of InternalBGWorkers[] + * in favor of applying load_external_function() for core functions too; + * but that raises portability issues that are not worth addressing now. */ static bgworker_main_type -GetInternalBgWorkerMain(BackgroundWorker *worker) +LookupBackgroundWorkerFunction(char *libraryname, char *funcname) { - int i; + /* + * If the function is to be loaded from postgres itself, search the + * InternalBGWorkers array. + */ + if (strcmp(libraryname, "postgres") == 0) + { + int i; - /* Internal workers always have to use postgres as library name. */ - if (strncmp(worker->bgw_library_name, "postgres", BGW_MAXLEN) != 0) - return NULL; + for (i = 0; i < lengthof(InternalBGWorkers); i++) + { + if (strcmp(InternalBGWorkers[i].fn_name, funcname) == 0) + return InternalBGWorkers[i].fn_addr; + } - for (i = 0; InternalBGWorkers[i].bgw_function_name; i++) - { - if (strncmp(InternalBGWorkers[i].bgw_function_name, - worker->bgw_function_name, BGW_MAXLEN) == 0) - return InternalBGWorkers[i].bgw_main; + /* We can only reach this by programming error. */ + elog(ERROR, "internal function \"%s\" not found", funcname); } - return NULL; + /* Otherwise load from external library. */ + return (bgworker_main_type) + load_external_function(libraryname, funcname, true, NULL); } diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h index f4c6d37a11928..42424d1b5c1ff 100644 --- a/src/include/executor/execParallel.h +++ b/src/include/executor/execParallel.h @@ -36,4 +36,6 @@ extern void ExecParallelFinish(ParallelExecutorInfo *pei); extern void ExecParallelCleanup(ParallelExecutorInfo *pei); extern void ExecParallelReinitialize(ParallelExecutorInfo *pei); +extern void ParallelQueryMain(dsm_segment *seg, shm_toc *toc); + #endif /* EXECPARALLEL_H */