Skip to content

Commit

Permalink
Convert page checksum filter result to a pack.
Browse files Browse the repository at this point in the history
The pack is both more compact and more efficient than a variant.

Also aggregate the page error info in the main process rather than in the filter to allow additional LSN filtering, to be added in a future commit.
  • Loading branch information
dwsteele committed Sep 24, 2021
1 parent ac1f6db commit c8ea17c
Show file tree
Hide file tree
Showing 12 changed files with 186 additions and 108 deletions.
107 changes: 102 additions & 5 deletions src/command/backup/backup.c
Expand Up @@ -1013,6 +1013,101 @@ backupStop(BackupData *backupData, Manifest *manifest)
FUNCTION_LOG_RETURN_STRUCT(result);
}

/***********************************************************************************************************************************
Convert page checksum error pack to a VariantList
***********************************************************************************************************************************/
// Helper to output pages and page ranges
static void
backupJobResultPageChecksumOut(VariantList *const result, const unsigned int pageBegin, const unsigned int pageEnd)
{
FUNCTION_TEST_BEGIN();
FUNCTION_TEST_PARAM(VARIANT_LIST, result);
FUNCTION_TEST_PARAM(UINT, pageBegin);
FUNCTION_TEST_PARAM(UINT, pageEnd);
FUNCTION_TEST_END();

// Output a single page
if (pageBegin == pageEnd)
{
varLstAdd(result, varNewUInt64(pageBegin));
}
// Else output a page range
else
{
VariantList *errorListSub = varLstNew();
varLstAdd(errorListSub, varNewUInt64(pageBegin));
varLstAdd(errorListSub, varNewUInt64(pageEnd));
varLstAdd(result, varNewVarLst(errorListSub));
}

FUNCTION_TEST_RETURN_VOID();
}

static VariantList *
backupJobResultPageChecksum(PackRead *const checksumPageResult)
{
FUNCTION_LOG_BEGIN(logLevelDebug);
FUNCTION_LOG_PARAM(PACK_READ, checksumPageResult);
FUNCTION_LOG_END();

VariantList *result = NULL;

// If there is an error result array
if (!pckReadNullP(checksumPageResult))
{
result = varLstNew();
pckReadArrayBeginP(checksumPageResult);

bool first = false;
unsigned int pageBegin = 0;
unsigned int pageEnd = 0;

// Combine results into a more compact form
while (pckReadNext(checksumPageResult))
{
unsigned int pageId = pckReadId(checksumPageResult) - 1;
pckReadObjBeginP(checksumPageResult, .id = pageId + 1);

// ??? Discarded for now but will eventually be used for filtering
pckReadU64P(checksumPageResult);

// If first error then just store page
if (!first)
{
pageBegin = pageId;
pageEnd = pageId;
first = true;
}
// Expand list when the page is in sequence
else if (pageId == pageEnd + 1)
{
pageEnd = pageId;
}
// Else output the page or page range
else
{
backupJobResultPageChecksumOut(result, pageBegin, pageEnd);

// Start again with a single page range
pageBegin = pageId;
pageEnd = pageId;
}

pckReadObjEndP(checksumPageResult);
}

// Check that the array was not empty
CHECK(first);

// Output last page or page range
backupJobResultPageChecksumOut(result, pageBegin, pageEnd);

pckReadArrayEndP(checksumPageResult);
}

FUNCTION_LOG_RETURN(VARIANT_LIST, result);
}

