Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pgx-tests/src/framework.rs
Original file line number Diff line number Diff line change
Expand Up @@ -618,11 +618,11 @@ fn get_pgdata_path() -> eyre::Result<PathBuf> {
Ok(target_dir)
}

fn get_pg_dbname() -> &'static str {
pub(crate) fn get_pg_dbname() -> &'static str {
"pgx_tests"
}

fn get_pg_user() -> String {
pub(crate) fn get_pg_user() -> String {
std::env::var("USER")
.unwrap_or_else(|_| panic!("USER environment var is unset or invalid UTF-8"))
}
Expand Down
105 changes: 105 additions & 0 deletions pgx-tests/src/tests/bgworker_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
Portions Copyright 2019-2021 ZomboDB, LLC.
Portions Copyright 2021-2022 Technology Concepts & Design, Inc. <support@tcdi.com>

All rights reserved.

Use of this source code is governed by the MIT license that can be found in the LICENSE file.
*/
use pgx::prelude::*;
use pgx::{FromDatum, IntoDatum, PgOid};

#[pg_guard]
#[no_mangle]
pub extern "C" fn bgworker(arg: pg_sys::Datum) {
use pgx::bgworkers::*;
use std::time::Duration;
BackgroundWorker::attach_signal_handlers(SignalWakeFlags::SIGHUP | SignalWakeFlags::SIGTERM);
BackgroundWorker::connect_worker_to_spi(
Some(crate::framework::get_pg_dbname()),
Some(crate::framework::get_pg_user().as_str()),
);

let arg = unsafe { i32::from_datum(arg, false) }.expect("invalid arg");

if arg > 0 {
BackgroundWorker::transaction(|| {
Spi::run("CREATE TABLE tests.bgworker_test (v INTEGER);");
Spi::execute(|mut client| {
client.update(
"INSERT INTO tests.bgworker_test VALUES ($1);",
None,
Some(vec![(PgOid::BuiltIn(PgBuiltInOids::INT4OID), arg.into_datum())]),
);
});
});
}
while BackgroundWorker::wait_latch(Some(Duration::from_millis(100))) {}
if arg > 0 {
BackgroundWorker::transaction(|| {
Spi::run("UPDATE tests.bgworker_test SET v = v + 1;");
});
}
}

#[cfg(any(test, feature = "pg_test"))]
#[pgx::pg_schema]
mod tests {
#[allow(unused_imports)]
use crate as pgx_tests;

use pgx::bgworkers::*;
use pgx::prelude::*;
use pgx::{pg_guard, pg_sys, IntoDatum};

#[pg_test]
fn test_dynamic_bgworker() {
let worker = BackgroundWorkerBuilder::new("dynamic_bgworker")
.set_library("pgx_tests")
.set_function("bgworker")
.set_argument(123i32.into_datum())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...wait, what happens if this argument is None? I realize this is almost certainly going to be a case of severe programmer error, but we should test the None case as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My assumption here was that it was up to the worker to handle the argument correctly. What exactly do you suggest to test? Maybe I missing your train of thought.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I mostly mean that this basically goes Option<Datum> -> NullableDatum -> Option<Datum> (via Postgres) and we should make sure, as simple as it sounds, that if we say the Datum is "SQL NULL" (i.e. None) that it also becomes a None on the other side.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

( I have recently become suspicious of these kinds of transitions after finding out that sometimes Postgres will pass values that are basically nullptr_t, true and so I am wondering if there are other hijinx in store. )

I am also kind of wondering what happens if a BackgroundWorker, er, fucks up massively, and if PGX can actually recover from that with our current API surface.

.enable_spi_access()
.set_notify_pid(unsafe { pg_sys::MyProcPid })
.load_dynamic();
let pid = worker.wait_for_startup().expect("no PID from the worker");
assert!(pid > 0);
let handle = worker.terminate();
handle.wait_for_shutdown().expect("aborted shutdown");

assert_eq!(
124,
Spi::get_one::<i32>("SELECT v FROM tests.bgworker_test;")
.expect("no return value from the worker")
);
}

#[pg_test]
fn test_dynamic_bgworker_untracked() {
let worker = BackgroundWorkerBuilder::new("dynamic_bgworker")
.set_library("pgx_tests")
.set_function("bgworker")
.set_argument(0i32.into_datum())
.enable_spi_access()
.load_dynamic();
assert!(matches!(worker.wait_for_startup(), Err(BackgroundWorkerStatus::Untracked { .. })));
assert!(matches!(
worker.wait_for_shutdown(),
Err(BackgroundWorkerStatus::Untracked { .. })
));
}

#[pg_test]
fn test_dynamic_bgworker_untracked_termination_handle() {
let worker = BackgroundWorkerBuilder::new("dynamic_bgworker")
.set_library("pgx_tests")
.set_function("bgworker")
.set_argument(0i32.into_datum())
.enable_spi_access()
.load_dynamic();
let handle = worker.terminate();
assert!(matches!(
handle.wait_for_shutdown(),
Err(BackgroundWorkerStatus::Untracked { .. })
));
}
}
1 change: 1 addition & 0 deletions pgx-tests/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod aggregate_tests;
mod anyarray_tests;
mod array_tests;
mod attributes_tests;
mod bgworker_tests;
mod bytea_tests;
mod cfg_tests;
mod datetime_tests;
Expand Down
174 changes: 161 additions & 13 deletions pgx/src/bgworkers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::pg_sys;
use std::convert::TryInto;
use std::ffi::{CStr, CString};
use std::os::raw::c_char;
use std::ptr::null_mut;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

Expand Down Expand Up @@ -52,6 +53,7 @@ bitflags! {
}

/// The various points in which a BackgroundWorker can be started by Postgres
#[derive(Copy, Clone)]
pub enum BgWorkerStartTime {
PostmasterStart = pg_sys::BgWorkerStartTime_BgWorkerStart_PostmasterStart as isize,
ConsistentState = pg_sys::BgWorkerStartTime_BgWorkerStart_ConsistentState as isize,
Expand Down Expand Up @@ -210,10 +212,130 @@ unsafe extern "C" fn worker_spi_sigterm(_signal_args: i32) {
pg_sys::SetLatch(pg_sys::MyLatch);
}

/// Dynamic background worker handle
pub struct DynamicBackgroundWorker {
handle: *mut pg_sys::BackgroundWorkerHandle,
notify_pid: pg_sys::pid_t,
}

/// PID
pub type Pid = pg_sys::pid_t;

/// Dynamic background worker status
#[derive(Debug, Clone, Copy)]
pub enum BackgroundWorkerStatus {
Started,
NotYetStarted,
Stopped,
PostmasterDied,
/// `BackgroundWorkerBuilder.bgw_notify_pid` was not set to `pg_sys::MyProcPid`
///
/// This makes worker's startup or shutdown untrackable by the current process.
Untracked {
/// `bgw_notify_pid` as specified in the builder
notify_pid: pg_sys::pid_t,
},
}

impl From<pg_sys::BgwHandleStatus> for BackgroundWorkerStatus {
fn from(s: pg_sys::BgwHandleStatus) -> Self {
match s {
pg_sys::BgwHandleStatus_BGWH_STARTED => BackgroundWorkerStatus::Started,
pg_sys::BgwHandleStatus_BGWH_NOT_YET_STARTED => BackgroundWorkerStatus::NotYetStarted,
pg_sys::BgwHandleStatus_BGWH_STOPPED => BackgroundWorkerStatus::Stopped,
pg_sys::BgwHandleStatus_BGWH_POSTMASTER_DIED => BackgroundWorkerStatus::PostmasterDied,
_ => unreachable!(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
_ => unreachable!(),
_ => unreachable!("postgres returned unknown BgwHandleStatus"),

}
}
}

impl DynamicBackgroundWorker {
/// Return dynamic background worker's PID if the worker is successfully registered,
/// otherwise it return worker's status as an error.
pub fn pid(&self) -> Result<Pid, BackgroundWorkerStatus> {
let mut pid: pg_sys::pid_t = 0;
let status: BackgroundWorkerStatus =
unsafe { pg_sys::GetBackgroundWorkerPid(self.handle, &mut pid) }.into();
match status {
BackgroundWorkerStatus::Started => Ok(pid),
_ => Err(status),
}
}

/// Causes the postmaster to send SIGTERM to the worker if it is running,
/// and to unregister it as soon as it is not.
pub fn terminate(self) -> TerminatingDynamicBackgroundWorker {
unsafe {
pg_sys::TerminateBackgroundWorker(self.handle);
}
TerminatingDynamicBackgroundWorker { handle: self.handle, notify_pid: self.notify_pid }
}

/// Block until the postmaster has attempted to start the background worker,
/// or until the postmaster dies. If the background worker is running, the successful return value
/// will be the worker's PID. Otherwise, the return value will be an error with the worker's status.
///
/// Requires `BackgroundWorkerBuilder.bgw_notify_pid` to be set to `pg_sys::MyProcPid`, otherwise it'll
/// return [`BackgroundWorkerStatus::Untracked`] error
pub fn wait_for_startup(&self) -> Result<Pid, BackgroundWorkerStatus> {
unsafe {
if self.notify_pid != pg_sys::MyProcPid {
return Err(BackgroundWorkerStatus::Untracked { notify_pid: self.notify_pid });
}
}
let mut pid: pg_sys::pid_t = 0;
let status: BackgroundWorkerStatus =
unsafe { pg_sys::WaitForBackgroundWorkerStartup(self.handle, &mut pid) }.into();
match status {
BackgroundWorkerStatus::Started => Ok(pid),
_ => Err(status),
}
}

/// Block until the background worker exits, or postmaster dies. When the background worker exits, the return value is unit,
/// if postmaster dies it will return error with `BackgroundWorkerStatus::PostmasterDied` status
///
/// Requires `BackgroundWorkerBuilder.bgw_notify_pid` to be set to `pg_sys::MyProcPid`, otherwise it'll
/// return [`BackgroundWorkerStatus::Untracked`] error
pub fn wait_for_shutdown(self) -> Result<(), BackgroundWorkerStatus> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens when this is called on a worker that hasn't been started yet?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading the code suggests that this is fine. It'll simply wait until it is stopped.

TerminatingDynamicBackgroundWorker { handle: self.handle, notify_pid: self.notify_pid }
.wait_for_shutdown()
}
}

/// Handle of a dynamic background worker that is being terminated with
/// [`DynamicBackgroundWorker::terminate`]. Only allows waiting for shutdown.
pub struct TerminatingDynamicBackgroundWorker {
handle: *mut pg_sys::BackgroundWorkerHandle,
notify_pid: pg_sys::pid_t,
}

impl TerminatingDynamicBackgroundWorker {
/// Block until the background worker exits, or postmaster dies. When the background worker exits, the return value is unit,
/// if postmaster dies it will return error with `BackgroundWorkerStatus::PostmasterDied` status
///
/// Requires `BackgroundWorkerBuilder.bgw_notify_pid` to be set to `pg_sys::MyProcPid`, otherwise it'll
/// return [`BackgroundWorkerStatus::Untracked`] error
pub fn wait_for_shutdown(self) -> Result<(), BackgroundWorkerStatus> {
unsafe {
if self.notify_pid != pg_sys::MyProcPid {
return Err(BackgroundWorkerStatus::Untracked { notify_pid: self.notify_pid });
}
}
let status: BackgroundWorkerStatus =
unsafe { pg_sys::WaitForBackgroundWorkerShutdown(self.handle) }.into();
match status {
BackgroundWorkerStatus::Stopped => Ok(()),
_ => Err(status),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can't really be any status, can it? We've already excluded ::Stopped and ::Untracked. The documentation implies it has to be ::PostmasterDied (and that such might be a successful ending?). While normally this function as described does defer error handling to the caller, I'm not sure it makes sense to do anything but panic! if we see any other BackgroundWorkerStatus, as that should only happen in the case of severe errors. We do have the option to reduce this function's anxiety when we indeed have more confidence in the code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See below. I am open to introducing narrow error types and removing this confusion of status and error.

}
}
}

/// A builder-style interface for creating a new Background Worker
///
/// This must be used from within your extension's `_PG_init()` function,
/// finishing with the `.load()` function.
/// For a static background worker, this must be used from within your extension's `_PG_init()` function,
/// finishing with the `.load()` function. Dynamic background workers are loaded with `.load_dynamic()` and
/// have no restriction as to where they can be loaded.
///
/// ## Example
///
Expand Down Expand Up @@ -279,6 +401,9 @@ impl BackgroundWorkerBuilder {
}

/// Does this BackgroundWorker want Shared Memory access?
///
/// `startup` allows specifying shared memory initialization startup hook. Ignored
/// if [`BackgroundWorkerBuilder::load_dynamic`] is used.
pub fn enable_shmem_access(mut self: Self, startup: Option<unsafe extern "C" fn()>) -> Self {
self.bgw_flags = self.bgw_flags | BGWflags::BGWORKER_SHMEM_ACCESS;
self.shared_memory_startup_fn = startup;
Expand Down Expand Up @@ -397,8 +522,39 @@ impl BackgroundWorkerBuilder {
/// Once properly configured, call `load()` to get the BackgroundWorker registered and
/// started at the proper time by Postgres.
pub fn load(self: Self) {
let mut bgw: pg_sys::BackgroundWorker = (&self).into();

unsafe {
pg_sys::RegisterBackgroundWorker(&mut bgw);
if self.bgw_flags.contains(BGWflags::BGWORKER_SHMEM_ACCESS)
&& self.shared_memory_startup_fn.is_some()
{
PREV_SHMEM_STARTUP_HOOK = pg_sys::shmem_startup_hook;
pg_sys::shmem_startup_hook = self.shared_memory_startup_fn;
}
};
}

/// Once properly configured, call `load_dynamic()` to get the BackgroundWorker registered and started dynamically.
pub fn load_dynamic(self: Self) -> DynamicBackgroundWorker {
let mut bgw: pg_sys::BackgroundWorker = (&self).into();
let mut handle: *mut pg_sys::BackgroundWorkerHandle = null_mut();

unsafe {
pg_sys::RegisterDynamicBackgroundWorker(&mut bgw, &mut handle);
};

DynamicBackgroundWorker { handle, notify_pid: bgw.bgw_notify_pid }
}
}

/// This conversion is useful only in limited context outside of pgx, such as when this structure is required
/// by other libraries and the worker is not to be started by pgx itself. In this case,
/// the builder is useful for building this structure.
impl<'a> Into<pg_sys::BackgroundWorker> for &'a BackgroundWorkerBuilder {
fn into(self) -> pg_sys::BackgroundWorker {
#[cfg(feature = "pg10")]
let mut bgw = pg_sys::BackgroundWorker {
let bgw = pg_sys::BackgroundWorker {
bgw_name: RpgffiChar::from(&self.bgw_name[..]).0,
bgw_flags: self.bgw_flags.bits(),
bgw_start_time: self.bgw_start_time as u32,
Expand All @@ -414,7 +570,7 @@ impl BackgroundWorkerBuilder {
};

#[cfg(any(feature = "pg11", feature = "pg12", feature = "pg13", feature = "pg14"))]
let mut bgw = pg_sys::BackgroundWorker {
let bgw = pg_sys::BackgroundWorker {
bgw_name: RpgffiChar::from(&self.bgw_name[..]).0,
bgw_type: RpgffiChar::from(&self.bgw_type[..]).0,
bgw_flags: self.bgw_flags.bits(),
Expand All @@ -430,15 +586,7 @@ impl BackgroundWorkerBuilder {
bgw_notify_pid: self.bgw_notify_pid,
};

unsafe {
pg_sys::RegisterBackgroundWorker(&mut bgw);
if self.bgw_flags.contains(BGWflags::BGWORKER_SHMEM_ACCESS)
&& self.shared_memory_startup_fn.is_some()
{
PREV_SHMEM_STARTUP_HOOK = pg_sys::shmem_startup_hook;
pg_sys::shmem_startup_hook = self.shared_memory_startup_fn;
}
};
bgw
}
}

Expand Down