Skip to content

Commit

Permalink
Merge 1043568 into 7ea5a14
Browse files Browse the repository at this point in the history
  • Loading branch information
jpfr committed Feb 27, 2019
2 parents 7ea5a14 + 1043568 commit 735f1f1
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 23 deletions.
5 changes: 3 additions & 2 deletions arch/ua_network_tcp.c
Expand Up @@ -570,11 +570,12 @@ ClientNetworkLayerTCP_close(UA_Connection *connection) {

static void
ClientNetworkLayerTCP_free(UA_Connection *connection) {
if (connection->handle){
if(connection->handle) {
TCPClientConnection *tcpConnection = (TCPClientConnection *)connection->handle;
if(tcpConnection->server)
UA_freeaddrinfo(tcpConnection->server);
UA_freeaddrinfo(tcpConnection->server);
UA_free(tcpConnection);
connection->handle = NULL;
}
}

Expand Down
14 changes: 7 additions & 7 deletions src/client/ua_client_connect_async.c
Expand Up @@ -315,15 +315,15 @@ responseActivateSession(UA_Client *client, void *userdata, UA_UInt32 requestId,
setClientState(client, UA_CLIENTSTATE_SESSION);

#ifdef UA_ENABLE_SUBSCRIPTIONS
/* A new session has been created. We need to clean up the subscriptions */
UA_Client_Subscriptions_clean(client);
/* A new session has been created. We need to clean up the subscriptions */
UA_Client_Subscriptions_clean(client);
#endif

/* call onConnect (client_async.c) callback */
AsyncServiceCall ac = client->asyncConnectCall;

ac.callback(client, ac.userdata, requestId + 1,
&activateResponse->responseHeader.serviceResult);
/* Call onConnect (client_async.c) callback */
if(client->asyncConnectCall.callback)
client->asyncConnectCall.callback(client, client->asyncConnectCall.userdata,
requestId + 1,
&activateResponse->responseHeader.serviceResult);
}

static UA_StatusCode
Expand Down
29 changes: 25 additions & 4 deletions src/ua_connection.c
Expand Up @@ -95,6 +95,7 @@ UA_Connection_sendError(UA_Connection *connection, UA_TcpErrorMessage *error) {
static UA_StatusCode
bufferIncompleteChunk(UA_Connection *connection, const UA_Byte *pos,
const UA_Byte *end) {
UA_assert(connection->incompleteChunk.length == 0);
UA_assert(pos < end);
size_t length = (uintptr_t)end - (uintptr_t)pos;
UA_StatusCode retval = UA_ByteString_allocBuffer(&connection->incompleteChunk, length);
Expand Down Expand Up @@ -160,24 +161,44 @@ UA_StatusCode
UA_Connection_processChunks(UA_Connection *connection, void *application,
UA_Connection_processChunk processCallback,
const UA_ByteString *packet) {
/* The connection has already prepended any incomplete chunk during recv */
const UA_Byte *pos = packet->data;
const UA_Byte *end = &packet->data[packet->length];
UA_ByteString appended = connection->incompleteChunk;

/* Prepend the incomplete last chunk. This is usually done in the
* networklayer. But we test for a buffered incomplete chunk here again to
* work around "lazy" network layers. */
if(appended.length > 0) {
connection->incompleteChunk = UA_BYTESTRING_NULL;
UA_Byte *t = (UA_Byte*)UA_realloc(appended.data, appended.length + packet->length);
if(!t) {
UA_ByteString_deleteMembers(&appended);
return UA_STATUSCODE_BADOUTOFMEMORY;
}
memcpy(&t[appended.length], pos, packet->length);
appended.data = t;
appended.length += packet->length;
pos = t;
end = &t[appended.length];
}

UA_assert(connection->incompleteChunk.length == 0);

/* Loop over the received chunks. pos is increased with each chunk. */
const UA_Byte *pos = packet->data;
const UA_Byte *end = &packet->data[packet->length];
UA_Boolean done = false;
UA_StatusCode retval = UA_STATUSCODE_GOOD;
while(!done) {
retval = processChunk(connection, application, processCallback, &pos, end, &done);
/* If an irrecoverable error happens: do not buffer incomplete chunk */
if(retval != UA_STATUSCODE_GOOD)
return retval;
goto cleanup;
}

if(end > pos)
retval = bufferIncompleteChunk(connection, pos, end);

cleanup:
UA_ByteString_deleteMembers(&appended);
return retval;
}

Expand Down
19 changes: 9 additions & 10 deletions src/ua_connection_internal.h
Expand Up @@ -28,22 +28,21 @@ typedef UA_StatusCode (*UA_Connection_processChunk)(void *application,
UA_Connection *connection,
UA_ByteString *chunk);

/* The network layer may receive chopped up messages since TCP is a streaming
* protocol. This method calls the processChunk callback on all full chunks that
* were received. Dangling half-complete chunks are buffered in the connection
* and considered for the next received packet.
/* The network layer may receive several chunks in one packet since TCP is a
* streaming protocol. The last chunk in the packet may be only partial. This
* method calls the processChunk callback on all full chunks that were received.
* The last incomplete chunk is buffered in the connection for the next
* iteration.
*
* If an entire chunk is received, it is forwarded directly. But the memory
* needs to be freed with the networklayer-specific mechanism. If a half message
* is received, we copy it into a local buffer. Then, the stack-specific free
* needs to be used.
* The packet itself is not edited in this method. But possibly in the callback
* that is executed on complete chunks.
*
* @param connection The connection
* @param application The client or server application
* @param processCallback The function pointer for processing each chunk
* @param packet The received packet.
* @return Returns UA_STATUSCODE_GOOD or an error code. When an error occurs,
* the ingoing message and the current buffer in the connection are
* the current buffer in the connection are
* freed. */
UA_StatusCode
UA_Connection_processChunks(UA_Connection *connection, void *application,
Expand All @@ -66,7 +65,7 @@ UA_Connection_receiveChunksBlocking(UA_Connection *connection, void *application

UA_StatusCode
UA_Connection_receiveChunksNonBlocking(UA_Connection *connection, void *application,
UA_Connection_processChunk processCallback);
UA_Connection_processChunk processCallback);

/* When a fatal error occurs the Server shall send an Error Message to the
* Client and close the socket. When a Client encounters one of these errors, it
Expand Down

0 comments on commit 735f1f1

Please sign in to comment.