Skip to content

Commit

Permalink
Improve filter's notion of "done" to optimize filter processing.
Browse files Browse the repository at this point in the history
Filters had different ideas about what "done" meant and this added complication to the group filter processing.  For example, gzip decompression would detect end of stream and mark the filter as done before it had been flushed.

Improve the IoFilter interface to give a consistent definition of done across all filters, i.e. no filter can be done until it has started flushing no matter what the underlying driver reports.  This removes quite a bit of tricky logic in the processing loop which tried to determine when a filter was "really" done.

Also improve management of the input buffers by pointing directly to the prior output buffer (or the caller's input) to eliminate loops that set/cleared these buffers.
  • Loading branch information
dwsteele committed May 9, 2019
1 parent d5fac35 commit f0f105d
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 65 deletions.
2 changes: 1 addition & 1 deletion doc/xml/release.xml
Expand Up @@ -36,7 +36,7 @@

<release-development-list>
<release-item>
<p>Only process next filter in <code>IoFilterGroup</code> when input buffer is full or flushing.</p>
<p>Filter improvements. Only process next filter in <code>IoFilterGroup</code> when input buffer is full or flushing. Improve filter's notion of <quote>done</quote> to optimize filter processing.</p>
</release-item>

<release-item>
Expand Down
32 changes: 27 additions & 5 deletions src/common/io/filter/filter.c
Expand Up @@ -18,6 +18,8 @@ struct IoFilter
const String *type; // Filter type
void *driver; // Filter driver
IoFilterInterface interface; // Filter interface

bool flushing; // Has the filter started flushing?
};

OBJECT_DEFINE_FREE(IO_FILTER);
Expand Down Expand Up @@ -69,9 +71,13 @@ ioFilterProcessIn(IoFilter *this, const Buffer *input)

ASSERT(this != NULL);
ASSERT(this->interface.in != NULL);
CHECK(input != NULL && bufUsed(input) > 0);
CHECK(input == NULL || bufUsed(input) > 0);
CHECK(!this->flushing || input == NULL);

this->interface.in(this->driver, input);
if (input == NULL)
this->flushing = true;
else
this->interface.in(this->driver, input);

FUNCTION_TEST_RETURN_VOID();
}
Expand All @@ -92,8 +98,13 @@ ioFilterProcessInOut(IoFilter *this, const Buffer *input, Buffer *output)
ASSERT(output != NULL);
ASSERT(this->interface.inOut != NULL);
CHECK(input == NULL || bufUsed(input) > 0);
CHECK(!this->flushing || input == NULL);

this->interface.inOut(this->driver, input, output);
if (input == NULL && !this->flushing)
this->flushing = true;

if (!ioFilterDone(this))
this->interface.inOut(this->driver, input, output);

CHECK(!ioFilterInputSame(this) || bufUsed(output) > 0);
FUNCTION_TEST_RETURN_VOID();
Expand Down Expand Up @@ -121,7 +132,8 @@ ioFilterMove(IoFilter *this, MemContext *parentNew)
/***********************************************************************************************************************************
Is the filter done?
If done is not defined by the filter then check inputSame. If inputSame is true then the filter is not done.
If done is not defined by the filter then check inputSame. If inputSame is true then the filter is not done. Even if the filter
is done the interface will not report done until the interface is flushing.
***********************************************************************************************************************************/
bool
ioFilterDone(const IoFilter *this)
Expand All @@ -132,7 +144,8 @@ ioFilterDone(const IoFilter *this)

ASSERT(this != NULL);

FUNCTION_TEST_RETURN(this->interface.done != NULL ? this->interface.done(this->driver) : !ioFilterInputSame(this));
FUNCTION_TEST_RETURN(
this->flushing && (this->interface.done != NULL ? this->interface.done(this->driver) : !ioFilterInputSame(this)));
}

/***********************************************************************************************************************************
Expand Down Expand Up @@ -230,3 +243,12 @@ ioFilterType(const IoFilter *this)

FUNCTION_TEST_RETURN(this->type);
}

/***********************************************************************************************************************************
Render as string for logging
***********************************************************************************************************************************/
String *
ioFilterToLog(const IoFilter *this)
{
return strNewFmt("{type: %s}", strPtr(this->type));
}
4 changes: 3 additions & 1 deletion src/common/io/filter/filter.h
Expand Up @@ -36,9 +36,11 @@ void ioFilterFree(IoFilter *this);
/***********************************************************************************************************************************
Macros for function logging
***********************************************************************************************************************************/
String *ioFilterToLog(const IoFilter *this);

#define FUNCTION_LOG_IO_FILTER_TYPE \
IoFilter *
#define FUNCTION_LOG_IO_FILTER_FORMAT(value, buffer, bufferSize) \
objToLog(value, "IoFilter", buffer, bufferSize)
FUNCTION_LOG_STRING_OBJECT_FORMAT(value, ioFilterToLog, buffer, bufferSize)

