Skip to content

Commit

Permalink
Merge abc06cb into acd2f4a
Browse files Browse the repository at this point in the history
  • Loading branch information
jpfr committed Jan 13, 2019
2 parents acd2f4a + abc06cb commit 000f327
Showing 1 changed file with 136 additions and 135 deletions.
271 changes: 136 additions & 135 deletions src/pubsub/ua_pubsub.c
Expand Up @@ -3,6 +3,7 @@
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* Copyright (c) 2017-2018 Fraunhofer IOSB (Author: Andreas Ebner)
* Copyright (c) 2019 Fraunhofer IOSB (Author: Julius Pfrommer)
*/

#include "server/ua_server_internal.h"
Expand All @@ -19,6 +20,8 @@
#include "ua_pubsub_ns0.h"
#endif

#define UA_MAX_STACKBUF 512 /* Max size of network messages on the stack */

/* Forward declaration */
static void
UA_WriterGroup_deleteMembers(UA_Server *server, UA_WriterGroup *writerGroup);
Expand Down Expand Up @@ -477,7 +480,7 @@ UA_Server_updateWriterGroupConfig(UA_Server *server, UA_NodeId writerGroupIdenti
UA_PubSubManager_removeRepeatedPubSubCallback(server, currentWriterGroup->publishCallbackId);
currentWriterGroup->config.publishingInterval = config->publishingInterval;
UA_WriterGroup_addPublishCallback(server, currentWriterGroup);
} else if (currentWriterGroup->config.priority != config->priority) {
} else if(currentWriterGroup->config.priority != config->priority) {
UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
"No or unsupported WriterGroup update.");
}
Expand Down Expand Up @@ -993,178 +996,176 @@ UA_DataSetWriter_generateDataSetMessage(UA_Server *server, UA_DataSetMessage *da
return UA_STATUSCODE_GOOD;
}

/*
* This callback triggers the collection and publish of NetworkMessages and the contained DataSetMessages.
*/
static UA_StatusCode
sendNetworkMessage(UA_PubSubConnection *connection, UA_DataSetMessage *dsm,
UA_UInt16 *writerIds, UA_Byte dsmCount) {
UA_NetworkMessage nm;
memset(&nm, 0, sizeof(UA_NetworkMessage));
nm.version = 1;
nm.networkMessageType = UA_NETWORKMESSAGE_DATASET;
nm.payloadHeaderEnabled = true;

/* Compute the length of the dsm separately for the header */
UA_STACKARRAY(UA_UInt16, dsmLengths, dsmCount);
for(UA_Byte i = 0; i < dsmCount; i++)
dsmLengths[i] = (UA_UInt16)UA_DataSetMessage_calcSizeBinary(&dsm[i]);

nm.payloadHeader.dataSetPayloadHeader.count = dsmCount;
nm.payloadHeader.dataSetPayloadHeader.dataSetWriterIds = writerIds;
nm.payload.dataSetPayload.sizes = dsmLengths;
nm.payload.dataSetPayload.dataSetMessages = dsm;

/* Allocate the buffer. Allocate on the stack if the buffer is small. */
UA_ByteString buf;
size_t msgSize = UA_NetworkMessage_calcSizeBinary(&nm);
size_t stackSize = 1;
if(msgSize <= UA_MAX_STACKBUF)
stackSize = msgSize;
UA_STACKARRAY(UA_Byte, stackBuf, stackSize);
buf.data = stackBuf;
buf.length = msgSize;
UA_StatusCode retval = UA_STATUSCODE_GOOD;
if(msgSize > UA_MAX_STACKBUF) {
retval = UA_ByteString_allocBuffer(&buf, msgSize);
if(retval != UA_STATUSCODE_GOOD)
return retval;
}

/* Encode the message */
UA_Byte *bufPos = buf.data;
memset(bufPos, 0, msgSize);
const UA_Byte *bufEnd = &buf.data[buf.length];
retval = UA_NetworkMessage_encodeBinary(&nm, &bufPos, bufEnd);
if(retval != UA_STATUSCODE_GOOD) {
if(msgSize > UA_MAX_STACKBUF)
UA_ByteString_deleteMembers(&buf);
return retval;
}

/* Send the prepared messages */
retval = connection->channel->send(connection->channel, NULL, &buf);
if(msgSize > UA_MAX_STACKBUF)
UA_ByteString_deleteMembers(&buf);
return retval;
}

/* This callback triggers the collection and publish of NetworkMessages and the
* contained DataSetMessages. */
void
UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
if(!writerGroup) {
UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
"Publish failed. WriterGroup not found");
return;
}

