Skip to content

Commit

Permalink
Automatic retry for backup, restore, archive-get, and archive-push.
Browse files Browse the repository at this point in the history
Previously, if a local command, e.g. backupFile(), failed, it would stop the entire process. Instead, retry local commands to deal with transient errors.

Remove special logic in the S3 storage driver to retry RequestTimeTooSkewed errors since these errors are now handled by the general retry mechanism in the places where they are most likely to happen, i.e. file read/write. Also, this error should have been entirely eliminated by the asynchronous TLS implementation.
  • Loading branch information
dwsteele committed Jul 14, 2020
1 parent 91c7adc commit 620a8d1
Show file tree
Hide file tree
Showing 15 changed files with 154 additions and 134 deletions.
8 changes: 8 additions & 0 deletions doc/xml/release.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@
<p>Asynchronous S3 multipart upload.</p>
</release-item>

<release-item>
<release-item-contributor-list>
<release-item-reviewer id="cynthia.shang"/>
</release-item-contributor-list>

<p>Automatic retry for <cmd>backup</cmd>, <cmd>restore</cmd>, <cmd>archive-get</cmd>, and <cmd>archive-push</cmd>.</p>
</release-item>

<release-item>
<release-item-contributor-list>
<release-item-reviewer id="stefan.fercot"/>
Expand Down
7 changes: 6 additions & 1 deletion src/command/local/local.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ cmdLocal(int handleRead, int handleWrite)

MEM_CONTEXT_TEMP_BEGIN()
{
// Configure two retries for local commands
VariantList *retryInterval = varLstNew();
varLstAdd(retryInterval, varNewUInt64(0));
varLstAdd(retryInterval, varNewUInt64(15000));

String *name = strNewFmt(PROTOCOL_SERVICE_LOCAL "-%u", cfgOptionUInt(cfgOptProcess));
IoRead *read = ioHandleReadNew(name, handleRead, (TimeMSec)(cfgOptionDbl(cfgOptProtocolTimeout) * 1000));
ioReadOpen(read);
Expand All @@ -35,7 +40,7 @@ cmdLocal(int handleRead, int handleWrite)
protocolServerHandlerAdd(server, archivePushProtocol);
protocolServerHandlerAdd(server, backupProtocol);
protocolServerHandlerAdd(server, restoreProtocol);
protocolServerProcess(server);
protocolServerProcess(server, retryInterval);
}
MEM_CONTEXT_TEMP_END();

Expand Down
2 changes: 1 addition & 1 deletion src/command/remote/remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ cmdRemote(int handleRead, int handleWrite)

// If not successful we'll just exit
if (success)
protocolServerProcess(server);
protocolServerProcess(server, NULL);
}
MEM_CONTEXT_TEMP_END();

