Skip to content
Permalink
Browse files

Update code to use new unsigned int Variant type and config methods.

  • Loading branch information...
dwsteele committed Apr 19, 2019
1 parent 9f0829c commit 0c866f52c69ee2bd6af20e67af78bad94c4048af
@@ -16,11 +16,11 @@
<release-core-list>
<release-development-list>
<release-item>
<p>Add <code>unsigned int</code> <code>Variant</code> type.</p>
<p>Add <code>unsigned int</code> <code>Variant</code> type and update code to use it.</p>
</release-item>

<release-item>
<p>Add <code>cfgOptionUInt()</code> and <code>cfgOptionUInt64()</code>.</p>
<p>Add <code>cfgOptionUInt()</code> and <code>cfgOptionUInt64()</code> and update code to use them.</p>
</release-item>
</release-development-list>
</release-core-list>
@@ -29,12 +29,12 @@ Archive Get Command
Clean the queue and prepare a list of WAL segments that the async process should get
***********************************************************************************************************************************/
static StringList *
queueNeed(const String *walSegment, bool found, size_t queueSize, size_t walSegmentSize, unsigned int pgVersion)
queueNeed(const String *walSegment, bool found, uint64_t queueSize, size_t walSegmentSize, unsigned int pgVersion)
{
FUNCTION_LOG_BEGIN(logLevelDebug);
FUNCTION_LOG_PARAM(STRING, walSegment);
FUNCTION_LOG_PARAM(BOOL, found);
FUNCTION_LOG_PARAM(SIZE, queueSize);
FUNCTION_LOG_PARAM(UINT64, queueSize);
FUNCTION_LOG_PARAM(SIZE, walSegmentSize);
FUNCTION_LOG_PARAM(UINT, pgVersion);
FUNCTION_LOG_END();
@@ -192,7 +192,7 @@ cmdArchiveGet(void)

// Use WAL segment size to estimate queue size and determine if the async process should be launched
queueFull =
strLstSize(queue) * walSegmentSize > (size_t)cfgOptionInt64(cfgOptArchiveGetQueueMax) / 2;
strLstSize(queue) * walSegmentSize > cfgOptionUInt64(cfgOptArchiveGetQueueMax) / 2;
}
}

@@ -221,7 +221,7 @@ cmdArchiveGet(void)
// Clean the current queue using the list of WAL that we ideally want in the queue. queueNeed()
// will return the list of WAL needed to fill the queue and this will be passed to the async process.
const StringList *queue = queueNeed(
walSegment, found, (size_t)cfgOptionInt64(cfgOptArchiveGetQueueMax), pgControl.walSegmentSize,
walSegment, found, cfgOptionUInt64(cfgOptArchiveGetQueueMax), pgControl.walSegmentSize,
pgControl.version);

for (unsigned int queueIdx = 0; queueIdx < strLstSize(queue); queueIdx++)
@@ -309,7 +309,7 @@ cmdArchiveGetAsync(void)
ProtocolParallel *parallelExec = protocolParallelNew(
(TimeMSec)(cfgOptionDbl(cfgOptProtocolTimeout) * MSEC_PER_SEC) / 2);

for (unsigned int processIdx = 1; processIdx <= (unsigned int)cfgOptionInt(cfgOptProcessMax); processIdx++)
for (unsigned int processIdx = 1; processIdx <= cfgOptionUInt(cfgOptProcessMax); processIdx++)
protocolParallelClientAdd(parallelExec, protocolLocalGet(protocolStorageTypeRepo, processIdx));