#endif
91 changes: 37 additions & 54 deletions src/common/io/filter/group.c
Expand Up @@ -22,7 +22,7 @@ Contains the filter object and inout/output buffers.
***********************************************************************************************************************************/
typedef struct IoFilterData
{
const Buffer *input; // Input buffer for filter
const Buffer **input; // Input buffer for filter
Buffer *inputLocal; // Non-null if a locally created buffer that can be cleared
IoFilter *filter; // Filter to apply
Buffer *output; // Output buffer for filter
Expand All @@ -41,7 +41,7 @@ struct IoFilterGroup
{
MemContext *memContext; // Mem context
List *filterList; // List of filters to apply
unsigned int firstOutputFilter; // Index of the first output filter
const Buffer *input; // Input buffer passed in for processing
KeyValue *filterResult; // Filter results (if any)
bool inputSame; // Same input required again?
bool done; // Is processing done?
Expand Down Expand Up @@ -140,34 +140,33 @@ ioFilterGroupOpen(IoFilterGroup *this)
ioFilterGroupAdd(this, ioBufferNew());

// Create filter input/output buffers. Input filters do not get an output buffer since they don't produce output.
Buffer *lastOutputBuffer = NULL;
Buffer **lastOutputBuffer = NULL;

for (unsigned int filterIdx = 0; filterIdx < lstSize(this->filterList); filterIdx++)
{
IoFilterData *filterData = ioFilterGroupGet(this, filterIdx);

// Assign the last output buffer to the input. At first there won't be an input filter because it will be passed into
// the process function as an input.
if (lastOutputBuffer != NULL)
// If there is no last output buffer yet, then use the input buffer that will be provided by the caller
if (lastOutputBuffer == NULL)
{
filterData->input = lastOutputBuffer;
filterData->inputLocal = lastOutputBuffer;
filterData->input = &this->input;
}

// Is this an output filter?
if (ioFilterOutput(filterData->filter))
// Else assign the last output buffer to the input
else
{
// If this is the first output buffer found, store the index so it can be easily found during processing
if (lastOutputBuffer == NULL)
this->firstOutputFilter = filterIdx;
// This cast is required because the compiler can't guarantee the const-ness of this object, i.e. it could be
// modified in other parts of the code. This is actually expected and the only reason we need this const is to
// match the const-ness of the input buffer provided by the caller.
filterData->input = (const Buffer **)lastOutputBuffer;
filterData->inputLocal = *lastOutputBuffer;
}

// If this is not the last output filter then create a new output buffer for it. The output buffer for the last
// filter will be provided to the process function.
if (filterIdx < lstSize(this->filterList) - 1)
{
lastOutputBuffer = bufNew(ioBufferSize());
filterData->output = lastOutputBuffer;
}
// If this is not the last output filter then create a new output buffer for it. The output buffer for the last filter
// will be provided to the process function.
if (ioFilterOutput(filterData->filter) && filterIdx < lstSize(this->filterList) - 1)
{
filterData->output = bufNew(ioBufferSize());
lastOutputBuffer = &filterData->output;
}
}
}
Expand Down Expand Up @@ -195,6 +194,7 @@ ioFilterGroupProcess(IoFilterGroup *this, const Buffer *input, Buffer *output)

ASSERT(this != NULL);
ASSERT(this->opened && !this->closed);
ASSERT(input == NULL || bufUsed(input) > 0);
ASSERT(!this->flushing || input == NULL);
ASSERT(output != NULL);
ASSERT(bufRemains(output) > 0);
Expand All @@ -205,14 +205,8 @@ ioFilterGroupProcess(IoFilterGroup *this, const Buffer *input, Buffer *output)
this->flushing = true;
#endif

// Assign the input buffer up to the first output filter. After this point the input buffers were locally created during open.
if (!this->inputSame)
{
for (unsigned int filterIdx = 0; filterIdx <= this->firstOutputFilter; filterIdx++)
(ioFilterGroupGet(this, filterIdx))->input = input;
}

// Assign the output buffer
// Assign input and output buffers
this->input = input;
(ioFilterGroupGet(this, lstSize(this->filterList) - 1))->output = output;

//
Expand All @@ -238,7 +232,7 @@ ioFilterGroupProcess(IoFilterGroup *this, const Buffer *input, Buffer *output)
break;
}
}
while (filterIdx != this->firstOutputFilter);
while (filterIdx != 0);