Expand Down
48 changes: 46 additions & 2 deletions src/protocol/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,11 @@ protocolServerError(ProtocolServer *this, int code, const String *message, const

/**********************************************************************************************************************************/
void
protocolServerProcess(ProtocolServer *this)
protocolServerProcess(ProtocolServer *this, const VariantList *retryInterval)
{
FUNCTION_LOG_BEGIN(logLevelDebug);
FUNCTION_LOG_PARAM(PROTOCOL_SERVER, this);
FUNCTION_LOG_PARAM(VARIANT_LIST, retryInterval);
FUNCTION_LOG_END();

// Loop until exit command is received
Expand Down Expand Up @@ -157,7 +158,50 @@ protocolServerProcess(ProtocolServer *this)
// needs to be stored by the handler.
MEM_CONTEXT_BEGIN(this->memContext)
{
found = handler(command, paramList, this);
// Initialize retries in case of command failure
bool retry = false;
unsigned int retryRemaining = retryInterval != NULL ? varLstSize(retryInterval) : 0;

do
{
retry = false;

TRY_BEGIN()
{
found = handler(command, paramList, this);
}
CATCH_ANY()
{
// Are there retries remaining?
if (retryRemaining > 0)
{
// Get the sleep interval for this retry
TimeMSec retrySleepMs = varUInt64(
varLstGet(retryInterval, varLstSize(retryInterval) - retryRemaining));

// Log the retry
LOG_DEBUG_FMT(
"retry %s after %" PRIu64 "ms: %s", errorTypeName(errorType()), retrySleepMs,
errorMessage());

// Sleep if there is an interval
if (retrySleepMs > 0)
sleepMSec(retrySleepMs);

// Decrement retries remaining and retry
retryRemaining--;
retry = true;

// Send keep alives to remotes. A retry means the command is taking longer than usual so make
// sure the remote does not timeout.
protocolKeepAlive();
}
else
RETHROW();
}
TRY_END();
}
while (retry);
}
MEM_CONTEXT_END();

Expand Down
2 changes: 1 addition & 1 deletion src/protocol/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ Functions
void protocolServerError(ProtocolServer *this, int code, const String *message, const String *stack);

// Process requests
void protocolServerProcess(ProtocolServer *this);
void protocolServerProcess(ProtocolServer *this, const VariantList *retryInterval);

// Respond to request with output if provided
void protocolServerResponse(ProtocolServer *this, const Variant *output);
Expand Down
62 changes: 9 additions & 53 deletions src/storage/s3/storage.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ STRING_STATIC(S3_QUERY_PREFIX_STR, "prefix");

STRING_STATIC(S3_QUERY_VALUE_LIST_TYPE_2_STR, "2");

/***********************************************************************************************************************************
S3 errors
***********************************************************************************************************************************/
STRING_STATIC(S3_ERROR_REQUEST_TIME_TOO_SKEWED_STR, "RequestTimeTooSkewed");

/***********************************************************************************************************************************
XML tags
***********************************************************************************************************************************/
Expand Down Expand Up @@ -303,59 +298,20 @@ storageS3Response(HttpRequest *request, StorageS3ResponseParam param)
ASSERT(request != NULL);

HttpResponse *result = NULL;
unsigned int retryRemaining = 2;
bool done;

do
MEM_CONTEXT_TEMP_BEGIN()
{
done = true;

MEM_CONTEXT_TEMP_BEGIN()
{
// Process request
result = httpRequest(request, !param.contentIo);

// Error if the request was not successful
if (!httpResponseCodeOk(result) && (!param.allowMissing || httpResponseCode(result) != HTTP_RESPONSE_CODE_NOT_FOUND))
{
// If there are retries remaining and a response parse it as XML to extract the S3 error code
const Buffer *content = httpResponseContent(result);
// Get response
result = httpRequest(request, !param.contentIo);

if (bufUsed(content) > 0 && retryRemaining > 0)
{
// Attempt to parse the XML and extract the S3 error code
TRY_BEGIN()
{
XmlNode *error = xmlDocumentRoot(xmlDocumentNewBuf(content));
const String *errorCode = xmlNodeContent(xmlNodeChild(error, S3_XML_TAG_CODE_STR, true));

if (strEq(errorCode, S3_ERROR_REQUEST_TIME_TOO_SKEWED_STR))
{
LOG_DEBUG_FMT(
"retry %s: %s", strPtr(errorCode),
strPtr(xmlNodeContent(xmlNodeChild(error, S3_XML_TAG_MESSAGE_STR, true))));

retryRemaining--;
done = false;
}
}
// On failure just drop through and report the error as usual
CATCH_ANY()
{
}
TRY_END();
}
// Error if the request was not successful
if (!httpResponseCodeOk(result) && (!param.allowMissing || httpResponseCode(result) != HTTP_RESPONSE_CODE_NOT_FOUND))
httpRequestError(request, result);

// If done throw the error
if (done)
httpRequestError(request, result);
}
else
httpResponseMove(result, memContextPrior());
}
MEM_CONTEXT_TEMP_END();
// Move response to the prior context
httpResponseMove(result, memContextPrior());
}
while (!done);
MEM_CONTEXT_TEMP_END();

FUNCTION_LOG_RETURN(HTTP_RESPONSE, result);
}
Expand Down
10 changes: 10 additions & 0 deletions test/src/module/command/archiveGetTest.c
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,13 @@ testRun(void)
NULL),
"normal WAL segment");

// Create tmp file to make it look like a prior async get failed partway through to ensure that retries work
TEST_RESULT_VOID(
storagePutP(
storageNewWriteP(storageSpoolWrite(), strNew(STORAGE_SPOOL_ARCHIVE_IN "/000000010000000100000001.pgbackrest.tmp")),
NULL),
"normal WAL segment");