/***********************************************************************************************************************************
Log the results of a job and throw errors
***********************************************************************************************************************************/
Expand Down Expand Up @@ -1049,7 +1144,7 @@ backupJobResult(
const uint64_t copySize = pckReadU64P(jobResult);
const uint64_t repoSize = pckReadU64P(jobResult);
const String *const copyChecksum = pckReadStrP(jobResult);
const KeyValue *const checksumPageResult = varKv(jsonToVar(pckReadStrP(jobResult, .defaultValue = NULL_STR)));
PackRead *const checksumPageResult = pckReadPackReadP(jobResult);

// Increment backup copy progress
sizeCopied += copySize;
Expand Down Expand Up @@ -1109,12 +1204,14 @@ backupJobResult(

if (checksumPageResult != NULL)
{
checksumPageErrorList = backupJobResultPageChecksum(checksumPageResult);

// If the checksum was valid
if (!varBool(kvGet(checksumPageResult, VARSTRDEF("valid"))))
if (!pckReadBoolP(checksumPageResult))
{
checksumPageError = true;

if (!varBool(kvGet(checksumPageResult, VARSTRDEF("align"))))
if (!pckReadBoolP(checksumPageResult))
{
checksumPageErrorList = NULL;

Expand All @@ -1126,8 +1223,8 @@ backupJobResult(
else
{
// Format the page checksum errors
checksumPageErrorList = varVarLst(kvGet(checksumPageResult, VARSTRDEF("error")));
ASSERT(!varLstEmpty(checksumPageErrorList));
CHECK(checksumPageErrorList != NULL);
CHECK(!varLstEmpty(checksumPageErrorList));

String *error = strNew();
unsigned int errorTotalMin = 0;
Expand Down
5 changes: 2 additions & 3 deletions src/command/backup/file.c
Expand Up @@ -251,9 +251,8 @@ backupFile(
// Get results of page checksum validation
if (pgFileChecksumPage)
{
result.pageChecksumResult = jsonToKv(
pckReadStrP(
ioFilterGroupResultP(ioReadFilterGroup(storageReadIo(read)), PAGE_CHECKSUM_FILTER_TYPE)));
result.pageChecksumResult = pckDup(
ioFilterGroupResultPackP(ioReadFilterGroup(storageReadIo(read)), PAGE_CHECKSUM_FILTER_TYPE));
}
}
MEM_CONTEXT_PRIOR_END();
Expand Down
2 changes: 1 addition & 1 deletion src/command/backup/file.h
Expand Up @@ -30,7 +30,7 @@ typedef struct BackupFileResult
uint64_t copySize;
String *copyChecksum;
uint64_t repoSize;
KeyValue *pageChecksumResult;
Pack *pageChecksumResult;
} BackupFileResult;

BackupFileResult backupFile(
Expand Down
107 changes: 30 additions & 77 deletions src/command/backup/pageChecksum.c
Expand Up @@ -27,10 +27,7 @@ typedef struct PageChecksum

bool valid; // Is the relation structure valid?
bool align; // Is the relation alignment valid?
VariantList *error; // List of checksum errors

unsigned int errorMin; // Current min error page
unsigned int errorMax; // Current max error page
PackWrite *error; // List of checksum errors
} PageChecksum;

/***********************************************************************************************************************************
Expand Down Expand Up @@ -121,20 +118,21 @@ pageChecksumProcess(THIS_VOID, const Buffer *input)
continue;
}

// Add the page error
MEM_CONTEXT_BEGIN(this->memContext)
// Create the error list if it does not exist yet
if (this->error == NULL)
{
// Create the error list if it does not exist yet
if (this->error == NULL)
this->error = varLstNew();

// Add page number and lsn to the error list
VariantList *pair = varLstNew();
varLstAdd(pair, varNewUInt(blockNo));
varLstAdd(pair, varNewUInt64(pageLsn));
varLstAdd(this->error, varNewVarLst(pair));
MEM_CONTEXT_BEGIN(this->memContext)
{
this->error = pckWriteNewP();
pckWriteArrayBeginP(this->error);
}
MEM_CONTEXT_END();
}
MEM_CONTEXT_END();

// Add page number and lsn to the error list
pckWriteObjBeginP(this->error, .id = blockNo + 1);
pckWriteU64P(this->error, pageLsn);
pckWriteObjEndP(this->error);
}

this->pageNoOffset += pageTotal;
Expand All @@ -159,76 +157,31 @@ pageChecksumResult(THIS_VOID)

Pack *result = NULL;

MEM_CONTEXT_TEMP_BEGIN()
MEM_CONTEXT_BEGIN(this->memContext)
{
KeyValue *error = kvNew();

// End the error array
if (this->error != NULL)
{
VariantList *errorList = varLstNew();
unsigned int errorIdx = 0;

// Convert the full list to an abbreviated list. In the future we want to return the entire list so pages can be verified
// in the WAL.
do
{
unsigned int pageId = varUInt(varLstGet(varVarLst(varLstGet(this->error, errorIdx)), 0));

if (errorIdx == varLstSize(this->error) - 1)
{
varLstAdd(errorList, varNewUInt(pageId));
errorIdx++;
}
else
{
unsigned int pageIdNext = varUInt(varLstGet(varVarLst(varLstGet(this->error, errorIdx + 1)), 0));

if (pageIdNext > pageId + 1)
{
varLstAdd(errorList, varNewUInt(pageId));
errorIdx++;
}
else
{
unsigned int pageIdLast = pageIdNext;
errorIdx++;

while (errorIdx < varLstSize(this->error) - 1)
{
pageIdNext = varUInt(varLstGet(varVarLst(varLstGet(this->error, errorIdx + 1)), 0));

if (pageIdNext > pageIdLast + 1)
break;

pageIdLast = pageIdNext;
errorIdx++;
}

VariantList *errorListSub = varLstNew();
varLstAdd(errorListSub, varNewUInt(pageId));
varLstAdd(errorListSub, varNewUInt(pageIdLast));
varLstAdd(errorList, varNewVarLst(errorListSub));
errorIdx++;
}
}
}
while (errorIdx < varLstSize(this->error));

pckWriteArrayEndP(this->error);
this->valid = false;
kvPut(error, varNewStrZ("error"), varNewVarLst(errorList));
}
// Else create a pack to hold the flags
else
{
this->error = pckWriteNewP();
pckWriteNullP(this->error);
}

kvPut(error, VARSTRDEF("valid"), VARBOOL(this->valid));
kvPut(error, VARSTRDEF("align"), VARBOOL(this->align));

PackWrite *const packWrite = pckWriteNewP();
// Valid and align flags
pckWriteBoolP(this->error, this->valid, .defaultWrite = true);
pckWriteBoolP(this->error, this->align, .defaultWrite = true);

pckWriteStrP(packWrite, jsonFromKv(error));
pckWriteEndP(packWrite);
// End pack
pckWriteEndP(this->error);

result = pckMove(pckWriteResult(packWrite), memContextPrior());
result = pckMove(pckWriteResult(this->error), memContextPrior());
}
MEM_CONTEXT_TEMP_END();
MEM_CONTEXT_END();

FUNCTION_LOG_RETURN(PACK, result);
}
Expand Down
2 changes: 1 addition & 1 deletion src/command/backup/protocol.c
Expand Up @@ -55,7 +55,7 @@ backupFileProtocol(PackRead *const param, ProtocolServer *const server)
pckWriteU64P(resultPack, result.copySize);
pckWriteU64P(resultPack, result.repoSize);
pckWriteStrP(resultPack, result.copyChecksum);
pckWriteStrP(resultPack, result.pageChecksumResult != NULL ? jsonFromKv(result.pageChecksumResult) : NULL);
pckWritePackP(resultPack, result.pageChecksumResult);

protocolServerDataPut(server, resultPack);
protocolServerDataEndPut(server);
Expand Down
23 changes: 18 additions & 5 deletions src/common/io/filter/group.c
Expand Up @@ -420,8 +420,8 @@ ioFilterGroupParamAll(const IoFilterGroup *this)
}

/**********************************************************************************************************************************/
PackRead *
ioFilterGroupResult(const IoFilterGroup *const this, const StringId filterType, const IoFilterGroupResultParam param)
const Pack *
ioFilterGroupResultPack(const IoFilterGroup *const this, const StringId filterType, const IoFilterGroupResultParam param)
{
FUNCTION_LOG_BEGIN(logLevelDebug);
FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this);
Expand All @@ -432,7 +432,7 @@ ioFilterGroupResult(const IoFilterGroup *const this, const StringId filterType,
ASSERT(this->pub.opened);
ASSERT(filterType != 0);

PackRead *result = NULL;
const Pack *result = NULL;

// Search for the result
unsigned int foundIdx = 0;
Expand All @@ -447,7 +447,7 @@ ioFilterGroupResult(const IoFilterGroup *const this, const StringId filterType,
// If the index matches return the result
if (foundIdx == param.idx)
{
result = pckReadNew(filterResult->result);
result = filterResult->result;
break;
}

Expand All @@ -456,7 +456,20 @@ ioFilterGroupResult(const IoFilterGroup *const this, const StringId filterType,
}
}

FUNCTION_LOG_RETURN(PACK_READ, result);
FUNCTION_LOG_RETURN_CONST(PACK, result);
}

/**********************************************************************************************************************************/
PackRead *
ioFilterGroupResult(const IoFilterGroup *const this, const StringId filterType, const IoFilterGroupResultParam param)
{
FUNCTION_LOG_BEGIN(logLevelDebug);
FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this);
FUNCTION_LOG_PARAM(STRING_ID, filterType);
FUNCTION_LOG_PARAM(UINT, param.idx);
FUNCTION_LOG_END();

FUNCTION_LOG_RETURN(PACK_READ, pckReadNew(ioFilterGroupResultPack(this, filterType, param)));
}

/**********************************************************************************************************************************/
Expand Down
5 changes: 5 additions & 0 deletions src/common/io/filter/group.h
Expand Up @@ -73,6 +73,11 @@ typedef struct IoFilterGroupResultParam

PackRead *ioFilterGroupResult(const IoFilterGroup *this, StringId filterType, IoFilterGroupResultParam param);

#define ioFilterGroupResultPackP(this, filterType, ...) \
ioFilterGroupResultPack(this, filterType, (IoFilterGroupResultParam){VAR_PARAM_INIT, __VA_ARGS__})

const Pack *ioFilterGroupResultPack(const IoFilterGroup *this, StringId filterType, IoFilterGroupResultParam param);

// Get/set all filter results
Pack *ioFilterGroupResultAll(const IoFilterGroup *this);
void ioFilterGroupResultAllSet(IoFilterGroup *this, const Pack *filterResult);
Expand Down
7 changes: 7 additions & 0 deletions src/common/type/pack.h
Expand Up @@ -134,6 +134,13 @@ typedef enum
/***********************************************************************************************************************************
Pack Functions
***********************************************************************************************************************************/
// Duplicate pack
__attribute__((always_inline)) static inline Pack *
pckDup(const Pack *const this)
{
return (Pack *)bufDup((const Buffer *)this);
}

// Cast Buffer to Pack
__attribute__((always_inline)) static inline const Pack *
pckFromBuf(const Buffer *const buffer)
Expand Down

0 comments on commit c8ea17c

Please sign in to comment.