// Queue jobs in executor
@@ -44,9 +44,9 @@ archivePushProtocol(const String *command, const VariantList *paramList, Protoco
VARSTR(
archivePushFile(
varStr(varLstGet(paramList, 0)), varStr(varLstGet(paramList, 1)),
(unsigned int)varUInt64(varLstGet(paramList, 2)), varUInt64(varLstGet(paramList, 3)),
varStr(varLstGet(paramList, 4)), (CipherType)varUInt64(varLstGet(paramList, 5)),
varStr(varLstGet(paramList, 6)), varBool(varLstGet(paramList, 7)), varIntForce(varLstGet(paramList, 8)))));
varUIntForce(varLstGet(paramList, 2)), varUInt64(varLstGet(paramList, 3)), varStr(varLstGet(paramList, 4)),
(CipherType)varUIntForce(varLstGet(paramList, 5)), varStr(varLstGet(paramList, 6)),
varBool(varLstGet(paramList, 7)), varIntForce(varLstGet(paramList, 8)))));
}
else
found = false;
@@ -32,17 +32,15 @@ Ready file extension constants
Format the warning when a file is dropped
***********************************************************************************************************************************/
static String *
archivePushDropWarning(const String *walFile, int64_t queueMax)
archivePushDropWarning(const String *walFile, uint64_t queueMax)
{
FUNCTION_TEST_BEGIN();
FUNCTION_TEST_PARAM(STRING, walFile);
FUNCTION_TEST_PARAM(INT64, queueMax);
FUNCTION_TEST_PARAM(UINT64, queueMax);
FUNCTION_TEST_END();

FUNCTION_TEST_RETURN(
strNewFmt(
"dropped WAL file '%s' because archive queue exceeded %s", strPtr(walFile),
strPtr(strSizeFormat((uint64_t)queueMax))));
strNewFmt("dropped WAL file '%s' because archive queue exceeded %s", strPtr(walFile), strPtr(strSizeFormat(queueMax))));
}