/* Nothing to do? */
if(writerGroup->writersCount <= 0)
return;

/* Binary encoding? */
if(writerGroup->config.encodingMimeType != UA_PUBSUB_ENCODING_UADP) {
UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER, "Unknown encoding type.");
UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
"Unknown encoding type.");
return;
}
UA_PubSubConnection *connection = UA_PubSubConnection_findConnectionbyId(server, writerGroup->linkedConnection);

/* Find the connection associated with the writer */
UA_PubSubConnection *connection =
UA_PubSubConnection_findConnectionbyId(server, writerGroup->linkedConnection);
if(!connection) {
UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
"Publish failed. PubSubConnection invalid.");
return;
}
//prevent error if the maxEncapsulatedDataSetMessageCount is set to 0->1
writerGroup->config.maxEncapsulatedDataSetMessageCount = (UA_UInt16) (writerGroup->config.maxEncapsulatedDataSetMessageCount == 0 ||
writerGroup->config.maxEncapsulatedDataSetMessageCount > UA_BYTE_MAX
? 1 : writerGroup->config.maxEncapsulatedDataSetMessageCount);

UA_DataSetMessage *dsmStore = (UA_DataSetMessage *) UA_calloc(writerGroup->writersCount, sizeof(UA_DataSetMessage));
if(!dsmStore) {
UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
"DataSetMessage allocation failed");
return;
}
memset(dsmStore, 0, sizeof(UA_DataSetMessage) * writerGroup->writersCount);
//The binary DataSetMessage sizes are part of the payload. Memory is allocated on the stack.
UA_STACKARRAY(UA_UInt16, dsmSizes, writerGroup->writersCount);
/* How many DSM can be sent in one NM? */
UA_Byte maxDSM = (UA_Byte)writerGroup->config.maxEncapsulatedDataSetMessageCount;
if(writerGroup->config.maxEncapsulatedDataSetMessageCount > UA_BYTE_MAX)
maxDSM = UA_BYTE_MAX;
/* If the maxEncapsulatedDataSetMessageCount is set to 0->1 */
if(maxDSM == 0)
maxDSM = 1;