TEST_RESULT_VOID(cmdArchiveGetAsync(), "archive async");
harnessLogResult(
"P00 INFO: get 1 WAL file(s) from archive: 000000010000000100000001\n"
Expand All @@ -386,6 +393,9 @@ testRun(void)
TEST_RESULT_BOOL(
storageExistsP(storageSpool(), strNew(STORAGE_SPOOL_ARCHIVE_IN "/000000010000000100000001")), true,
"check 000000010000000100000001 in spool");
TEST_RESULT_BOOL(
storageExistsP(storageSpool(), strNew(STORAGE_SPOOL_ARCHIVE_IN "/000000010000000100000001.pgbackrest.tmp")), false,
"check 000000010000000100000001 tmp not in spool");

// Get multiple segments where some are missing or errored
// -------------------------------------------------------------------------------------------------------------------------
Expand Down
23 changes: 18 additions & 5 deletions test/src/module/command/archivePushTest.c
Original file line number Diff line number Diff line change
Expand Up @@ -293,24 +293,37 @@ testRun(void)
// -------------------------------------------------------------------------------------------------------------------------
TEST_TITLE("WAL with absolute path and no pg1-path");

const char *sha1 = TEST_64BIT() ? "755defa48a0a0872767b6dea49bdd3b64902f147" : "9c2a6ec4491a2118bcdc9b653366581d8821c982";

argListTemp = strLstNew();
strLstAddZ(argListTemp, "--" CFGOPT_STANZA "=test");
strLstAdd(argListTemp, strNewFmt("--" CFGOPT_REPO1_PATH "=%s/repo", testPath()));
strLstAdd(argListTemp, strNewFmt("%s/pg/pg_wal/000000010000000100000002", testPath()));
harnessCfgLoad(cfgCmdArchivePush, argListTemp);

TEST_RESULT_VOID(storagePutP(storageNewWriteP(storageTest, strNew("pg/pg_wal/000000010000000100000002")), walBuffer2), "write WAL");
TEST_RESULT_VOID(
storagePutP(storageNewWriteP(storageTest, strNew("pg/pg_wal/000000010000000100000002")), walBuffer2), "write WAL");

// Create tmp file to make it look like a prior push failed partway through to ensure that retries work
TEST_RESULT_VOID(
storagePutP(
storageNewWriteP(
storageTest,
strNewFmt("repo/archive/test/11-1/0000000100000001/000000010000000100000002-%s.gz.pgbackrest.tmp", sha1)),
BUFSTRDEF("PARTIAL")),
"write WAL tmp file");

TEST_RESULT_VOID(cmdArchivePush(), "push the WAL segment");
harnessLogResult("P00 INFO: pushed WAL file '000000010000000100000002' to the archive");

TEST_RESULT_BOOL(
storageExistsP(storageTest, strNewFmt("repo/archive/test/11-1/0000000100000001/000000010000000100000002-%s.gz", sha1)),
true, "check repo for WAL file");
TEST_RESULT_BOOL(
storageExistsP(
storageTest,
strNewFmt(
"repo/archive/test/11-1/0000000100000001/000000010000000100000002-%s.gz",
TEST_64BIT() ? "755defa48a0a0872767b6dea49bdd3b64902f147" : "9c2a6ec4491a2118bcdc9b653366581d8821c982")),
true, "check repo for WAL file");
strNewFmt("repo/archive/test/11-1/0000000100000001/000000010000000100000002-%s.gz.pgbackrest.tmp", sha1)),
false, "check WAL tmp file is gone");

// Push a history file
// -------------------------------------------------------------------------------------------------------------------------
Expand Down
8 changes: 8 additions & 0 deletions test/src/module/command/backupTest.c
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,11 @@ testRun(void)
uint64_t feature = storageRepo()->interface.feature;
((Storage *)storageRepo())->interface.feature = feature && ((1 << storageFeatureCompress) ^ 0xFFFFFFFFFFFFFFFF);

// Create tmp file to make it look like a prior backup file failed partway through to ensure that retries work
TEST_RESULT_VOID(
storagePutP(storageNewWriteP(storageRepoWrite(), strNewFmt("%s.pgbackrest.tmp", strPtr(backupPathFile))), NULL),
" create tmp file");

TEST_ASSIGN(
result,
backupFile(
Expand All @@ -562,6 +567,9 @@ testRun(void)
storageExistsP(storageRepo(), backupPathFile) && result.pageChecksumResult == NULL),
true, " copy file to repo success");

TEST_RESULT_BOOL(
storageExistsP(storageRepoWrite(), strNewFmt("%s.pgbackrest.tmp", strPtr(backupPathFile))), false,
" check temp file removed");
TEST_RESULT_VOID(storageRemoveP(storageRepoWrite(), backupPathFile), " remove repo file");

// -------------------------------------------------------------------------------------------------------------------------
Expand Down
6 changes: 6 additions & 0 deletions test/src/module/command/restoreTest.c
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,12 @@ testRun(void)
"error restoring 'normal': actual checksum 'd1cd8a7d11daa26814b93eb604e1d49ab4b43770' does not match expected checksum"
" 'ffffffffffffffffffffffffffffffffffffffff'");

// Create normal file to make it look like a prior restore file failed partway through to ensure that retries work. It will
// be clear if the file was overwritten when checking the info below since the size and timestamp will be changed.
TEST_RESULT_VOID(
storagePutP(storageNewWriteP(storagePgWrite(), STRDEF("normal"), .modeFile = 0600), BUFSTRDEF("PRT")),
" create normal file");

