Skip to content

Commit

Permalink
Improve performance of non-blocking reads by using maximum buffer size.
Browse files Browse the repository at this point in the history
Since the introduction of blocking read drivers (e.g. IoHandleRead, TlsClient) the non-blocking drivers have used the same rules for determining maximum buffer size, i.e. read only as much as requested.  This is necessary so the blocking drivers don't get stuck waiting for data that might not be coming.

Instead mark blocking drivers so IoRead knows how much buffer to allow for the read.  The non-blocking drivers can now request the maximum number of bytes allowed by buffer-size.
  • Loading branch information
dwsteele committed Apr 19, 2019
1 parent 0c866f5 commit c916802
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 6 deletions.
6 changes: 6 additions & 0 deletions doc/xml/release.xml
Expand Up @@ -14,6 +14,12 @@
<release-list> <release-list>
<release date="XXXX-XX-XX" version="2.14dev" title="UNDER DEVELOPMENT"> <release date="XXXX-XX-XX" version="2.14dev" title="UNDER DEVELOPMENT">
<release-core-list> <release-core-list>
<release-improvement-list>
<release-item>
<p>Improve performance of non-blocking reads by using maximum buffer size.</p>
</release-item>
</release-improvement-list>

<release-development-list> <release-development-list>
<release-item> <release-item>
<p>Add <code>unsigned int</code> <code>Variant</code> type and update code to use it.</p> <p>Add <code>unsigned int</code> <code>Variant</code> type and update code to use it.</p>
Expand Down
2 changes: 1 addition & 1 deletion src/common/exec.c
Expand Up @@ -174,7 +174,7 @@ execOpen(Exec *this)


// Create wrapper interfaces that check process state // Create wrapper interfaces that check process state
this->ioReadExec = ioReadNewP( this->ioReadExec = ioReadNewP(
this, .read = (IoReadInterfaceRead)execRead, .eof = (IoReadInterfaceEof)execEof, this, .block = true, .read = (IoReadInterfaceRead)execRead, .eof = (IoReadInterfaceEof)execEof,
.handle = (IoReadInterfaceHandle)execHandleRead); .handle = (IoReadInterfaceHandle)execHandleRead);
ioReadOpen(this->ioReadExec); ioReadOpen(this->ioReadExec);
this->ioWriteExec = ioWriteNewP(this, .write = (IoWriteInterfaceWrite)execWrite); this->ioWriteExec = ioWriteNewP(this, .write = (IoWriteInterfaceWrite)execWrite);
Expand Down
2 changes: 1 addition & 1 deletion src/common/io/handleRead.c
Expand Up @@ -42,7 +42,7 @@ ioHandleReadNew(const String *name, int handle, TimeMSec timeout)
this = memNew(sizeof(IoHandleRead)); this = memNew(sizeof(IoHandleRead));
this->memContext = memContextCurrent(); this->memContext = memContextCurrent();
this->io = ioReadNewP( this->io = ioReadNewP(
this, .eof = (IoReadInterfaceEof)ioHandleReadEof, .handle = (IoReadInterfaceHandle)ioHandleReadHandle, this, .block = true, .eof = (IoReadInterfaceEof)ioHandleReadEof, .handle = (IoReadInterfaceHandle)ioHandleReadHandle,
.read = (IoReadInterfaceRead)ioHandleRead); .read = (IoReadInterfaceRead)ioHandleRead);
this->name = strDup(name); this->name = strDup(name);
this->handle = handle; this->handle = handle;
Expand Down
18 changes: 17 additions & 1 deletion src/common/io/read.c
Expand Up @@ -144,7 +144,7 @@ ioReadInternal(IoRead *this, Buffer *buffer, bool block)
bufUsedZero(this->input); bufUsedZero(this->input);


// If blocking then limit the amount of data requested // If blocking then limit the amount of data requested
if (block && bufRemains(this->input) > bufRemains(buffer)) if (ioReadBlock(this) && bufRemains(this->input) > bufRemains(buffer))
bufLimitSet(this->input, bufRemains(buffer)); bufLimitSet(this->input, bufRemains(buffer));


this->interface.read(this->driver, this->input, block); this->interface.read(this->driver, this->input, block);
Expand Down Expand Up @@ -305,6 +305,21 @@ ioReadClose(IoRead *this)
FUNCTION_LOG_RETURN_VOID(); FUNCTION_LOG_RETURN_VOID();
} }


/***********************************************************************************************************************************
Do reads block when more bytes are requested than are available to read?
***********************************************************************************************************************************/
bool
ioReadBlock(const IoRead *this)
{
FUNCTION_TEST_BEGIN();
FUNCTION_TEST_PARAM(IO_READ, this);
FUNCTION_TEST_END();

ASSERT(this != NULL);

FUNCTION_TEST_RETURN(this->interface.block);
}