/* It is possible to put several DataSetMessages into one NetworkMessage.
* But only if they do not contain promoted fields. NM with only DSM are
* sent out right away. The others are kept in a buffer for "batching". */
size_t dsmCount = 0;
UA_DataSetWriter *dsw;
UA_STACKARRAY(UA_UInt16, dsWriterIds, writerGroup->writersCount);
memset(dsmSizes, 0, writerGroup->writersCount * sizeof(UA_UInt16));
memset(dsWriterIds, 0, writerGroup->writersCount * sizeof(UA_UInt16));
/*
* Calculate the number of needed NetworkMessages. The previous allocated DataSetMessage array is
* filled from left for combined DSM messages and from the right for single DSM.
* Allocated DSM Array
* +----------------------------+
* |DSM1||DSM2||DSM3||DSM4||DSM5|
* +--+----+-----+-----+-----+--+
* | | | | |
* | | | | |
* +--v----v-----v-----v--+--v--+
* | NM1 || NM2 | NM3 |
* +----------------------+-----+
* NetworkMessages
*/
UA_UInt16 combinedNetworkMessageCount = 0, singleNetworkMessagesCount = 0;
UA_DataSetWriter *tmpDataSetWriter;
LIST_FOREACH(tmpDataSetWriter, &writerGroup->writers, listEntry){
//if promoted fields are contained in the PublishedDataSet, then this DSM must encapsulated in one NM
UA_PublishedDataSet *tmpPublishedDataSet = UA_PublishedDataSet_findPDSbyId(server, tmpDataSetWriter->connectedDataSet);
if(!tmpPublishedDataSet) {
UA_STACKARRAY(UA_DataSetMessage, dsmStore, writerGroup->writersCount);
LIST_FOREACH(dsw, &writerGroup->writers, listEntry) {
/* Find the dataset */
UA_PublishedDataSet *pds =
UA_PublishedDataSet_findPDSbyId(server, dsw->connectedDataSet);
if(!pds) {
UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
"Publish failed. PublishedDataSet not found");
return;
"PubSub Publish: PublishedDataSet not found");
continue;
}
if(tmpPublishedDataSet->promotedFieldsCount > 0) {
if(UA_DataSetWriter_generateDataSetMessage(server, &dsmStore[(writerGroup->writersCount - 1) - singleNetworkMessagesCount],
tmpDataSetWriter) != UA_STATUSCODE_GOOD){

/* If promoted fields are contained in the PublishedDataSet, then this
* DSM must encapsulated in one NM. Send it right away. */
if(pds->promotedFieldsCount > 0 || maxDSM == 1) {
UA_DataSetMessage dsm;
UA_StatusCode res = UA_DataSetWriter_generateDataSetMessage(server, &dsm, dsw);
if(res != UA_STATUSCODE_GOOD) {
UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
"Publish failed. DataSetMessage creation failed");
return;
};
dsWriterIds[(writerGroup->writersCount - 1) - singleNetworkMessagesCount] = tmpDataSetWriter->config.dataSetWriterId;
dsmSizes[(writerGroup->writersCount-1) - singleNetworkMessagesCount] = (UA_UInt16) UA_DataSetMessage_calcSizeBinary(&dsmStore[(writerGroup->writersCount-1)
- singleNetworkMessagesCount]);
singleNetworkMessagesCount++;
} else {
if(UA_DataSetWriter_generateDataSetMessage(server, &dsmStore[combinedNetworkMessageCount], tmpDataSetWriter) != UA_STATUSCODE_GOOD){
"PubSub Publish: DataSetMessage creation failed");
continue;
}

res = sendNetworkMessage(connection, &dsm, &dsw->config.dataSetWriterId, 1);
if(res != UA_STATUSCODE_GOOD)
UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
"Publish failed. DataSetMessage creation failed");
return;
};
dsWriterIds[combinedNetworkMessageCount] = tmpDataSetWriter->config.dataSetWriterId;
dsmSizes[combinedNetworkMessageCount] = (UA_UInt16) UA_DataSetMessage_calcSizeBinary(&dsmStore[combinedNetworkMessageCount]);
combinedNetworkMessageCount++;
"PubSub Publish: Could not send a NetworkMessage");
UA_DataSetMessage_free(&dsm);
continue;
}
}
UA_UInt32 networkMessageCount = singleNetworkMessagesCount;
if(combinedNetworkMessageCount != 0){
combinedNetworkMessageCount = (UA_UInt16) (
combinedNetworkMessageCount / writerGroup->config.maxEncapsulatedDataSetMessageCount +
(combinedNetworkMessageCount % writerGroup->config.maxEncapsulatedDataSetMessageCount) == 0 ? 0 : 1);
networkMessageCount += combinedNetworkMessageCount;
}
if(networkMessageCount < 1){
for(size_t i = 0; i < writerGroup->writersCount; i++){
UA_DataSetMessage_free(&dsmStore[i]);

UA_StatusCode res2 =
UA_DataSetWriter_generateDataSetMessage(server, &dsmStore[dsmCount], dsw);
if(res2 != UA_STATUSCODE_GOOD) {
UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
"PubSub Publish: DataSetMessage creation failed");
continue;
}
UA_free(dsmStore);
return;

dsWriterIds[dsmCount] = dsw->config.dataSetWriterId;
dsmCount++;
}