TEST_RESULT_BOOL(
restoreFile(
repoFile1, repoFileReferenceFull, compressTypeGz, strNew("normal"),
Expand Down
2 changes: 1 addition & 1 deletion test/src/module/config/protocolTest.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ testRun(void)

ProtocolServer *server = protocolServerNew(strNew("test"), strNew("config"), read, write);
protocolServerHandlerAdd(server, configProtocol);
protocolServerProcess(server);
protocolServerProcess(server, NULL);
}
HARNESS_FORK_CHILD_END();

Expand Down
2 changes: 1 addition & 1 deletion test/src/module/db/dbTest.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ testRun(void)

TEST_ASSIGN(server, protocolServerNew(strNew("db test server"), strNew("test"), read, write), "create server");
TEST_RESULT_VOID(protocolServerHandlerAdd(server, dbProtocol), "add handler");
TEST_RESULT_VOID(protocolServerProcess(server), "run process loop");
TEST_RESULT_VOID(protocolServerProcess(server, NULL), "run process loop");
TEST_RESULT_VOID(protocolServerFree(server), "free server");
}
HARNESS_FORK_CHILD_END();
Expand Down
2 changes: 1 addition & 1 deletion test/src/module/performance/storageTest.c
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ testRun(void)

ProtocolServer *server = protocolServerNew(strNew("storage test server"), strNew("test"), read, write);
protocolServerHandlerAdd(server, storageRemoteProtocol);
protocolServerProcess(server);
protocolServerProcess(server, NULL);

}
HARNESS_FORK_CHILD_END();
Expand Down
41 changes: 38 additions & 3 deletions test/src/module/protocol/protocolTest.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ Test Protocol
/***********************************************************************************************************************************
Test protocol request handler
***********************************************************************************************************************************/
bool
static unsigned int testServerProtocolErrorTotal = 0;

static bool
testServerProtocol(const String *command, const VariantList *paramList, ProtocolServer *server)
{
FUNCTION_HARNESS_BEGIN();
Expand Down Expand Up @@ -47,6 +49,16 @@ testServerProtocol(const String *command, const VariantList *paramList, Protocol
protocolServerWriteLine(server, NULL);
ioWriteFlush(protocolServerIoWrite(server));
}
else if (strEq(command, STRDEF("error-until-0")))
{
if (testServerProtocolErrorTotal > 0)
{
testServerProtocolErrorTotal--;
THROW(FormatError, "error-until-0");
}

protocolServerResponse(server, varNewBool(true));
}
else
found = false;
}
Expand Down Expand Up @@ -611,6 +623,15 @@ testRun(void)
// Exit
TEST_RESULT_VOID(ioWriteStrLine(write, strNew("{\"cmd\":\"exit\"}")), "write exit");
TEST_RESULT_VOID(ioWriteFlush(write), "flush exit");

// Retry errors until success
TEST_RESULT_VOID(ioWriteStrLine(write, strNew("{\"cmd\":\"error-until-0\"}")), "write error-until-0");
TEST_RESULT_VOID(ioWriteFlush(write), "flush error-until-0");
TEST_RESULT_STR_Z(ioReadLine(read), "{\"out\":true}", "error-until-0 result");

// Exit
TEST_RESULT_VOID(ioWriteStrLine(write, strNew("{\"cmd\":\"exit\"}")), "write exit");
TEST_RESULT_VOID(ioWriteFlush(write), "flush exit");
}
HARNESS_FORK_CHILD_END();

Expand Down Expand Up @@ -640,9 +661,23 @@ testRun(void)

TEST_RESULT_VOID(protocolServerHandlerAdd(server, testServerProtocol), "add handler");

TEST_RESULT_VOID(protocolServerProcess(server), "run process loop");
TEST_RESULT_VOID(protocolServerProcess(server, NULL), "run process loop");

// -----------------------------------------------------------------------------------------------------------------
TEST_TITLE("run process loop with retries");

VariantList *retryInterval = varLstNew();
varLstAdd(retryInterval, varNewUInt64(0));
varLstAdd(retryInterval, varNewUInt64(50));

testServerProtocolErrorTotal = 2;

TEST_RESULT_VOID(protocolServerProcess(server, retryInterval), "run process loop");

// -----------------------------------------------------------------------------------------------------------------
TEST_TITLE("free server");

TEST_RESULT_VOID(protocolServerFree(server), "free server");
TEST_RESULT_VOID(protocolServerFree(server), "free");
}
HARNESS_FORK_PARENT_END();
}
Expand Down
Loading

0 comments on commit 620a8d1

Please sign in to comment.