// If no filter is found that needs the same input that means we are done with the current input. So end the loop and
// get some more input.
Expand All @@ -252,27 +246,13 @@ ioFilterGroupProcess(IoFilterGroup *this, const Buffer *input, Buffer *output)
{
IoFilterData *filterData = ioFilterGroupGet(this, filterIdx);

// If the filter produces output
if (ioFilterOutput(filterData->filter))
// Process the filter if it is not done
if (!ioFilterDone(filterData->filter))
{
// Keep processing while the filter is not done or there is input
if (!ioFilterDone(filterData->filter) || filterData->input != NULL)
// If the filter produces output
if (ioFilterOutput(filterData->filter))
{
// If we are flushing and the prior filter is done and is not producing any more output then this filter should
// be flushing as well. Set filterData->input = NULL so it knows there is no more input coming.
//
// If the filter is already done then there is no need to set input to NULL because it has already flushed and
// the filter shouldn't need to hand NULL input if it doesn't need it to know when to flush.
//
// Checking filterIdx - 1 is safe because the first filter's filterData->input is always set to NULL when input
// is NULL.
if (input == NULL && filterData->input != NULL && !ioFilterDone(filterData->filter) &&
bufUsed(filterData->input) == 0)
{
filterData->input = NULL;
}

ioFilterProcessInOut(filterData->filter, filterData->input, filterData->output);
ioFilterProcessInOut(filterData->filter, *filterData->input, filterData->output);

// If inputSame is set then the output buffer for this filter is full and it will need to be re-processed with
// the same input once the output buffer is cleared
Expand All @@ -287,14 +267,17 @@ ioFilterGroupProcess(IoFilterGroup *this, const Buffer *input, Buffer *output)

// If the output buffer is not full and the filter is not done then more data is required
if (!bufFull(filterData->output) && !ioFilterDone(filterData->filter))
{
break;
}
}
// Else the filter does not produce output
else
ioFilterProcessIn(filterData->filter, *filterData->input);
}
// Else the filter does not produce output. No need to flush these filters because they don't buffer data.
else if (filterData->input != NULL)
ioFilterProcessIn(filterData->filter, filterData->input);

// If the filter is done and has no more output then null the output buffer. Downstream filters have a pointer to this
// buffer so their inputs will also change to null and they'll flush.
if (filterData->output != NULL && ioFilterDone(filterData->filter) && bufUsed(filterData->output) == 0)
filterData->output = NULL;
}
}
while (!bufFull(output) && this->inputSame);
Expand Down
3 changes: 1 addition & 2 deletions src/common/io/filter/size.c
Expand Up @@ -15,8 +15,7 @@ IO Size Filter
/***********************************************************************************************************************************
Filter type constant
***********************************************************************************************************************************/
#define SIZE_FILTER_TYPE "size"
STRING_STATIC(SIZE_FILTER_TYPE_STR, SIZE_FILTER_TYPE);
STRING_EXTERN(SIZE_FILTER_TYPE_STR, SIZE_FILTER_TYPE);

/***********************************************************************************************************************************
Object type
Expand Down
6 changes: 6 additions & 0 deletions src/common/io/filter/size.h
Expand Up @@ -9,6 +9,12 @@ in a FilterGroup with IoWrite.

#include "common/io/filter/filter.h"

/***********************************************************************************************************************************
Filter type constant
***********************************************************************************************************************************/
#define SIZE_FILTER_TYPE "size"
STRING_DECLARE(SIZE_FILTER_TYPE_STR);

/***********************************************************************************************************************************
Constructor
***********************************************************************************************************************************/
Expand Down
6 changes: 4 additions & 2 deletions test/src/module/common/ioTest.c
Expand Up @@ -290,7 +290,7 @@ testRun(void)
IoFilter *sizeFilter = ioSizeNew();
TEST_RESULT_PTR(ioFilterGroupAdd(filterGroup, sizeFilter), filterGroup, " add filter to filter group");
TEST_RESULT_VOID(
ioFilterGroupAdd(filterGroup, ioTestFilterMultiplyNew("double", 2, 1, 'X')), " add filter to filter group");
ioFilterGroupAdd(filterGroup, ioTestFilterMultiplyNew("double", 2, 3, 'X')), " add filter to filter group");
TEST_RESULT_VOID(ioFilterGroupAdd(filterGroup, ioSizeNew()), " add filter to filter group");
IoFilter *bufferFilter = ioBufferNew();
TEST_RESULT_VOID(ioFilterGroupAdd(filterGroup, bufferFilter), " add filter to filter group");
Expand All @@ -314,6 +314,8 @@ testRun(void)
TEST_RESULT_STR(strPtr(strNewBuf(buffer)), "33X", " check read");

TEST_RESULT_VOID(bufUsedZero(buffer), " zero buffer");
TEST_RESULT_SIZE(ioRead(bufferRead, buffer), 2, " read 2 bytes");
TEST_RESULT_STR(strPtr(strNewBuf(buffer)), "XX", " check read");
TEST_RESULT_BOOL(ioReadEof(bufferRead), true, " eof");
TEST_RESULT_BOOL(ioBufferRead(ioReadDriver(bufferRead), buffer, true), 0, " eof from driver");
TEST_RESULT_SIZE(ioRead(bufferRead, buffer), 0, " read 0 bytes");
Expand All @@ -325,7 +327,7 @@ testRun(void)
" check filter result");
TEST_RESULT_PTR(ioFilterGroupResult(filterGroup, strNew("double")), NULL, " check filter result is NULL");
TEST_RESULT_UINT(
varUInt64(varLstGet(varVarLst(ioFilterGroupResult(filterGroup, ioFilterType(sizeFilter))), 1)), 7,
varUInt64(varLstGet(varVarLst(ioFilterGroupResult(filterGroup, ioFilterType(sizeFilter))), 1)), 9,
" check filter result");

TEST_RESULT_PTR(ioFilterDriver(bufferFilter), bufferFilter->driver, " check filter driver");
Expand Down

0 comments on commit f0f105d

Please sign in to comment.