//Alloc memory for the NetworkMessages on the stack
UA_STACKARRAY(UA_NetworkMessage, nmStore, networkMessageCount);
memset(nmStore, 0, networkMessageCount * sizeof(UA_NetworkMessage));
UA_UInt32 currentDSMPosition = 0;
for(UA_UInt32 i = 0; i < networkMessageCount; i++) {
nmStore[i].version = 1;
nmStore[i].networkMessageType = UA_NETWORKMESSAGE_DATASET;
nmStore[i].payloadHeaderEnabled = true;
//create combined NetworkMessages
if(i < (networkMessageCount-singleNetworkMessagesCount)){
if(combinedNetworkMessageCount - (i * writerGroup->config.maxEncapsulatedDataSetMessageCount)){
if(combinedNetworkMessageCount == 1){
nmStore[i].payloadHeader.dataSetPayloadHeader.count = (UA_Byte) ((writerGroup->writersCount) - singleNetworkMessagesCount);
currentDSMPosition = 0;
} else {
nmStore[i].payloadHeader.dataSetPayloadHeader.count = (UA_Byte) writerGroup->config.maxEncapsulatedDataSetMessageCount;
currentDSMPosition = i * writerGroup->config.maxEncapsulatedDataSetMessageCount;
}
//nmStore[i].payloadHeader.dataSetPayloadHeader.count = (UA_Byte) writerGroup->config.maxEncapsulatedDataSetMessageCount;
nmStore[i].payload.dataSetPayload.dataSetMessages = &dsmStore[currentDSMPosition];
nmStore->payload.dataSetPayload.sizes = &dsmSizes[currentDSMPosition];
nmStore->payloadHeader.dataSetPayloadHeader.dataSetWriterIds = &dsWriterIds[currentDSMPosition];
} else {
currentDSMPosition = i * writerGroup->config.maxEncapsulatedDataSetMessageCount;
nmStore[i].payloadHeader.dataSetPayloadHeader.count = (UA_Byte) (currentDSMPosition - ((i - 1) * writerGroup->config.maxEncapsulatedDataSetMessageCount)); //attention cast from uint32 to byte
nmStore[i].payload.dataSetPayload.dataSetMessages = &dsmStore[currentDSMPosition];
nmStore->payload.dataSetPayload.sizes = &dsmSizes[currentDSMPosition];
nmStore->payloadHeader.dataSetPayloadHeader.dataSetWriterIds = &dsWriterIds[currentDSMPosition];
}
} else {///create single NetworkMessages (1 DSM per NM)
nmStore[i].payloadHeader.dataSetPayloadHeader.count = 1;
currentDSMPosition = (UA_UInt32) combinedNetworkMessageCount + (i - combinedNetworkMessageCount/writerGroup->config.maxEncapsulatedDataSetMessageCount
+ (combinedNetworkMessageCount % writerGroup->config.maxEncapsulatedDataSetMessageCount) == 0 ? 0 : 1);
nmStore[i].payload.dataSetPayload.dataSetMessages = &dsmStore[currentDSMPosition];
nmStore->payload.dataSetPayload.sizes = &dsmSizes[currentDSMPosition];
nmStore->payloadHeader.dataSetPayloadHeader.dataSetWriterIds = &dsWriterIds[currentDSMPosition];
}
//send the prepared messages
UA_ByteString buf;
size_t msgSize = UA_NetworkMessage_calcSizeBinary(&nmStore[i]);
if(UA_ByteString_allocBuffer(&buf, msgSize) == UA_STATUSCODE_GOOD) {
UA_Byte *bufPos = buf.data;
memset(bufPos, 0, msgSize);
const UA_Byte *bufEnd = &(buf.data[buf.length]);
if(UA_NetworkMessage_encodeBinary(&nmStore[i], &bufPos, bufEnd) != UA_STATUSCODE_GOOD){
UA_ByteString_deleteMembers(&buf);
return;
};
connection->channel->send(connection->channel, NULL, &buf);
}
//The stack allocated sizes and dataSetWriterIds field must be set to NULL to prevent invalid free.
nmStore[i].payload.dataSetPayload.sizes = NULL;
nmStore->payloadHeader.dataSetPayloadHeader.dataSetWriterIds = NULL;
UA_ByteString_deleteMembers(&buf);
UA_NetworkMessage_deleteMembers(&nmStore[i]);
/* Send the NetworkMessages with batched DataSetMessages */
size_t nmCount = (dsmCount / maxDSM) + ((dsmCount % maxDSM) == 0 ? 0 : 1);
for(UA_UInt32 i = 0; i < nmCount; i++) {
UA_Byte nmDsmCount = maxDSM;
if(i == nmCount - 1)
nmDsmCount = (UA_Byte)dsmCount % maxDSM;
UA_StatusCode res3 = sendNetworkMessage(connection, &dsmStore[i * maxDSM],
&dsWriterIds[i * maxDSM], nmDsmCount);
if(res3 != UA_STATUSCODE_GOOD)
UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
"PubSub Publish: Sending a NetworkMessage failed");
}

/* Clean up DSM */
for(size_t i = 0; i < dsmCount; i++)
UA_DataSetMessage_free(&dsmStore[i]);
}

/*
* Add new publishCallback. The first execution is triggered directly after creation.
* @Warning - The duration (double) is currently casted to int. -> intervals smaller 1ms are not possible.
*/
/* Add new publishCallback. The first execution is triggered directly after
* creation. */
UA_StatusCode
UA_WriterGroup_addPublishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
UA_StatusCode retval =
UA_PubSubManager_addRepeatedCallback(server, (UA_ServerCallback) UA_WriterGroup_publishCallback,
UA_PubSubManager_addRepeatedCallback(server,
(UA_ServerCallback) UA_WriterGroup_publishCallback,
writerGroup, writerGroup->config.publishingInterval,
&writerGroup->publishCallbackId);
if(retval == UA_STATUSCODE_GOOD)
writerGroup->publishCallbackIsRegistered = true;
//run once after creation

/* Run once after creation */
UA_WriterGroup_publishCallback(server, writerGroup);
return retval;
}
Expand Down

0 comments on commit 000f327

Please sign in to comment.