/***********************************************************************************************************************************
@@ -56,7 +54,7 @@ archivePushDrop(const String *walPath, const StringList *const processList)
FUNCTION_LOG_PARAM(STRING_LIST, processList);
FUNCTION_LOG_END();

const uint64_t queueMax = (uint64_t)cfgOptionInt64(cfgOptArchivePushQueueMax);
const uint64_t queueMax = cfgOptionUInt64(cfgOptArchivePushQueueMax);
uint64_t queueSize = 0;
bool result = false;

@@ -356,7 +354,7 @@ cmdArchivePush(void)
if (cfgOptionTest(cfgOptArchivePushQueueMax) &&
archivePushDrop(strPath(walFile), archivePushReadyList(strPath(walFile))))
{
LOG_WARN(strPtr(archivePushDropWarning(archiveFile, cfgOptionInt64(cfgOptArchivePushQueueMax))));
LOG_WARN(strPtr(archivePushDropWarning(archiveFile, cfgOptionUInt64(cfgOptArchivePushQueueMax))));
}
// Else push the file
else
@@ -424,7 +422,7 @@ cmdArchivePushAsync(void)
for (unsigned int walFileIdx = 0; walFileIdx < strLstSize(walFileList); walFileIdx++)
{
const String *walFile = strLstGet(walFileList, walFileIdx);
const String *warning = archivePushDropWarning(walFile, cfgOptionInt64(cfgOptArchivePushQueueMax));
const String *warning = archivePushDropWarning(walFile, cfgOptionUInt64(cfgOptArchivePushQueueMax));

archiveAsyncStatusOkWrite(archiveModePush, walFile, warning);
LOG_WARN(strPtr(warning));
@@ -444,7 +442,7 @@ cmdArchivePushAsync(void)
ProtocolParallel *parallelExec = protocolParallelNew(
(TimeMSec)(cfgOptionDbl(cfgOptProtocolTimeout) * MSEC_PER_SEC) / 2);

for (unsigned int processIdx = 1; processIdx <= (unsigned int)cfgOptionInt(cfgOptProcessMax); processIdx++)
for (unsigned int processIdx = 1; processIdx <= cfgOptionUInt(cfgOptProcessMax); processIdx++)
protocolParallelClientAdd(parallelExec, protocolLocalGet(protocolStorageTypeRepo, processIdx));

// Queue jobs in executor
@@ -457,10 +455,10 @@ cmdArchivePushAsync(void)
ProtocolCommand *command = protocolCommandNew(PROTOCOL_COMMAND_ARCHIVE_PUSH_STR);
protocolCommandParamAdd(command, VARSTR(strNewFmt("%s/%s", strPtr(walPath), strPtr(walFile))));
protocolCommandParamAdd(command, VARSTR(archiveInfo.archiveId));
protocolCommandParamAdd(command, VARUINT64(archiveInfo.pgVersion));
protocolCommandParamAdd(command, VARUINT(archiveInfo.pgVersion));
protocolCommandParamAdd(command, VARUINT64(archiveInfo.pgSystemId));
protocolCommandParamAdd(command, VARSTR(walFile));
protocolCommandParamAdd(command, VARUINT64(cipherType(cfgOptionStr(cfgOptRepoCipherType))));
protocolCommandParamAdd(command, VARUINT(cipherType(cfgOptionStr(cfgOptRepoCipherType))));
protocolCommandParamAdd(command, VARSTR(archiveInfo.archiveCipherPass));
protocolCommandParamAdd(command, VARBOOL(cfgOptionBool(cfgOptCompress)));
protocolCommandParamAdd(command, VARINT(cfgOptionInt(cfgOptCompressLevel)));
@@ -174,7 +174,7 @@ archiveDbList(const String *stanza, const InfoPgData *pgData, VariantList *archi
// Add empty database section to archiveInfo and then fill in database id from the backup.info
KeyValue *databaseInfo = kvPutKv(varKv(archiveInfo), KEY_DATABASE_VAR);

kvAdd(databaseInfo, DB_KEY_ID_VAR, VARUINT64(pgData->id));
kvAdd(databaseInfo, DB_KEY_ID_VAR, VARUINT(pgData->id));

kvPut(varKv(archiveInfo), DB_KEY_ID_VAR, VARSTR(archiveId));
kvPut(varKv(archiveInfo), ARCHIVE_KEY_MIN_VAR, (archiveStart != NULL ? VARSTR(archiveStart) : (Variant *)NULL));
@@ -231,13 +231,13 @@ backupList(VariantList *backupSection, InfoBackup *info)
// backrest section
KeyValue *backrestInfo = kvPutKv(varKv(backupInfo), BACKUP_KEY_BACKREST_VAR);

kvAdd(backrestInfo, BACKREST_KEY_FORMAT_VAR, VARUINT64(backupData.backrestFormat));
kvAdd(backrestInfo, BACKREST_KEY_FORMAT_VAR, VARUINT(backupData.backrestFormat));
kvAdd(backrestInfo, BACKREST_KEY_VERSION_VAR, VARSTR(backupData.backrestVersion));

// database section
KeyValue *dbInfo = kvPutKv(varKv(backupInfo), KEY_DATABASE_VAR);

kvAdd(dbInfo, DB_KEY_ID_VAR, VARUINT64(backupData.backupPgId));
kvAdd(dbInfo, DB_KEY_ID_VAR, VARUINT(backupData.backupPgId));

// info section
KeyValue *infoInfo = kvPutKv(varKv(backupInfo), BACKUP_KEY_INFO_VAR);
@@ -347,7 +347,7 @@ stanzaInfoList(const String *stanza, StringList *stanzaList)
InfoPgData pgData = infoPgData(infoBackupPg(info), pgIdx);
Variant *pgInfo = varNewKv();

kvPut(varKv(pgInfo), DB_KEY_ID_VAR, VARUINT64(pgData.id));
kvPut(varKv(pgInfo), DB_KEY_ID_VAR, VARUINT(pgData.id));
kvPut(varKv(pgInfo), DB_KEY_SYSTEM_ID_VAR, VARUINT64(pgData.systemId));
kvPut(varKv(pgInfo), DB_KEY_VERSION_VAR, VARSTR(pgVersionToStr(pgData.version)));

@@ -421,7 +421,7 @@ formatTextDb(const KeyValue *stanzaInfo, String *resultStr)
for (unsigned int dbIdx = 0; dbIdx < varLstSize(dbSection); dbIdx++)
{
KeyValue *pgInfo = varKv(varLstGet(dbSection, dbIdx));
uint64_t dbId = varUInt64(kvGet(pgInfo, DB_KEY_ID_VAR));
unsigned int dbId = varUInt(kvGet(pgInfo, DB_KEY_ID_VAR));

// List is ordered so 0 is always the current DB index
if (dbIdx == varLstSize(dbSection) - 1)
@@ -434,7 +434,7 @@ formatTextDb(const KeyValue *stanzaInfo, String *resultStr)
{
KeyValue *archiveInfo = varKv(varLstGet(archiveSection, archiveIdx));
KeyValue *archiveDbInfo = varKv(kvGet(archiveInfo, KEY_DATABASE_VAR));
uint64_t archiveDbId = varUInt64(kvGet(archiveDbInfo, DB_KEY_ID_VAR));
unsigned int archiveDbId = varUInt(kvGet(archiveDbInfo, DB_KEY_ID_VAR));

if (archiveDbId == dbId)
{
@@ -461,7 +461,7 @@ formatTextDb(const KeyValue *stanzaInfo, String *resultStr)
{
KeyValue *backupInfo = varKv(varLstGet(backupSection, backupIdx));
KeyValue *backupDbInfo = varKv(kvGet(backupInfo, KEY_DATABASE_VAR));
uint64_t backupDbId = varUInt64(kvGet(backupDbInfo, DB_KEY_ID_VAR));
unsigned int backupDbId = varUInt(kvGet(backupDbInfo, DB_KEY_ID_VAR));

if (backupDbId == dbId)
{
@@ -474,8 +474,8 @@ formatTextDb(const KeyValue *stanzaInfo, String *resultStr)
// Get and format the backup start/stop time
static char timeBufferStart[20];
static char timeBufferStop[20];
time_t timeStart = (time_t) varUInt64(kvGet(timestampInfo, KEY_START_VAR));
time_t timeStop = (time_t) varUInt64(kvGet(timestampInfo, KEY_STOP_VAR));
time_t timeStart = (time_t)varUInt64(kvGet(timestampInfo, KEY_START_VAR));
time_t timeStop = (time_t)varUInt64(kvGet(timestampInfo, KEY_STOP_VAR));

strftime(timeBufferStart, 20, "%Y-%m-%d %H:%M:%S", localtime(&timeStart));
strftime(timeBufferStop, 20, "%Y-%m-%d %H:%M:%S", localtime(&timeStop));
@@ -22,7 +22,7 @@ cmdLocal(int handleRead, int handleWrite)

MEM_CONTEXT_TEMP_BEGIN()
{
String *name = strNewFmt(PROTOCOL_SERVICE_LOCAL "-%d", cfgOptionInt(cfgOptProcess));
String *name = strNewFmt(PROTOCOL_SERVICE_LOCAL "-%u", cfgOptionUInt(cfgOptProcess));
IoRead *read = ioHandleReadIo(ioHandleReadNew(name, handleRead, (TimeMSec)(cfgOptionDbl(cfgOptProtocolTimeout) * 1000)));
ioReadOpen(read);
IoWrite *write = ioHandleWriteIo(ioHandleWriteNew(name, handleWrite));
@@ -23,7 +23,7 @@ cmdRemote(int handleRead, int handleWrite)

MEM_CONTEXT_TEMP_BEGIN()
{
String *name = strNewFmt(PROTOCOL_SERVICE_REMOTE "-%d", cfgOptionInt(cfgOptProcess));
String *name = strNewFmt(PROTOCOL_SERVICE_REMOTE "-%u", cfgOptionUInt(cfgOptProcess));
IoRead *read = ioHandleReadIo(ioHandleReadNew(name, handleRead, (TimeMSec)(cfgOptionDbl(cfgOptProtocolTimeout) * 1000)));
ioReadOpen(read);
IoWrite *write = ioHandleWriteIo(ioHandleWriteNew(name, handleWrite));
@@ -43,7 +43,7 @@ cmdRemote(int handleRead, int handleWrite)
ioReadLine(read);

// Only try the lock if this is process 0, i.e. the remote started from the main process
if (cfgOptionInt(cfgOptProcess) == 0)
if (cfgOptionUInt(cfgOptProcess) == 0)
{
ConfigCommand commandId = cfgCommandId(strPtr(cfgOptionStr(cfgOptCommand)));

@@ -48,7 +48,7 @@ cfgLoadLogSetting(void)
logTimestamp = cfgOptionBool(cfgOptLogTimestamp);

if (cfgOptionValid(cfgOptProcessMax))
logProcessMax = (unsigned int)cfgOptionInt(cfgOptProcessMax);
logProcessMax = cfgOptionUInt(cfgOptProcessMax);

logInit(logLevelConsole, logLevelStdErr, logLevelFile, logTimestamp, logProcessMax);

@@ -172,7 +172,7 @@ cfgLoadUpdateOption(void)
if (cfgOptionTest(cfgOptRepoRetentionFull + optionIdx))
{
cfgOptionSet(cfgOptRepoRetentionArchive + optionIdx, cfgSourceDefault,
VARINT(cfgOptionInt(cfgOptRepoRetentionFull + optionIdx)));
VARUINT(cfgOptionUInt(cfgOptRepoRetentionFull + optionIdx)));
}
}
else if (strEqZ(archiveRetentionType, CFGOPTVAL_TMP_REPO_RETENTION_ARCHIVE_TYPE_DIFF))
@@ -181,7 +181,7 @@ cfgLoadUpdateOption(void)
if (cfgOptionTest(cfgOptRepoRetentionDiff + optionIdx))
{
cfgOptionSet(cfgOptRepoRetentionArchive + optionIdx, cfgSourceDefault,
VARINT(cfgOptionInt(cfgOptRepoRetentionDiff + optionIdx)));
VARUINT(cfgOptionUInt(cfgOptRepoRetentionDiff + optionIdx)));
}
else
{
@@ -270,7 +270,7 @@ cfgLoad(unsigned int argListSize, const char *argList[])
{
// Set IO buffer size
if (cfgOptionValid(cfgOptBufferSize))
ioBufferSizeSet((size_t)cfgOptionInt(cfgOptBufferSize));
ioBufferSizeSet(cfgOptionUInt(cfgOptBufferSize));

// Open the log file if this command logs to a file
if (cfgLogFile() && !cfgCommandHelp())
@@ -284,8 +284,8 @@ cfgLoad(unsigned int argListSize, const char *argList[])
if (cfgCommand() == cfgCmdLocal || cfgCommand() == cfgCmdRemote)
{
strCatFmt(
logFile, "%s-%s-%03d.log", strPtr(cfgOptionStr(cfgOptCommand)), cfgCommandName(cfgCommand()),
cfgOptionInt(cfgOptProcess));
logFile, "%s-%s-%03u.log", strPtr(cfgOptionStr(cfgOptCommand)), cfgCommandName(cfgCommand()),
cfgOptionUInt(cfgOptProcess));
}
// Else add command name
else
@@ -104,7 +104,7 @@ infoBackupNew(const Storage *storage, const String *fileName, bool ignoreMissing

InfoBackupData infoBackupData =
{
.backrestFormat = (unsigned int)varUInt64(kvGet(backupKv, VARSTR(INFO_KEY_FORMAT_STR))),
.backrestFormat = varUIntForce(kvGet(backupKv, VARSTR(INFO_KEY_FORMAT_STR))),
.backrestVersion = varStrForce(kvGet(backupKv, VARSTR(INFO_KEY_VERSION_STR))),
.backupInfoRepoSize = varUInt64(kvGet(backupKv, INFO_BACKUP_KEY_BACKUP_INFO_REPO_SIZE_VAR)),
.backupInfoRepoSizeDelta = varUInt64(kvGet(backupKv, INFO_BACKUP_KEY_BACKUP_INFO_REPO_SIZE_DELTA_VAR)),
@@ -131,7 +131,7 @@ protocolLocalGet(ProtocolStorageType protocolStorageType, unsigned int protocolI
{
MEM_CONTEXT_BEGIN(protocolHelper.memContext)
{
protocolHelper.clientLocalSize = (unsigned int)cfgOptionInt(cfgOptProcessMax);
protocolHelper.clientLocalSize = cfgOptionUInt(cfgOptProcessMax);
protocolHelper.clientLocal = (ProtocolHelperClient *)memNew(
protocolHelper.clientLocalSize * sizeof(ProtocolHelperClient));
}
@@ -193,7 +193,7 @@ protocolRemoteParam(ProtocolStorageType protocolStorageType, unsigned int protoc
if (cfgOptionTest(cfgOptRepoHostPort))
{
strLstAddZ(result, "-p");
strLstAdd(result, strNewFmt("%d", cfgOptionInt(cfgOptRepoHostPort)));
strLstAdd(result, strNewFmt("%u", cfgOptionUInt(cfgOptRepoHostPort)));
}

// Append user/host
@@ -276,7 +276,7 @@ protocolRemoteGet(ProtocolStorageType protocolStorageType)
unsigned int protocolId = 0;

if (cfgOptionTest(cfgOptProcess))
protocolId = (unsigned int)cfgOptionInt(cfgOptProcess);
protocolId = cfgOptionUInt(cfgOptProcess);

ASSERT(protocolId < protocolHelper.clientRemoteSize);

@@ -107,8 +107,8 @@ storageDriverRemoteFileWriteOpen(StorageDriverRemoteFileWrite *this)
{
ProtocolCommand *command = protocolCommandNew(PROTOCOL_COMMAND_STORAGE_OPEN_WRITE_STR);
protocolCommandParamAdd(command, VARSTR(this->name));
protocolCommandParamAdd(command, VARUINT64(this->modeFile));
protocolCommandParamAdd(command, VARUINT64(this->modePath));
protocolCommandParamAdd(command, VARUINT(this->modeFile));
protocolCommandParamAdd(command, VARUINT(this->modePath));
protocolCommandParamAdd(command, VARBOOL(this->createPath));
protocolCommandParamAdd(command, VARBOOL(this->syncFile));
protocolCommandParamAdd(command, VARBOOL(this->syncPath));
Oops, something went wrong.

0 comments on commit 0c866f5

Please sign in to comment.
You can’t perform that action at this time.