/*********************************************************************************************************************************** /***********************************************************************************************************************************
Is IO at EOF? Is IO at EOF?
Expand Down Expand Up @@ -352,6 +367,7 @@ ioReadFilterGroupSet(IoRead *this, IoFilterGroup *filterGroup)
ASSERT(filterGroup != NULL); ASSERT(filterGroup != NULL);
ASSERT(this->filterGroup == NULL); ASSERT(this->filterGroup == NULL);
ASSERT(!this->opened && !this->closed); ASSERT(!this->opened && !this->closed);
ASSERT(!ioReadBlock(this));


this->filterGroup = filterGroup; this->filterGroup = filterGroup;


Expand Down
1 change: 1 addition & 0 deletions src/common/io/read.h
Expand Up @@ -28,6 +28,7 @@ void ioReadClose(IoRead *this);
/*********************************************************************************************************************************** /***********************************************************************************************************************************
Getters/Setters Getters/Setters
***********************************************************************************************************************************/ ***********************************************************************************************************************************/
bool ioReadBlock(const IoRead *this);
bool ioReadEof(const IoRead *this); bool ioReadEof(const IoRead *this);
const IoFilterGroup *ioReadFilterGroup(const IoRead *this); const IoFilterGroup *ioReadFilterGroup(const IoRead *this);
void ioReadFilterGroupSet(IoRead *this, IoFilterGroup *filterGroup); void ioReadFilterGroupSet(IoRead *this, IoFilterGroup *filterGroup);
Expand Down
1 change: 1 addition & 0 deletions src/common/io/read.intern.h
Expand Up @@ -17,6 +17,7 @@ typedef size_t (*IoReadInterfaceRead)(void *driver, Buffer *buffer, bool block);


typedef struct IoReadInterface typedef struct IoReadInterface
{ {
bool block; // Do reads block when buffer is larger than available bytes?
IoReadInterfaceEof eof; IoReadInterfaceEof eof;
IoReadInterfaceClose close; IoReadInterfaceClose close;
IoReadInterfaceHandle handle; IoReadInterfaceHandle handle;
Expand Down
3 changes: 2 additions & 1 deletion src/common/io/tls/client.c
Expand Up @@ -414,7 +414,8 @@ tlsClientOpen(TlsClient *this)
// Create read and write interfaces // Create read and write interfaces
this->write = ioWriteNewP(this, .write = (IoWriteInterfaceWrite)tlsClientWrite); this->write = ioWriteNewP(this, .write = (IoWriteInterfaceWrite)tlsClientWrite);
ioWriteOpen(this->write); ioWriteOpen(this->write);
this->read = ioReadNewP(this, .eof = (IoReadInterfaceEof)tlsClientEof, .read = (IoReadInterfaceRead)tlsClientRead); this->read = ioReadNewP(
this, .block = true, .eof = (IoReadInterfaceEof)tlsClientEof, .read = (IoReadInterfaceRead)tlsClientRead);
ioReadOpen(this->read); ioReadOpen(this->read);
} }
MEM_CONTEXT_END(); MEM_CONTEXT_END();
Expand Down
7 changes: 5 additions & 2 deletions test/src/module/common/ioTest.c
Expand Up @@ -551,10 +551,13 @@ testRun(void)
TEST_ERROR(ioRead(ioHandleReadIo(read), buffer), FileReadError, "unable to read data from read test after 1000ms"); TEST_ERROR(ioRead(ioHandleReadIo(read), buffer), FileReadError, "unable to read data from read test after 1000ms");
TEST_RESULT_UINT(bufSize(buffer), 16, "buffer is only partially read"); TEST_RESULT_UINT(bufSize(buffer), 16, "buffer is only partially read");


// Read a buffer that is transmitted in two parts // Read a buffer that is transmitted in two parts with blocking on the read side
buffer = bufNew(16); buffer = bufNew(16);
bufLimitSet(buffer, 12);


TEST_RESULT_UINT(ioRead(ioHandleReadIo(read), buffer), 16, "read buffer"); TEST_RESULT_UINT(ioRead(ioHandleReadIo(read), buffer), 12, "read buffer");
bufLimitClear(buffer);
TEST_RESULT_UINT(ioRead(ioHandleReadIo(read), buffer), 4, "read buffer");
TEST_RESULT_STR(strPtr(strNewBuf(buffer)), "1234567812345678", "check buffer"); TEST_RESULT_STR(strPtr(strNewBuf(buffer)), "1234567812345678", "check buffer");


// Check EOF // Check EOF
Expand Down

0 comments on commit c916802

Please sign in to comment.