From f0f105ddeca7b3434d9c3bb78e25247b3c6b7584 Mon Sep 17 00:00:00 2001 From: David Steele Date: Thu, 9 May 2019 12:10:46 -0400 Subject: [PATCH] Improve filter's notion of "done" to optimize filter processing. Filters had different ideas about what "done" meant and this added complication to the group filter processing. For example, gzip decompression would detect end of stream and mark the filter as done before it had been flushed. Improve the IoFilter interface to give a consistent definition of done across all filters, i.e. no filter can be done until it has started flushing no matter what the underlying driver reports. This removes quite a bit of tricky logic in the processing loop which tried to determine when a filter was "really" done. Also improve management of the input buffers by pointing directly to the prior output buffer (or the caller's input) to eliminate loops that set/cleared these buffers. --- doc/xml/release.xml | 2 +- src/common/io/filter/filter.c | 32 ++++++++++-- src/common/io/filter/filter.h | 4 +- src/common/io/filter/group.c | 91 ++++++++++++++------------------- src/common/io/filter/size.c | 3 +- src/common/io/filter/size.h | 6 +++ test/src/module/common/ioTest.c | 6 ++- 7 files changed, 79 insertions(+), 65 deletions(-) diff --git a/doc/xml/release.xml b/doc/xml/release.xml index 94796d45a4..cd28866153 100644 --- a/doc/xml/release.xml +++ b/doc/xml/release.xml @@ -36,7 +36,7 @@ -

Only process next filter in IoFilterGroup when input buffer is full or flushing.

+

Filter improvements. Only process next filter in IoFilterGroup when input buffer is full or flushing. Improve filter's notion of done to optimize filter processing.

