Skip to content

Check for interrupts during archive command #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 94 additions & 2 deletions src/backend/archive/shell_archive.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,27 @@ shell_archive_configured(ArchiveModuleState *state)
return false;
}

#define POLL_TIMEOUT_MSEC 10

static bool
shell_archive_file(ArchiveModuleState *state, const char *file,
const char *path)
{
char *xlogarchcmd;
char *nativePath = NULL;
#ifndef WIN32
FILE *archiveFd = NULL;
int archiveFileno;
char buf[1024];
ssize_t bytesRead;
#else
size_t cmdPrefixLen;
size_t cmdlen;
char *win32cmd = NULL;
STARTUPINFO si;
PROCESS_INFORMATION pi;
DWORD dwRc;
#endif
int rc;

if (path)
Expand All @@ -77,14 +92,91 @@ shell_archive_file(ArchiveModuleState *state, const char *file,

fflush(NULL);
pgstat_report_wait_start(WAIT_EVENT_ARCHIVE_COMMAND);
rc = system(xlogarchcmd);

/*
* Start the command and read until it completes, while keep checking for
* interrupts to process pending events.
*/
#ifndef WIN32
archiveFd = popen(xlogarchcmd, "r");
if (archiveFd != NULL)
{
archiveFileno = fileno(archiveFd);
if (fcntl(archiveFileno, F_SETFL, O_NONBLOCK) == -1)
ereport(FATAL,
(errmsg("could not set handle to nonblocking mode: %m")));

while (true)
{
CHECK_FOR_INTERRUPTS();
bytesRead = read(archiveFileno, &buf, sizeof(buf));
if ((bytesRead > 0) || (bytesRead == -1 && errno == EAGAIN))
pg_usleep(POLL_TIMEOUT_MSEC * 1000);
else
break;
}
rc = pclose(archiveFd);
}
else
rc = -1;
#else
/*
* Create a malloc'd copy of the command string, we need to prefix it with
* cmd /c as the commandLine argument to CreateProcess still expects .exe
* files.
*/
cmdlen = strlen(xlogarchcmd);
#define CMD_PREFIX "cmd /c \""
cmdPrefixLen = strlen(CMD_PREFIX);
win32cmd = malloc(cmdPrefixLen + cmdlen + 1 + 1);
if (win32cmd == NULL)
{
ereport(FATAL,
(errmsg_internal("Failed to malloc win32cmd %m")));
return false;
}
memcpy(win32cmd, CMD_PREFIX, cmdPrefixLen);
memcpy(&win32cmd[cmdPrefixLen], xlogarchcmd, cmdlen);
win32cmd[cmdPrefixLen + cmdlen] = '"';
win32cmd[cmdPrefixLen + cmdlen + 1] = '\0';
ereport(DEBUG4,
(errmsg_internal("WIN32: executing modified archive command \"%s\"",
win32cmd)));

memset(&pi, 0, sizeof(pi));
memset(&si, 0, sizeof(si));
si.cb = sizeof(si);

if (!CreateProcess(NULL, win32cmd, NULL, NULL, FALSE, 0,
NULL, NULL, &si, &pi))
{
ereport(FATAL,
(errmsg("CreateProcess() call failed: %m (error code %lu)",
GetLastError())));
free(win32cmd);
return false;
}
free(win32cmd);

while (true)
{
CHECK_FOR_INTERRUPTS();
if (WaitForSingleObject(pi.hProcess, POLL_TIMEOUT_MSEC) == WAIT_OBJECT_0)
break;
}

GetExitCodeProcess(pi.hProcess, &dwRc);
CloseHandle(pi.hProcess);
CloseHandle(pi.hThread);
rc = dwRc;
#endif
pgstat_report_wait_end();

if (rc != 0)
{
/*
* If either the shell itself, or a called command, died on a signal,
* abort the archiver. We do this because system() ignores SIGINT and
* abort the archiver. We do this because pclose() ignores SIGINT and
* SIGQUIT while waiting; so a signal is very likely something that
* should have interrupted us too. Also die if the shell got a hard
* "command not found" type of error. If we overreact it's no big
Expand Down