Skip to content
Permalink
Browse files

Only process next filter in IoFilterGroup when input buffer is full o…

…r flushing.

This greatly reduces calls to filter processing, which is a performance benefit, but also makes the trace logs smaller and easier to read.

However, this means that ioWriteFlush() will no longer work with filters since a full flush of IoFilterGroup would require an expensive reset.  Currently ioWriteFlush() is not used in this scenario so for now just add an assert to ensure it stays that way.
  • Loading branch information...
dwsteele committed Apr 20, 2019
1 parent e513c52 commit e7255be108b6b85bb736934b19abdef32cbd39e8
Showing with 23 additions and 15 deletions.
  1. +4 −0 doc/xml/release.xml
  2. +10 −9 src/common/io/filter/group.c
  3. +7 −0 src/common/io/write.c
  4. +2 −6 test/src/module/common/ioTest.c
@@ -21,6 +21,10 @@
</release-improvement-list>

<release-development-list>
<release-item>
<p>Only process next filter in <code>IoFilterGroup</code> when input buffer is full or flushing.</p>
</release-item>

<release-item>
<p>Add macros to create constant <code>Buffer</code> objects.</p>
</release-item>
@@ -262,28 +262,29 @@ ioFilterGroupProcess(IoFilterGroup *this, const Buffer *input, Buffer *output)
// Checking filterIdx - 1 is safe because the first filter's filterData->input is always set to NULL when input
// is NULL.
if (input == NULL && filterData->input != NULL && !ioFilterDone(filterData->filter) &&
ioFilterDone(ioFilterGroupGet(this, filterIdx - 1)->filter) && bufUsed(filterData->input) == 0)
bufUsed(filterData->input) == 0)
{
filterData->input = NULL;
}

ioFilterProcessInOut(filterData->filter, filterData->input, filterData->output);

// If there was no output then break out of filter processing because more input is needed
if (bufUsed(filterData->output) == 0)
{
break;
}
// Else if inputSame is set then the output buffer for this filter is full and it will need to be re-processed
// with the same input once the output buffer is cleared
else if (ioFilterInputSame(filterData->filter))
// If inputSame is set then the output buffer for this filter is full and it will need to be re-processed with
// the same input once the output buffer is cleared
if (ioFilterInputSame(filterData->filter))
{
this->inputSame = true;
}
// Else clear the buffer if it was locally allocated. If the input buffer was passed in then the caller is
// responsible for clearing it.
else if (filterData->inputLocal != NULL)
bufUsedZero(filterData->inputLocal);

// If the output buffer is not full and the filter is not done then more data is required
if (!bufFull(filterData->output) && !ioFilterDone(filterData->filter))
{
break;
}
}
}
// Else the filter does not produce output. No need to flush these filters because they don't buffer data.
@@ -21,6 +21,7 @@ struct IoWrite
Buffer *output; // Output buffer

#ifdef DEBUG
bool filterGroupSet; // Was an IoFilterGroup set?
bool opened; // Has the io been opened?
bool closed; // Has the io been closed?
#endif
@@ -153,6 +154,7 @@ ioWriteFlush(IoWrite *this)

ASSERT(this != NULL);
ASSERT(this->opened && !this->closed);
ASSERT(!this->filterGroupSet);

if (bufUsed(this->output) > 0)
{
@@ -235,6 +237,11 @@ ioWriteFilterGroupSet(IoWrite *this, IoFilterGroup *filterGroup)
ASSERT(this->filterGroup == NULL);
ASSERT(!this->opened && !this->closed);

// Track whether a filter group was set to prevent flush() from being called later
#ifdef DEBUG
this->filterGroupSet = true;
#endif

this->filterGroup = filterGroup;

FUNCTION_LOG_RETURN_VOID();
@@ -460,13 +460,8 @@ testRun(void)

TEST_RESULT_VOID(ioWrite(ioBufferWriteIo(bufferWrite), BUFSTRDEF("Z")), " write string");
TEST_RESULT_STR(strPtr(strNewBuf(buffer)), "AABB\n\n", " no change because output buffer is not full");
TEST_RESULT_VOID(ioWriteFlush(ioBufferWriteIo(bufferWrite)), " flush output");
TEST_RESULT_STR(strPtr(strNewBuf(buffer)), "AABB\n\nZZ", " check output is flushed");
TEST_RESULT_VOID(ioWriteFlush(ioBufferWriteIo(bufferWrite)), " flush again (nothing to flush)");
TEST_RESULT_STR(strPtr(strNewBuf(buffer)), "AABB\n\nZZ", " check output is unchanged");

TEST_RESULT_VOID(ioWrite(ioBufferWriteIo(bufferWrite), BUFSTRDEF("12345")), " write bytes");
TEST_RESULT_STR(strPtr(strNewBuf(buffer)), "AABB\n\nZZ112233445", " check write");
TEST_RESULT_STR(strPtr(strNewBuf(buffer)), "AABB\n\nZZ1122334455", " check write");

TEST_RESULT_VOID(ioWriteClose(ioBufferWriteIo(bufferWrite)), " close buffer write object");
TEST_RESULT_STR(strPtr(strNewBuf(buffer)), "AABB\n\nZZ1122334455XXXY", " check write after close");
@@ -506,6 +501,7 @@ testRun(void)
// Write a line to be read
TEST_RESULT_VOID(ioWriteLine(ioHandleWriteIo(write), strNew("test string 1")), "write test string");
ioWriteFlush(ioHandleWriteIo(write));
ioWriteFlush(ioHandleWriteIo(write));

// Sleep so the other side will timeout
const Buffer *buffer = BUFSTRDEF("12345678");

0 comments on commit e7255be

Please sign in to comment.
You can’t perform that action at this time.