diff --git a/src/common/io/filter/filter.c b/src/common/io/filter/filter.c index 4b58d52315..69755be873 100644 --- a/src/common/io/filter/filter.c +++ b/src/common/io/filter/filter.c @@ -18,6 +18,8 @@ struct IoFilter const String *type; // Filter type void *driver; // Filter driver IoFilterInterface interface; // Filter interface + + bool flushing; // Has the filter started flushing? }; OBJECT_DEFINE_FREE(IO_FILTER); @@ -69,9 +71,13 @@ ioFilterProcessIn(IoFilter *this, const Buffer *input) ASSERT(this != NULL); ASSERT(this->interface.in != NULL); - CHECK(input != NULL && bufUsed(input) > 0); + CHECK(input == NULL || bufUsed(input) > 0); + CHECK(!this->flushing || input == NULL); - this->interface.in(this->driver, input); + if (input == NULL) + this->flushing = true; + else + this->interface.in(this->driver, input); FUNCTION_TEST_RETURN_VOID(); } @@ -92,8 +98,13 @@ ioFilterProcessInOut(IoFilter *this, const Buffer *input, Buffer *output) ASSERT(output != NULL); ASSERT(this->interface.inOut != NULL); CHECK(input == NULL || bufUsed(input) > 0); + CHECK(!this->flushing || input == NULL); - this->interface.inOut(this->driver, input, output); + if (input == NULL && !this->flushing) + this->flushing = true; + + if (!ioFilterDone(this)) + this->interface.inOut(this->driver, input, output); CHECK(!ioFilterInputSame(this) || bufUsed(output) > 0); FUNCTION_TEST_RETURN_VOID(); @@ -121,7 +132,8 @@ ioFilterMove(IoFilter *this, MemContext *parentNew) /*********************************************************************************************************************************** Is the filter done? -If done is not defined by the filter then check inputSame. If inputSame is true then the filter is not done. +If done is not defined by the filter then check inputSame. If inputSame is true then the filter is not done. Even if the filter +is done the interface will not report done until the interface is flushing. ***********************************************************************************************************************************/ bool ioFilterDone(const IoFilter *this) @@ -132,7 +144,8 @@ ioFilterDone(const IoFilter *this) ASSERT(this != NULL); - FUNCTION_TEST_RETURN(this->interface.done != NULL ? this->interface.done(this->driver) : !ioFilterInputSame(this)); + FUNCTION_TEST_RETURN( + this->flushing && (this->interface.done != NULL ? this->interface.done(this->driver) : !ioFilterInputSame(this))); } /*********************************************************************************************************************************** @@ -230,3 +243,12 @@ ioFilterType(const IoFilter *this) FUNCTION_TEST_RETURN(this->type); } + +/*********************************************************************************************************************************** +Render as string for logging +***********************************************************************************************************************************/ +String * +ioFilterToLog(const IoFilter *this) +{ + return strNewFmt("{type: %s}", strPtr(this->type)); +} diff --git a/src/common/io/filter/filter.h b/src/common/io/filter/filter.h index 776b5b20fc..9144740841 100644 --- a/src/common/io/filter/filter.h +++ b/src/common/io/filter/filter.h @@ -36,9 +36,11 @@ void ioFilterFree(IoFilter *this); /*********************************************************************************************************************************** Macros for function logging ***********************************************************************************************************************************/ +String *ioFilterToLog(const IoFilter *this); + #define FUNCTION_LOG_IO_FILTER_TYPE \ IoFilter * #define FUNCTION_LOG_IO_FILTER_FORMAT(value, buffer, bufferSize) \ - objToLog(value, "IoFilter", buffer, bufferSize) + FUNCTION_LOG_STRING_OBJECT_FORMAT(value, ioFilterToLog, buffer, bufferSize) #endif diff --git a/src/common/io/filter/group.c b/src/common/io/filter/group.c index 665f096a84..3c3c09eb11 100644 --- a/src/common/io/filter/group.c +++ b/src/common/io/filter/group.c @@ -22,7 +22,7 @@ Contains the filter object and inout/output buffers. ***********************************************************************************************************************************/ typedef struct IoFilterData { - const Buffer *input; // Input buffer for filter + const Buffer **input; // Input buffer for filter Buffer *inputLocal; // Non-null if a locally created buffer that can be cleared IoFilter *filter; // Filter to apply Buffer *output; // Output buffer for filter @@ -41,7 +41,7 @@ struct IoFilterGroup { MemContext *memContext; // Mem context List *filterList; // List of filters to apply - unsigned int firstOutputFilter; // Index of the first output filter + const Buffer *input; // Input buffer passed in for processing KeyValue *filterResult; // Filter results (if any) bool inputSame; // Same input required again? bool done; // Is processing done? @@ -140,34 +140,33 @@ ioFilterGroupOpen(IoFilterGroup *this) ioFilterGroupAdd(this, ioBufferNew()); // Create filter input/output buffers. Input filters do not get an output buffer since they don't produce output. - Buffer *lastOutputBuffer = NULL; + Buffer **lastOutputBuffer = NULL; for (unsigned int filterIdx = 0; filterIdx < lstSize(this->filterList); filterIdx++) { IoFilterData *filterData = ioFilterGroupGet(this, filterIdx); - // Assign the last output buffer to the input. At first there won't be an input filter because it will be passed into - // the process function as an input. - if (lastOutputBuffer != NULL) + // If there is no last output buffer yet, then use the input buffer that will be provided by the caller + if (lastOutputBuffer == NULL) { - filterData->input = lastOutputBuffer; - filterData->inputLocal = lastOutputBuffer; + filterData->input = &this->input; } - - // Is this an output filter? - if (ioFilterOutput(filterData->filter)) + // Else assign the last output buffer to the input + else { - // If this is the first output buffer found, store the index so it can be easily found during processing - if (lastOutputBuffer == NULL) - this->firstOutputFilter = filterIdx; + // This cast is required because the compiler can't guarantee the const-ness of this object, i.e. it could be + // modified in other parts of the code. This is actually expected and the only reason we need this const is to + // match the const-ness of the input buffer provided by the caller. + filterData->input = (const Buffer **)lastOutputBuffer; + filterData->inputLocal = *lastOutputBuffer; + } - // If this is not the last output filter then create a new output buffer for it. The output buffer for the last - // filter will be provided to the process function. - if (filterIdx < lstSize(this->filterList) - 1) - { - lastOutputBuffer = bufNew(ioBufferSize()); - filterData->output = lastOutputBuffer; - } + // If this is not the last output filter then create a new output buffer for it. The output buffer for the last filter + // will be provided to the process function. + if (ioFilterOutput(filterData->filter) && filterIdx < lstSize(this->filterList) - 1) + { + filterData->output = bufNew(ioBufferSize()); + lastOutputBuffer = &filterData->output; } } } @@ -195,6 +194,7 @@ ioFilterGroupProcess(IoFilterGroup *this, const Buffer *input, Buffer *output) ASSERT(this != NULL); ASSERT(this->opened && !this->closed); + ASSERT(input == NULL || bufUsed(input) > 0); ASSERT(!this->flushing || input == NULL); ASSERT(output != NULL); ASSERT(bufRemains(output) > 0); @@ -205,14 +205,8 @@ ioFilterGroupProcess(IoFilterGroup *this, const Buffer *input, Buffer *output) this->flushing = true; #endif - // Assign the input buffer up to the first output filter. After this point the input buffers were locally created during open. - if (!this->inputSame) - { - for (unsigned int filterIdx = 0; filterIdx <= this->firstOutputFilter; filterIdx++) - (ioFilterGroupGet(this, filterIdx))->input = input; - } - - // Assign the output buffer + // Assign input and output buffers + this->input = input; (ioFilterGroupGet(this, lstSize(this->filterList) - 1))->output = output; // @@ -238,7 +232,7 @@ ioFilterGroupProcess(IoFilterGroup *this, const Buffer *input, Buffer *output) break; } } - while (filterIdx != this->firstOutputFilter); + while (filterIdx != 0); // If no filter is found that needs the same input that means we are done with the current input. So end the loop and // get some more input. @@ -252,27 +246,13 @@ ioFilterGroupProcess(IoFilterGroup *this, const Buffer *input, Buffer *output) { IoFilterData *filterData = ioFilterGroupGet(this, filterIdx); - // If the filter produces output - if (ioFilterOutput(filterData->filter)) + // Process the filter if it is not done + if (!ioFilterDone(filterData->filter)) { - // Keep processing while the filter is not done or there is input - if (!ioFilterDone(filterData->filter) || filterData->input != NULL) + // If the filter produces output + if (ioFilterOutput(filterData->filter)) { - // If we are flushing and the prior filter is done and is not producing any more output then this filter should - // be flushing as well. Set filterData->input = NULL so it knows there is no more input coming. - // - // If the filter is already done then there is no need to set input to NULL because it has already flushed and - // the filter shouldn't need to hand NULL input if it doesn't need it to know when to flush. - // - // Checking filterIdx - 1 is safe because the first filter's filterData->input is always set to NULL when input - // is NULL. - if (input == NULL && filterData->input != NULL && !ioFilterDone(filterData->filter) && - bufUsed(filterData->input) == 0) - { - filterData->input = NULL; - } - - ioFilterProcessInOut(filterData->filter, filterData->input, filterData->output); + ioFilterProcessInOut(filterData->filter, *filterData->input, filterData->output); // If inputSame is set then the output buffer for this filter is full and it will need to be re-processed with // the same input once the output buffer is cleared @@ -287,14 +267,17 @@ ioFilterGroupProcess(IoFilterGroup *this, const Buffer *input, Buffer *output) // If the output buffer is not full and the filter is not done then more data is required if (!bufFull(filterData->output) && !ioFilterDone(filterData->filter)) - { break; - } } + // Else the filter does not produce output + else + ioFilterProcessIn(filterData->filter, *filterData->input); } - // Else the filter does not produce output. No need to flush these filters because they don't buffer data. - else if (filterData->input != NULL) - ioFilterProcessIn(filterData->filter, filterData->input); + + // If the filter is done and has no more output then null the output buffer. Downstream filters have a pointer to this + // buffer so their inputs will also change to null and they'll flush. + if (filterData->output != NULL && ioFilterDone(filterData->filter) && bufUsed(filterData->output) == 0) + filterData->output = NULL; } } while (!bufFull(output) && this->inputSame); diff --git a/src/common/io/filter/size.c b/src/common/io/filter/size.c index ab8da881c8..adac10cf1f 100644 --- a/src/common/io/filter/size.c +++ b/src/common/io/filter/size.c @@ -15,8 +15,7 @@ IO Size Filter /*********************************************************************************************************************************** Filter type constant ***********************************************************************************************************************************/ -#define SIZE_FILTER_TYPE "size" - STRING_STATIC(SIZE_FILTER_TYPE_STR, SIZE_FILTER_TYPE); +STRING_EXTERN(SIZE_FILTER_TYPE_STR, SIZE_FILTER_TYPE); /*********************************************************************************************************************************** Object type diff --git a/src/common/io/filter/size.h b/src/common/io/filter/size.h index 77dba9aa26..d517ce3943 100644 --- a/src/common/io/filter/size.h +++ b/src/common/io/filter/size.h @@ -9,6 +9,12 @@ in a FilterGroup with IoWrite. #include "common/io/filter/filter.h" +/*********************************************************************************************************************************** +Filter type constant +***********************************************************************************************************************************/ +#define SIZE_FILTER_TYPE "size" + STRING_DECLARE(SIZE_FILTER_TYPE_STR); + /*********************************************************************************************************************************** Constructor ***********************************************************************************************************************************/ diff --git a/test/src/module/common/ioTest.c b/test/src/module/common/ioTest.c index 86c566585c..ed99ff5cce 100644 --- a/test/src/module/common/ioTest.c +++ b/test/src/module/common/ioTest.c @@ -290,7 +290,7 @@ testRun(void) IoFilter *sizeFilter = ioSizeNew(); TEST_RESULT_PTR(ioFilterGroupAdd(filterGroup, sizeFilter), filterGroup, " add filter to filter group"); TEST_RESULT_VOID( - ioFilterGroupAdd(filterGroup, ioTestFilterMultiplyNew("double", 2, 1, 'X')), " add filter to filter group"); + ioFilterGroupAdd(filterGroup, ioTestFilterMultiplyNew("double", 2, 3, 'X')), " add filter to filter group"); TEST_RESULT_VOID(ioFilterGroupAdd(filterGroup, ioSizeNew()), " add filter to filter group"); IoFilter *bufferFilter = ioBufferNew(); TEST_RESULT_VOID(ioFilterGroupAdd(filterGroup, bufferFilter), " add filter to filter group"); @@ -314,6 +314,8 @@ testRun(void) TEST_RESULT_STR(strPtr(strNewBuf(buffer)), "33X", " check read"); TEST_RESULT_VOID(bufUsedZero(buffer), " zero buffer"); + TEST_RESULT_SIZE(ioRead(bufferRead, buffer), 2, " read 2 bytes"); + TEST_RESULT_STR(strPtr(strNewBuf(buffer)), "XX", " check read"); TEST_RESULT_BOOL(ioReadEof(bufferRead), true, " eof"); TEST_RESULT_BOOL(ioBufferRead(ioReadDriver(bufferRead), buffer, true), 0, " eof from driver"); TEST_RESULT_SIZE(ioRead(bufferRead, buffer), 0, " read 0 bytes"); @@ -325,7 +327,7 @@ testRun(void) " check filter result"); TEST_RESULT_PTR(ioFilterGroupResult(filterGroup, strNew("double")), NULL, " check filter result is NULL"); TEST_RESULT_UINT( - varUInt64(varLstGet(varVarLst(ioFilterGroupResult(filterGroup, ioFilterType(sizeFilter))), 1)), 7, + varUInt64(varLstGet(varVarLst(ioFilterGroupResult(filterGroup, ioFilterType(sizeFilter))), 1)), 9, " check filter result"); TEST_RESULT_PTR(ioFilterDriver(bufferFilter), bufferFilter->driver, " check filter driver");