-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
pubsub_TSN_loopback_single_thread.c
1408 lines (1253 loc) · 64.1 KB
/
pubsub_TSN_loopback_single_thread.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/* This work is licensed under a Creative Commons CCZero 1.0 Universal License.
* See http://creativecommons.org/publicdomain/zero/1.0/ for more information. */
/**
* .. _pubsub-tutorial:
*
* Realtime Loopback Example
* ------------------------
*
* This tutorial shows publishing and subscribing information in Realtime.
* This example has the Publisher, the Subscriber and the user application handling in the same thread. This example
* receives the data from the publisher application (pubsub_TSN_publisher_multiple_thread application), processes the
* received data and sends it back to the publisher application.
* An additional feature called the Blocking Socket is employed in the Subscriber thread. When using the Blocking Socket,
* the Subscriber thread remains in "blocking mode" from every wake up time of the thread until a message is received. In other words,
* the timeout is overwritten and the thread continuously waits for the message from every wake up time of the thread.
* Once the message is received, the Subscriber thread updates the value in the Information Model, sleeps up to the wake up time and
* again waits for the next message. This process is repeated until the application is terminated.
*
* Run step of the example is as mentioned below:
*
* ./bin/examples/pubsub_TSN_loopback_single_thread -interface <interface> -operBaseTime <Basetime> -monotonicOffset <offset>
*
* For more options, run ./bin/examples/pubsub_TSN_loopback_single_thread -h
*/
/**
* Trace point setup
*
* +--------------+ +----------------+
* T1 | OPCUA PubSub | T8 T5 | OPCUA loopback | T4
* | | Application | ^ | | Application | ^
* | +--------------+ | | +----------------+ |
* User | | | | | | | |
* Space | | | | | | | |
* | | | | | | | |
* -----------|--------------|------------------------|----------------|--------
* | | Node 1 | | | | Node 2 | |
* Kernel | | | | | | | |
* Space | | | | | | | |
* | | | | | | | |
* v +--------------+ | v +----------------+ |
* T2 | TX tcpdump | T7<----------------T6 | RX tcpdump | T3
* | +--------------+ +----------------+ ^
* | |
* ----------------------------------------------------------------
*/
#define _GNU_SOURCE
#include <sched.h>
#include <signal.h>
#include <time.h>
#include <stdio.h>
#include <stdlib.h>
#include <linux/types.h>
#include <getopt.h>
/* For thread operations */
#include <pthread.h>
#include <open62541/server.h>
#include <open62541/server_pubsub.h>
#include <open62541/server_config_default.h>
#include <open62541/plugin/log_stdout.h>
#include <open62541/plugin/log.h>
#include <open62541/types_generated.h>
#include "ua_pubsub.h"
/*to find load of each thread
* ps -L -o pid,pri,%cpu -C pubsub_TSN_loopback_single_thread */
/* Configurable Parameters */
// If the macro below is disabled, two-way communication is turned off and only the subscriber is active
#define TWO_WAY_COMMUNICATION
/* Cycle time in milliseconds */
#define DEFAULT_CYCLE_TIME 0.25
/* Qbv offset */
#define DEFAULT_QBV_OFFSET 125
#define DEFAULT_SOCKET_PRIORITY 7
#define PUBLISHER_ID 2235
#define WRITER_GROUP_ID 100
#define DATA_SET_WRITER_ID 62541
#define DEFAULT_PUBLISHING_MAC_ADDRESS "opc.eth://01-00-5E-00-00-01:8.3"
#define DEFAULT_PUBLISHER_MULTICAST_ADDRESS "opc.udp://224.0.0.32:4840/"
#define PUBLISHER_ID_SUB 2234
#define WRITER_GROUP_ID_SUB 101
#define DATA_SET_WRITER_ID_SUB 62541
#define DEFAULT_SUBSCRIBING_MAC_ADDRESS "opc.eth://01-00-5E-7F-00-01:8.3"
#define DEFAULT_SUBSCRIBER_MULTICAST_ADDRESS "opc.udp://224.0.0.22:4840/"
#define REPEATED_NODECOUNTS 2 // Default to publish 64 bytes
#define PORT_NUMBER 62541
#define DEFAULT_PUBSUBAPP_THREAD_PRIORITY 90
#define DEFAULT_PUBSUBAPP_THREAD_CORE 1
/* Non-Configurable Parameters */
/* Milli sec and sec conversion to nano sec */
#define MILLI_SECONDS 1000000
#if defined(__arm__)
#define SECONDS 1e9
#else
#define SECONDS 1000000000
#endif
#define SECONDS_SLEEP 5
#define MAX_MEASUREMENTS 100000
#define SECONDS_INCREMENT 1
#ifndef CLOCK_MONOTONIC
#define CLOCK_MONOTONIC 1
#endif
#define CLOCKID CLOCK_MONOTONIC
#define ETH_TRANSPORT_PROFILE "http://opcfoundation.org/UA-Profile/Transport/pubsub-eth-uadp"
#define UDP_TRANSPORT_PROFILE "http://opcfoundation.org/UA-Profile/Transport/pubsub-udp-uadp"
/* If the Hardcoded publisher/subscriber MAC addresses need to be changed,
* change PUBLISHING_MAC_ADDRESS and SUBSCRIBING_MAC_ADDRESS
*/
/* Set server running as true */
UA_Boolean runningServer = UA_TRUE;
char* pubUri = DEFAULT_PUBLISHING_MAC_ADDRESS;
char* subUri = DEFAULT_SUBSCRIBING_MAC_ADDRESS;
static UA_Double cycleTimeInMsec = DEFAULT_CYCLE_TIME;
static UA_Int32 socketPriority = DEFAULT_SOCKET_PRIORITY;
static UA_Int32 pubSubAppPriority = 90;
static UA_Int32 pubSubAppCore = 1;
static UA_Int32 qbvOffset = DEFAULT_QBV_OFFSET;
static UA_Boolean disableSoTxtime = UA_TRUE;
static UA_Boolean enableCsvLog = UA_FALSE;
static UA_Boolean consolePrint = UA_FALSE;
static UA_Boolean signalTerm = UA_FALSE;
#ifdef TWO_WAY_COMMUNICATION
/* Variables corresponding to PubSub connection creation,
* published data set and writer group */
UA_NodeId connectionIdent;
UA_NodeId publishedDataSetIdent;
UA_NodeId writerGroupIdent;
UA_NodeId pubNodeID;
UA_NodeId pubRepeatedCountNodeID;
UA_NodeId runningPubStatusNodeID;
/* Variables for counter data handling in address space */
UA_UInt64 *pubCounterData = NULL;
UA_DataValue *pubDataValueRT = NULL;
UA_Boolean *runningPub = NULL;
UA_DataValue *runningPubDataValueRT = NULL;
UA_UInt64 *repeatedCounterData[REPEATED_NODECOUNTS] = {NULL};
UA_DataValue *repeatedDataValueRT[REPEATED_NODECOUNTS] = {NULL};
#else
static UA_UInt64 previousSubCounterData = 0;
#endif
UA_NodeId subNodeID;
UA_NodeId subRepeatedCountNodeID;
UA_NodeId runningSubStatusNodeID;
UA_UInt64 *subCounterData = NULL;
UA_DataValue *subDataValueRT = NULL;
UA_Boolean *runningSub = NULL;
UA_DataValue *runningSubDataValueRT = NULL;
UA_UInt64 *subRepeatedCounterData[REPEATED_NODECOUNTS] = {NULL};
UA_DataValue *subRepeatedDataValueRT[REPEATED_NODECOUNTS] = {NULL};
/**
* **CSV file handling**
*
* csv files are written for pubSubApp thread.
* csv files include the counterdata that is being either Published or Subscribed
* along with the timestamp. These csv files can be used to compute latency for following
* combinations of Tracepoints, T1-T4 and T1-T8.
*
* T1-T8 - Gives the Round-trip time of a counterdata, as the value published by the Publisher thread
* in pubsub_TSN_publisher_multiple_thread.c example is subscribed by the pubSubApp thread in pubsub_TSN_loopback_single_thread.c
* example and is published back to the pubsub_TSN_publisher_multiple_thread.c example
*/
#ifdef TWO_WAY_COMMUNICATION
/* File to store the data and timestamps for different traffic */
FILE *fpPublisher;
char *filePublishedData = "publisher_T5.csv";
/* Array to store published counter data */
UA_UInt64 publishCounterValue[MAX_MEASUREMENTS];
size_t measurementsPublisher = 0;
/* Array to store timestamp */
struct timespec publishTimestamp[MAX_MEASUREMENTS];
struct timespec dataModificationTime;
#endif
/* File to store the data and timestamps for different traffic */
FILE *fpSubscriber;
char *fileSubscribedData = "subscriber_T4.csv";
/* Array to store subscribed counter data */
UA_UInt64 subscribeCounterValue[MAX_MEASUREMENTS];
size_t measurementsSubscriber = 0;
/* Array to store timestamp */
struct timespec subscribeTimestamp[MAX_MEASUREMENTS];
/* Variable for PubSub connection creation */
UA_NodeId connectionIdentSubscriber;
struct timespec dataReceiveTime;
UA_NodeId readerGroupIdentifier;
UA_NodeId readerIdentifier;
UA_DataSetReaderConfig readerConfig;
/* Structure to define thread parameters */
/* Arguments shared with the pubSubApp thread. The publisher/subscriber
 * callback slots are filled by addPubSubApplicationCallback() instead of
 * registering a timer-based callback. */
typedef struct {
/* Server instance (presumably the one the callbacks are invoked with - confirm). */
UA_Server* server;
/* Opaque data pointer stored together with pubCallback */
void* pubData;
/* Opaque data pointer stored together with subCallback */
void* subData;
/* Callback stored for the writer group (publisher side) */
UA_ServerCallback pubCallback;
/* Callback stored for any other identifier (subscriber side) */
UA_ServerCallback subCallback;
/* Interval in milliseconds as passed to addPubSubApplicationCallback() */
UA_Duration interval_ms;
/* NOTE(review): presumably set from the -operBaseTime option - confirm */
UA_UInt64 operBaseTime;
/* NOTE(review): presumably set from the -monotonicOffset option - confirm */
UA_UInt64 monotonicOffset;
/* NOTE(review): presumably a counter of lost packets - confirm */
UA_UInt64 packetLossCount;
} threadArgPubSub;
/* Single shared instance; dereferenced by addPubSubApplicationCallback(),
 * so it must be allocated before any custom callback is added. */
threadArgPubSub *threadArgPubSub1;
/* PubSub application thread routine */
void *pubSubApp(void *arg);
/* For adding nodes in the server information model */
static void addServerNodes(UA_Server *server);
/* For deleting the nodes created */
static void removeServerNodes(UA_Server *server);
/* To create multi-threads */
static pthread_t threadCreation(UA_Int16 threadPriority, size_t coreAffinity, void *(*thread) (void *),
char *applicationName, void *serverConfig);
void userApplication(UA_UInt64 monotonicOffsetValue);
/* Stop signal */
/* Signal handler: reports the interrupt on the stdout logger and raises the
 * signalTerm flag.
 * NOTE(review): logging from inside a signal handler is presumably not
 * async-signal-safe - confirm whether the message should be emitted by the
 * code that polls signalTerm instead. */
static void stopHandler(int sign) {
    (void)sign; /* the signal number is not inspected */

    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "received ctrl-c");
    signalTerm = UA_TRUE;
}
/**
* **Nanosecond field handling**
*
* Nanosecond field in timespec is checked for overflow; every full second it
* holds is added to the seconds field and removed from the nanosecond field
*/
/* Normalize a timespec whose nanosecond field has overflowed.
 *
 * Every whole second contained in tv_nsec is carried over into tv_sec and
 * tv_nsec keeps the remainder (0..999999999). A value whose tv_nsec is
 * already below one second (negative values included) is left untouched.
 *
 * timeSpecValue: timespec to normalize in place; must not be NULL. */
static void nanoSecondFieldConversion(struct timespec *timeSpecValue) {
    /* Plain integer constant on every platform: avoids a floating-point
     * comparison where the second constant is a double, and removes the need
     * for the glibc-internal __syscall_slong_t cast. */
    const long nanoSecPerSec = 1000000000L;

    /* Nothing to carry */
    if(timeSpecValue->tv_nsec < nanoSecPerSec)
        return;

    /* Carry all full seconds at once instead of looping one second at a time */
    timeSpecValue->tv_sec += (time_t)(timeSpecValue->tv_nsec / nanoSecPerSec);
    timeSpecValue->tv_nsec %= nanoSecPerSec;
}
/**
* **Custom callback handling**
*
* Custom callback thread handling overwrites the default timer based
* callback function with the custom (user-specified) callback interval. */
/* Add a callback for cyclic repetition */
/* Custom "add callback" hook: instead of registering a timer, the callback
 * and its data are stored in threadArgPubSub1.
 *
 * With TWO_WAY_COMMUNICATION defined:
 *   - identifier equal to writerGroupIdent -> publisher slots and interval_ms
 *   - any other identifier                  -> subscriber slots only
 * Without TWO_WAY_COMMUNICATION:
 *   - subscriber slots and interval_ms
 *
 * server, baseTime, timerPolicy and callbackId are not used; callbackId is
 * not written. Always returns UA_STATUSCODE_GOOD. */
static UA_StatusCode
addPubSubApplicationCallback(UA_Server *server, UA_NodeId identifier,
                             UA_ServerCallback callback,
                             void *data, UA_Double interval_ms,
                             UA_DateTime *baseTime, UA_TimerPolicy timerPolicy,
                             UA_UInt64 *callbackId) {
#ifdef TWO_WAY_COMMUNICATION
    if(!UA_NodeId_equal(&identifier, &writerGroupIdent)) {
        /* Not the writer group: this is the subscriber side */
        threadArgPubSub1->subData = data;
        threadArgPubSub1->subCallback = callback;
        return UA_STATUSCODE_GOOD;
    }

    /* Writer group: publisher side, which also defines the cycle interval */
    threadArgPubSub1->pubData = data;
    threadArgPubSub1->pubCallback = callback;
    threadArgPubSub1->interval_ms = interval_ms;
#else
    /* Subscriber-only build: the subscriber defines the cycle interval */
    threadArgPubSub1->subData = data;
    threadArgPubSub1->subCallback = callback;
    threadArgPubSub1->interval_ms = interval_ms;
#endif
    return UA_STATUSCODE_GOOD;
}
/* Custom "change callback" hook. Intentionally does nothing: the cycle is
 * driven by a thread, so no registered callback interval has to be modified.
 * All parameters are ignored. Always returns UA_STATUSCODE_GOOD. */
static UA_StatusCode
changePubSubApplicationCallback(UA_Server *server, UA_NodeId identifier,
                                UA_UInt64 callbackId, UA_Double interval_ms,
                                UA_DateTime *baseTime, UA_TimerPolicy timerPolicy) {
    (void)server;
    (void)identifier;
    (void)callbackId;
    (void)interval_ms;
    (void)baseTime;
    (void)timerPolicy;

    /* The thread uses nanosleep for calculating the cycle time; changing the
     * nanosleep value is what changes the cycle time. */
    return UA_STATUSCODE_GOOD;
}
/* Remove the callback added for cyclic repetition */
/* Custom "remove callback" hook. Currently a no-op; all parameters are
 * ignored. */
static void
removePubSubApplicationCallback(UA_Server *server, UA_NodeId identifier,
                                UA_UInt64 callbackId) {
    (void)server;
    (void)identifier;
    (void)callbackId;
    /* ToDo: Handle thread id */
}
/**
* **External data source handling**
*
* If the external data source is written over the information model, the
* externalDataWriteCallback will be triggered. The user has to take care and assure
* that the write leads not to synchronization issues and race conditions. */
/* userWrite callback installed on the external value backends in this file.
 * Performs no work and ignores all parameters: node values are updated
 * through the variables in memory, UA_Server_write is not used for that.
 * Always returns UA_STATUSCODE_GOOD. */
static UA_StatusCode
externalDataWriteCallback(UA_Server *server, const UA_NodeId *sessionId,
                          void *sessionContext, const UA_NodeId *nodeId,
                          void *nodeContext, const UA_NumericRange *range,
                          const UA_DataValue *data) {
    (void)server;
    (void)sessionId;
    (void)sessionContext;
    (void)nodeId;
    (void)nodeContext;
    (void)range;
    (void)data;
    return UA_STATUSCODE_GOOD;
}
/* notificationRead callback installed on the external value backends in this
 * file. Reads are allowed without any preparation, so the function ignores
 * all parameters and always returns UA_STATUSCODE_GOOD. */
static UA_StatusCode
externalDataReadNotificationCallback(UA_Server *server, const UA_NodeId *sessionId,
                                     void *sessionContext, const UA_NodeId *nodeid,
                                     void *nodeContext, const UA_NumericRange *range) {
    (void)server;
    (void)sessionId;
    (void)sessionContext;
    (void)nodeid;
    (void)nodeContext;
    (void)range;
    return UA_STATUSCODE_GOOD;
}
/**
* **Subscriber**
*
* Create connection, readergroup, datasetreader, subscribedvariables for the Subscriber thread.
*/
/* Create the subscriber PubSub connection and store its identifier in
 * connectionIdentSubscriber.
 *
 * server:                      server to add the connection to
 * transportProfile:            transport profile URI copied into the config
 * networkAddressUrlSubscriber: network address copied into the config
 *
 * Nothing is done when any argument is NULL. The publisher id of this
 * connection is a random UInt32. Success is logged at info level and a
 * failure at error level together with the status code name. */
static void
addPubSubConnectionSubscriber(UA_Server *server, UA_String *transportProfile,
                              UA_NetworkAddressUrlDataType *networkAddressUrlSubscriber) {
    if(server == NULL || transportProfile == NULL || networkAddressUrlSubscriber == NULL)
        return;

    /* Details about the connection configuration and handling are located
     * in the pubsub connection tutorial */
    UA_PubSubConnectionConfig connectionConfig;
    memset(&connectionConfig, 0, sizeof(connectionConfig));
    connectionConfig.name = UA_STRING("Subscriber Connection");
    connectionConfig.enabled = UA_TRUE;
    connectionConfig.transportProfileUri = *transportProfile;

    /* The variant only references this local copy; it has to stay alive
     * until UA_Server_addPubSubConnection has returned. */
    UA_NetworkAddressUrlDataType networkAddressUrlsubscribe = *networkAddressUrlSubscriber;
    UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrlsubscribe,
                         &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
    connectionConfig.publisherIdType = UA_PUBLISHERIDTYPE_UINT32;
    connectionConfig.publisherId.uint32 = UA_UInt32_random();

    UA_StatusCode retval =
        UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdentSubscriber);
    if(retval != UA_STATUSCODE_GOOD) {
        /* Previously a failure went unnoticed */
        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
                     "The PubSub Connection could not be created: %s",
                     UA_StatusCode_name(retval));
        return;
    }
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,"The PubSub Connection was created successfully!");
}
/* Add ReaderGroup to the created connection */
/* Add a ReaderGroup named "ReaderGroup" with RT level
 * UA_PUBSUB_RT_FIXED_SIZE to the subscriber connection
 * (connectionIdentSubscriber). The new group's identifier is written to
 * readerGroupIdentifier. Does nothing when server is NULL; the result of the
 * add call is not evaluated. */
static void
addReaderGroup(UA_Server *server) {
    if(!server)
        return;

    UA_ReaderGroupConfig groupConfig;
    memset(&groupConfig, 0, sizeof(groupConfig));
    groupConfig.rtLevel = UA_PUBSUB_RT_FIXED_SIZE;
    groupConfig.name = UA_STRING("ReaderGroup");

    UA_Server_addReaderGroup(server, connectionIdentSubscriber, &groupConfig,
                             &readerGroupIdentifier);
}
/* Set SubscribedDataSet type to TargetVariables data type
* Add subscribedvariables to the DataSetReader */
static void addSubscribedVariables (UA_Server *server) {
UA_Int32 iterator = 0;
UA_Int32 iteratorRepeatedCount = 0;
if(server == NULL) {
return;
}
UA_FieldTargetVariable *targetVars = (UA_FieldTargetVariable*) UA_calloc((REPEATED_NODECOUNTS + 2), sizeof(UA_FieldTargetVariable));
if(!targetVars) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "FieldTargetVariable - Bad out of memory");
return;
}
runningSub = UA_Boolean_new();
if(!runningSub) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "runningsub - Bad out of memory");
UA_free(targetVars);
return;
}
*runningSub = UA_TRUE;
runningSubDataValueRT = UA_DataValue_new();
if(!runningSubDataValueRT) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "runningsubDatavalue - Bad out of memory");
UA_free(targetVars);
return;
}
UA_Variant_setScalar(&runningSubDataValueRT->value, runningSub, &UA_TYPES[UA_TYPES_BOOLEAN]);
runningSubDataValueRT->hasValue = UA_TRUE;
/* Set the value backend of the above create node to 'external value source' */
UA_ValueBackend runningSubvalueBackend;
runningSubvalueBackend.backendType = UA_VALUEBACKENDTYPE_EXTERNAL;
runningSubvalueBackend.backend.external.value = &runningSubDataValueRT;
runningSubvalueBackend.backend.external.callback.userWrite = externalDataWriteCallback;
runningSubvalueBackend.backend.external.callback.notificationRead = externalDataReadNotificationCallback;
UA_Server_setVariableNode_valueBackend(server, UA_NODEID_NUMERIC(1, (UA_UInt32)30000), runningSubvalueBackend);
UA_FieldTargetDataType_init(&targetVars[iterator].targetVariable);
targetVars[iterator].targetVariable.attributeId = UA_ATTRIBUTEID_VALUE;
targetVars[iterator].targetVariable.targetNodeId = UA_NODEID_NUMERIC(1, (UA_UInt32)30000);
iterator++;
/* For creating Targetvariable */
for (iterator = 1, iteratorRepeatedCount = 0; iterator <= REPEATED_NODECOUNTS; iterator++, iteratorRepeatedCount++)
{
subRepeatedCounterData[iteratorRepeatedCount] = UA_UInt64_new();
if(!subRepeatedCounterData[iteratorRepeatedCount]) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "SubscribeRepeatedCounterData - Bad out of memory");
UA_free(targetVars);
return;
}
*subRepeatedCounterData[iteratorRepeatedCount] = 0;
subRepeatedDataValueRT[iteratorRepeatedCount] = UA_DataValue_new();
if(!subRepeatedDataValueRT[iteratorRepeatedCount]) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "SubscribeRepeatedCounterDataValue - Bad out of memory");
UA_free(targetVars);
return;
}
UA_Variant_setScalar(&subRepeatedDataValueRT[iteratorRepeatedCount]->value, subRepeatedCounterData[iteratorRepeatedCount], &UA_TYPES[UA_TYPES_UINT64]);
subRepeatedDataValueRT[iteratorRepeatedCount]->hasValue = UA_TRUE;
/* Set the value backend of the above create node to 'external value source' */
UA_ValueBackend valueBackend;
valueBackend.backendType = UA_VALUEBACKENDTYPE_EXTERNAL;
valueBackend.backend.external.value = &subRepeatedDataValueRT[iteratorRepeatedCount];
valueBackend.backend.external.callback.userWrite = externalDataWriteCallback;
valueBackend.backend.external.callback.notificationRead = externalDataReadNotificationCallback;
UA_Server_setVariableNode_valueBackend(server, UA_NODEID_NUMERIC(1, (UA_UInt32)iteratorRepeatedCount+50000), valueBackend);
UA_FieldTargetDataType_init(&targetVars[iterator].targetVariable);
targetVars[iterator].targetVariable.attributeId = UA_ATTRIBUTEID_VALUE;
targetVars[iterator].targetVariable.targetNodeId = UA_NODEID_NUMERIC(1, (UA_UInt32)iteratorRepeatedCount + 50000);
}
subCounterData = UA_UInt64_new();
if(!subCounterData) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "SubscribeCounterData - Bad out of memory");
UA_free(targetVars);
return;
}
*subCounterData = 0;
subDataValueRT = UA_DataValue_new();
if(!subDataValueRT) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "SubscribeDataValue - Bad out of memory");
UA_free(targetVars);
return;
}
UA_Variant_setScalar(&subDataValueRT->value, subCounterData, &UA_TYPES[UA_TYPES_UINT64]);
subDataValueRT->hasValue = UA_TRUE;
/* Set the value backend of the above create node to 'external value source' */
UA_ValueBackend valueBackend;
valueBackend.backendType = UA_VALUEBACKENDTYPE_EXTERNAL;
valueBackend.backend.external.value = &subDataValueRT;
valueBackend.backend.external.callback.userWrite = externalDataWriteCallback;
valueBackend.backend.external.callback.notificationRead = externalDataReadNotificationCallback;
UA_Server_setVariableNode_valueBackend(server, subNodeID, valueBackend);
UA_FieldTargetDataType_init(&targetVars[iterator].targetVariable);
targetVars[iterator].targetVariable.attributeId = UA_ATTRIBUTEID_VALUE;
targetVars[iterator].targetVariable.targetNodeId = subNodeID;
/* Set the subscribed data to TargetVariable type */
readerConfig.subscribedDataSetType = UA_PUBSUB_SDS_TARGET;
readerConfig.subscribedDataSet.subscribedDataSetTarget.targetVariables = targetVars;
readerConfig.subscribedDataSet.subscribedDataSetTarget.targetVariablesSize = REPEATED_NODECOUNTS + 2;
}
/* Add DataSetReader to the ReaderGroup */
/* Add the DataSetReader to the ReaderGroup (readerGroupIdentifier); the new
 * reader's identifier is written to readerIdentifier.
 *
 * The global readerConfig is filled with:
 *   - publisher id PUBLISHER_ID_SUB (UInt16), writer group id
 *     WRITER_GROUP_ID_SUB and data set writer id DATA_SET_WRITER_ID_SUB
 *   - UADP message settings with PublisherId, GroupHeader, WriterGroupId and
 *     PayloadHeader in the network message content mask
 *   - meta data with REPEATED_NODECOUNTS + 2 scalar fields: one Boolean
 *     followed by REPEATED_NODECOUNTS + 1 UInt64 fields
 *   - the target variables built by addSubscribedVariables()
 *
 * All temporary buffers are released before returning, and every pointer in
 * readerConfig that referred to them (or to a local variable) is reset so the
 * global config holds no dangling pointers afterwards. Allocation failures
 * and a failing UA_Server_addDataSetReader call are logged at error level.
 * Does nothing when server is NULL. */
static void
addDataSetReader(UA_Server *server) {
    if(server == NULL) {
        return;
    }

    const size_t fieldCount = REPEATED_NODECOUNTS + 2;
    UA_UInt16 publisherIdentifier = PUBLISHER_ID_SUB;
    UA_UadpDataSetReaderMessageDataType *dataSetReaderMessage = NULL;
    UA_DataSetMetaDataType *pMetaData = &readerConfig.dataSetMetaData;
    UA_StatusCode retval = UA_STATUSCODE_GOOD;

    memset (&readerConfig, 0, sizeof(UA_DataSetReaderConfig));
    readerConfig.name = UA_STRING("DataSet Reader");
    readerConfig.publisherId.type = &UA_TYPES[UA_TYPES_UINT16];
    readerConfig.publisherId.data = &publisherIdentifier;
    readerConfig.writerGroupId = WRITER_GROUP_ID_SUB;
    readerConfig.dataSetWriterId = DATA_SET_WRITER_ID_SUB;

    /* Message settings */
    dataSetReaderMessage = UA_UadpDataSetReaderMessageDataType_new();
    if(!dataSetReaderMessage) {
        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "DataSetReaderMessage - Bad out of memory");
        goto cleanup;
    }
    dataSetReaderMessage->networkMessageContentMask = (UA_UadpNetworkMessageContentMask)(UA_UADPNETWORKMESSAGECONTENTMASK_PUBLISHERID |
                                                      (UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPHEADER |
                                                      (UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_WRITERGROUPID |
                                                      (UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_PAYLOADHEADER);
    readerConfig.messageSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
    readerConfig.messageSettings.content.decoded.type = &UA_TYPES[UA_TYPES_UADPDATASETREADERMESSAGEDATATYPE];
    readerConfig.messageSettings.content.decoded.data = dataSetReaderMessage;

    /* Meta data: one entry per target variable (REPEATED_NODECOUNTS + 2) */
    UA_DataSetMetaDataType_init (pMetaData);
    pMetaData->fields = (UA_FieldMetaData*)UA_Array_new (fieldCount,
                                                         &UA_TYPES[UA_TYPES_FIELDMETADATA]);
    if(!pMetaData->fields) {
        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "FieldMetaData - Bad out of memory");
        goto cleanup;
    }
    pMetaData->fieldsSize = fieldCount;

    /* Field 0: Boolean status */
    UA_FieldMetaData_init (&pMetaData->fields[0]);
    UA_NodeId_copy (&UA_TYPES[UA_TYPES_BOOLEAN].typeId,
                    &pMetaData->fields[0].dataType);
    pMetaData->fields[0].builtInType = UA_NS0ID_BOOLEAN;
    pMetaData->fields[0].valueRank = -1; /* scalar */

    /* Remaining fields: the repeated counters and the counter, all UInt64 */
    for (size_t fieldIndex = 1; fieldIndex < fieldCount; fieldIndex++) {
        UA_FieldMetaData_init (&pMetaData->fields[fieldIndex]);
        UA_NodeId_copy (&UA_TYPES[UA_TYPES_UINT64].typeId,
                        &pMetaData->fields[fieldIndex].dataType);
        pMetaData->fields[fieldIndex].builtInType = UA_NS0ID_UINT64;
        pMetaData->fields[fieldIndex].valueRank = -1; /* scalar */
    }

    /* Setup Target Variables in DSR config */
    addSubscribedVariables(server);

    retval = UA_Server_addDataSetReader(server, readerGroupIdentifier, &readerConfig,
                                        &readerIdentifier);
    if(retval != UA_STATUSCODE_GOOD) {
        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
                     "The DataSetReader could not be added: %s", UA_StatusCode_name(retval));
    }

cleanup:
    /* Release the temporaries and clear every reference to them */
    UA_free(readerConfig.subscribedDataSet.subscribedDataSetTarget.targetVariables);
    readerConfig.subscribedDataSet.subscribedDataSetTarget.targetVariables = NULL;
    readerConfig.subscribedDataSet.subscribedDataSetTarget.targetVariablesSize = 0;

    UA_free(readerConfig.dataSetMetaData.fields);
    readerConfig.dataSetMetaData.fields = NULL;
    readerConfig.dataSetMetaData.fieldsSize = 0;

    if(dataSetReaderMessage)
        UA_UadpDataSetReaderMessageDataType_delete(dataSetReaderMessage);
    UA_ExtensionObject_init(&readerConfig.messageSettings);

    /* publisherId referenced the local publisherIdentifier */
    UA_Variant_init(&readerConfig.publisherId);
}
#ifdef TWO_WAY_COMMUNICATION
/**
* **Publisher**
*
* Create connection, writergroup, datasetwriter and publisheddataset for Publisher thread.
*/
/* Create the publisher PubSub connection and store its identifier in
 * connectionIdent.
 *
 * server:               server to add the connection to
 * transportProfile:     transport profile URI copied into the config
 * networkAddressUrlPub: network address copied into the config
 *
 * The connection uses the UInt16 publisher id PUBLISHER_ID and carries two
 * connection properties: "sockpriority" (socketPriority) and
 * "enablesotxtime" (disableSoTxtime). The result of the add call is not
 * evaluated. */
static void
addPubSubConnection(UA_Server *server, UA_String *transportProfile,
                    UA_NetworkAddressUrlDataType *networkAddressUrlPub){
    /* Details about the connection configuration and handling are located
     * in the pubsub connection tutorial */
    UA_PubSubConnectionConfig pubConnectionConfig;
    memset(&pubConnectionConfig, 0, sizeof(pubConnectionConfig));
    pubConnectionConfig.name = UA_STRING("Publisher Connection");
    pubConnectionConfig.enabled = UA_TRUE;
    pubConnectionConfig.transportProfileUri = *transportProfile;
    pubConnectionConfig.publisherIdType = UA_PUBLISHERIDTYPE_UINT16;
    pubConnectionConfig.publisherId.uint16 = PUBLISHER_ID;

    /* The address variant references this local copy */
    UA_NetworkAddressUrlDataType addressCopy = *networkAddressUrlPub;
    UA_Variant_setScalar(&pubConnectionConfig.address, &addressCopy,
                         &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);

    /* Connection options are given as Key/Value Pairs - Sockprio and Txtime */
    UA_KeyValuePair options[2];
    options[0].key = UA_QUALIFIEDNAME(0, "sockpriority");
    UA_Variant_setScalar(&options[0].value, &socketPriority, &UA_TYPES[UA_TYPES_UINT32]);
    options[1].key = UA_QUALIFIEDNAME(0, "enablesotxtime");
    UA_Variant_setScalar(&options[1].value, &disableSoTxtime, &UA_TYPES[UA_TYPES_BOOLEAN]);
    pubConnectionConfig.connectionProperties.mapSize = 2;
    pubConnectionConfig.connectionProperties.map = options;

    UA_Server_addPubSubConnection(server, &pubConnectionConfig, &connectionIdent);
}
/* PublishedDataset handling */
/* Add a PublishedDataSet of type PUBLISHEDITEMS named "Demo PDS" to the
 * server; its identifier is written to publishedDataSetIdent. The result of
 * the add call is not evaluated. */
static void
addPublishedDataSet(UA_Server *server) {
    UA_PublishedDataSetConfig pdsConfig;
    memset(&pdsConfig, 0, sizeof(pdsConfig));
    pdsConfig.name = UA_STRING("Demo PDS");
    pdsConfig.publishedDataSetType = UA_PUBSUB_DATASET_PUBLISHEDITEMS;

    UA_Server_addPublishedDataSet(server, &pdsConfig, &publishedDataSetIdent);
}
/* DataSetField handling */
static void
_addDataSetField(UA_Server *server) {
/* Add a field to the previous created PublishedDataSet */
UA_NodeId dataSetFieldIdent1;
UA_DataSetFieldConfig dataSetFieldConfig;
#if defined PUBSUB_CONFIG_FASTPATH_FIXED_OFFSETS
staticValueSource = UA_DataValue_new();
#endif
UA_NodeId dataSetFieldIdentRunning;
UA_DataSetFieldConfig dsfConfigPubStatus;
memset(&dsfConfigPubStatus, 0, sizeof(UA_DataSetFieldConfig));
runningPub = UA_Boolean_new();
if(!runningPub) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "runningPub - Bad out of memory");
return;
}
*runningPub = UA_TRUE;
runningPubDataValueRT = UA_DataValue_new();
if(!runningPubDataValueRT) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "runningPubDataValue - Bad out of memory");
return;
}
UA_Variant_setScalar(&runningPubDataValueRT->value, runningPub, &UA_TYPES[UA_TYPES_BOOLEAN]);
runningPubDataValueRT->hasValue = UA_TRUE;
/* Set the value backend of the above create node to 'external value source' */
UA_ValueBackend runningPubvalueBackend;
runningPubvalueBackend.backendType = UA_VALUEBACKENDTYPE_EXTERNAL;
runningPubvalueBackend.backend.external.value = &runningPubDataValueRT;
runningPubvalueBackend.backend.external.callback.userWrite = externalDataWriteCallback;
runningPubvalueBackend.backend.external.callback.notificationRead = externalDataReadNotificationCallback;
UA_Server_setVariableNode_valueBackend(server, UA_NODEID_NUMERIC(1, (UA_UInt32)20000), runningPubvalueBackend);
/* setup RT DataSetField config */
dsfConfigPubStatus.field.variable.rtValueSource.rtInformationModelNode = UA_TRUE;
dsfConfigPubStatus.field.variable.publishParameters.publishedVariable = UA_NODEID_NUMERIC(1, (UA_UInt32)20000);
UA_Server_addDataSetField(server, publishedDataSetIdent, &dsfConfigPubStatus, &dataSetFieldIdentRunning);
for (UA_Int32 iterator = 0; iterator < REPEATED_NODECOUNTS; iterator++)
{
memset(&dataSetFieldConfig, 0, sizeof(UA_DataSetFieldConfig));
repeatedCounterData[iterator] = UA_UInt64_new();
if(!repeatedCounterData[iterator]) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PublishRepeatedCounter - Bad out of memory");
return;
}
*repeatedCounterData[iterator] = 0;
repeatedDataValueRT[iterator] = UA_DataValue_new();
if(!repeatedDataValueRT[iterator]) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PublishRepeatedCounterDataValue - Bad out of memory");
return;
}
UA_Variant_setScalar(&repeatedDataValueRT[iterator]->value, repeatedCounterData[iterator], &UA_TYPES[UA_TYPES_UINT64]);
repeatedDataValueRT[iterator]->hasValue = UA_TRUE;
/* Set the value backend of the above create node to 'external value source' */
UA_ValueBackend valueBackend;
valueBackend.backendType = UA_VALUEBACKENDTYPE_EXTERNAL;
valueBackend.backend.external.value = &repeatedDataValueRT[iterator];
valueBackend.backend.external.callback.userWrite = externalDataWriteCallback;
valueBackend.backend.external.callback.notificationRead = externalDataReadNotificationCallback;
UA_Server_setVariableNode_valueBackend(server, UA_NODEID_NUMERIC(1, (UA_UInt32)iterator+10000), valueBackend);
/* setup RT DataSetField config */
dataSetFieldConfig.field.variable.rtValueSource.rtInformationModelNode = UA_TRUE;
dataSetFieldConfig.field.variable.publishParameters.publishedVariable = UA_NODEID_NUMERIC(1, (UA_UInt32)iterator+10000);
UA_Server_addDataSetField(server, publishedDataSetIdent, &dataSetFieldConfig, &dataSetFieldIdent1);
}
UA_NodeId dataSetFieldIdent;
UA_DataSetFieldConfig dsfConfig;
memset(&dsfConfig, 0, sizeof(UA_DataSetFieldConfig));
pubCounterData = UA_UInt64_new();
if(!pubCounterData) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PublishCounter - Bad out of memory");
return;
}
*pubCounterData = 0;
pubDataValueRT = UA_DataValue_new();
if(!pubDataValueRT) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PublishDataValue - Bad out of memory");
return;
}
UA_Variant_setScalar(&pubDataValueRT->value, pubCounterData, &UA_TYPES[UA_TYPES_UINT64]);
pubDataValueRT->hasValue = UA_TRUE;
/* Set the value backend of the above create node to 'external value source' */
UA_ValueBackend valueBackend;
valueBackend.backendType = UA_VALUEBACKENDTYPE_EXTERNAL;
valueBackend.backend.external.value = &pubDataValueRT;
valueBackend.backend.external.callback.userWrite = externalDataWriteCallback;
valueBackend.backend.external.callback.notificationRead = externalDataReadNotificationCallback;
UA_Server_setVariableNode_valueBackend(server, pubNodeID, valueBackend);
/* setup RT DataSetField config */
dsfConfig.field.variable.rtValueSource.rtInformationModelNode = UA_TRUE;
dsfConfig.field.variable.publishParameters.publishedVariable = pubNodeID;
UA_Server_addDataSetField(server, publishedDataSetIdent, &dsfConfig, &dataSetFieldIdent);
}
/* WriterGroup handling
 *
 * Builds the configuration of the demo WriterGroup, adds it to the connection
 * identified by connectionIdent and enables it. The resulting identifier is
 * stored in the file-level writerGroupIdent. Failures are logged; the function
 * has no return value. */
static void
addWriterGroup(UA_Server *server) {
    UA_WriterGroupConfig writerGroupConfig;
    memset(&writerGroupConfig, 0, sizeof(UA_WriterGroupConfig));
    writerGroupConfig.name = UA_STRING("Demo WriterGroup");
    writerGroupConfig.publishingInterval = cycleTimeInMsec;
    writerGroupConfig.enabled = UA_FALSE;
    writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_UADP;
    writerGroupConfig.writerGroupId = WRITER_GROUP_ID;
    writerGroupConfig.rtLevel = UA_PUBSUB_RT_FIXED_SIZE;
    /* Application-provided hooks handed to the PubSub manager */
    writerGroupConfig.pubsubManagerCallback.addCustomCallback = addPubSubApplicationCallback;
    writerGroupConfig.pubsubManagerCallback.changeCustomCallback = changePubSubApplicationCallback;
    writerGroupConfig.pubsubManagerCallback.removeCustomCallback = removePubSubApplicationCallback;
    writerGroupConfig.messageSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
    writerGroupConfig.messageSettings.content.decoded.type = &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE];

    /* The configuration flags for the messages are encapsulated inside the
     * message- and transport settings extension objects. These extension
     * objects are defined by the standard. e.g.
     * UadpWriterGroupMessageDataType */
    UA_UadpWriterGroupMessageDataType *writerGroupMessage = UA_UadpWriterGroupMessageDataType_new();
    if(!writerGroupMessage) {
        /* Same handling as the other allocations in this file: log and bail
         * out instead of dereferencing a NULL pointer below */
        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "WriterGroupMessage - Bad out of memory");
        return;
    }

    /* Change message settings of writerGroup to send PublisherId,
     * WriterGroupId in GroupHeader and DataSetWriterId in PayloadHeader
     * of NetworkMessage */
    writerGroupMessage->networkMessageContentMask = (UA_UadpNetworkMessageContentMask)(UA_UADPNETWORKMESSAGECONTENTMASK_PUBLISHERID |
                                                    (UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPHEADER |
                                                    (UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_WRITERGROUPID |
                                                    (UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_PAYLOADHEADER);
    writerGroupConfig.messageSettings.content.decoded.data = writerGroupMessage;

    /* Only enable the group when it was actually created */
    UA_StatusCode retval = UA_Server_addWriterGroup(server, connectionIdent, &writerGroupConfig, &writerGroupIdent);
    if(retval == UA_STATUSCODE_GOOD)
        retval = UA_Server_enableWriterGroup(server, writerGroupIdent);
    if(retval != UA_STATUSCODE_GOOD)
        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
                     "WriterGroup setup failed: %s", UA_StatusCode_name(retval));

    /* The message settings are released on every path once the server calls
     * are done (presumably the server keeps its own copy - confirm) */
    UA_UadpWriterGroupMessageDataType_delete(writerGroupMessage);
}
/* DataSetWriter handling
 *
 * Adds the demo DataSetWriter to the WriterGroup in writerGroupIdent and
 * connects it to the PublishedDataSet in publishedDataSetIdent. */
static void
addDataSetWriter(UA_Server *server) {
    /* Start from an all-zero configuration and fill in only the fields
     * this example needs */
    UA_DataSetWriterConfig writerConfig;
    memset(&writerConfig, 0, sizeof(writerConfig));
    writerConfig.keyFrameCount = 10;
    writerConfig.dataSetWriterId = DATA_SET_WRITER_ID;
    writerConfig.name = UA_STRING("Demo DataSetWriter");

    /* The identifier of the new writer is not used after creation */
    UA_NodeId writerIdent;
    UA_Server_addDataSetWriter(server, writerGroupIdent, publishedDataSetIdent,
                               &writerConfig, &writerIdent);
}
/**
 * **Published data handling**
 *
 * Stores one publisher sample (timestamp and counter value) in the
 * publishTimestamp / publishCounterValue arrays.
 *
 * start_time           - time of the sample
 * counterValue         - counter value belonging to the sample
 * monotonicOffsetValue - offset added to the timestamp once it has been
 *                        flattened into a single count (tv_sec * SECONDS + tv_nsec)
 *
 * Once MAX_MEASUREMENTS samples are stored, signalTerm is raised instead and
 * nothing is recorded.
 */
static void
updateMeasurementsPublisher(struct timespec start_time,
                            UA_UInt64 counterValue, UA_UInt64 monotonicOffsetValue) {
    if(measurementsPublisher >= MAX_MEASUREMENTS) {
        UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "Publisher: Maximum log measurements reached - Closing the application");
        signalTerm = UA_TRUE;
        return;
    }

    /* The counter is unsigned, so print it with PRIu64; the timespec fields
     * are cast to match the %ld conversions */
    if(consolePrint)
        UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,"Pub:%" PRIu64 ",%ld.%09ld\n", counterValue,
                    (long)start_time.tv_sec, (long)start_time.tv_nsec);

    if(signalTerm != UA_TRUE) {
        /* Parenthesize the macro before casting (as pubSubApp does) so the
         * cast applies to the whole constant, and widen tv_sec before the
         * multiplication so the product is computed in 64 bits */
        const UA_UInt64 unitsPerSecond = (UA_UInt64)(SECONDS);
        const UA_UInt64 actualTimeValue = ((UA_UInt64)start_time.tv_sec * unitsPerSecond) +
                                          (UA_UInt64)start_time.tv_nsec + monotonicOffsetValue;

        publishTimestamp[measurementsPublisher].tv_sec  = (time_t)(actualTimeValue / unitsPerSecond);
        publishTimestamp[measurementsPublisher].tv_nsec = (long)(actualTimeValue % unitsPerSecond);
        publishCounterValue[measurementsPublisher]      = counterValue;
        measurementsPublisher++;
    }
}
#endif
/**
 * **Subscribed data handling**
 *
 * Stores one subscriber sample (timestamp and counter value) in the
 * subscribeTimestamp / subscribeCounterValue arrays.
 *
 * receive_time         - time of the sample
 * counterValue         - counter value belonging to the sample
 * monotonicOffsetValue - offset added to the timestamp once it has been
 *                        flattened into a single count (tv_sec * SECONDS + tv_nsec)
 *
 * Once MAX_MEASUREMENTS samples are stored, signalTerm is raised instead and
 * nothing is recorded.
 */
static void
updateMeasurementsSubscriber(struct timespec receive_time, UA_UInt64 counterValue, UA_UInt64 monotonicOffsetValue) {
    if(measurementsSubscriber >= MAX_MEASUREMENTS) {
        UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "Subscriber: Maximum log measurements reached - Closing the application");
        signalTerm = UA_TRUE;
        return;
    }

    /* The counter is unsigned, so print it with PRIu64; the timespec fields
     * are cast to match the %ld conversions */
    if(consolePrint)
        UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,"Sub:%" PRIu64 ",%ld.%09ld\n", counterValue,
                    (long)receive_time.tv_sec, (long)receive_time.tv_nsec);

    if(signalTerm != UA_TRUE) {
        /* Parenthesize the macro before casting (as pubSubApp does) so the
         * cast applies to the whole constant, and widen tv_sec before the
         * multiplication so the product is computed in 64 bits */
        const UA_UInt64 unitsPerSecond = (UA_UInt64)(SECONDS);
        const UA_UInt64 actualTimeValue = ((UA_UInt64)receive_time.tv_sec * unitsPerSecond) +
                                          (UA_UInt64)receive_time.tv_nsec + monotonicOffsetValue;

        subscribeTimestamp[measurementsSubscriber].tv_sec  = (time_t)(actualTimeValue / unitsPerSecond);
        subscribeTimestamp[measurementsSubscriber].tv_nsec = (long)(actualTimeValue % unitsPerSecond);
        subscribeCounterValue[measurementsSubscriber]      = counterValue;
        measurementsSubscriber++;
    }
}
/**
 * userApplication is called by pubSubApp in every cycle, right after the
 * subscriber callback. It timestamps the reception, adds skipped counter
 * values to threadArgPubSub1->packetLossCount and, when CSV logging or console
 * printing is enabled, hands the counters to the measurement helpers. With
 * TWO_WAY_COMMUNICATION the received counters are additionally copied into the
 * variables of the publisher (loopback).
 **/
void userApplication(UA_UInt64 monotonicOffsetValue) {
    clock_gettime(CLOCKID, &dataReceiveTime);

#ifdef TWO_WAY_COMMUNICATION
    /* Packet loss accounting. A received counter of zero is ignored because the
     * publisher (pubsub_TSN_publisher_single_thread.c) publishes a zero once
     * during callback setup, while its WriterGroup is set operational.
     * NOTE(review): the difference is unsigned, so a counter smaller than the
     * expected one wraps around - confirm this cannot happen. */
    if(*subCounterData > 0) {
        const UA_UInt64 expectedCounter = *pubCounterData + 1;
        if(*subCounterData != expectedCounter) {
            const UA_UInt64 lostPackets = *subCounterData - expectedCounter;
            threadArgPubSub1->packetLossCount += lostPackets;
        }
    }

    /* Loop the received values back into the published variables */
    *pubCounterData = *subCounterData;
    for(UA_Int32 idx = 0; idx < REPEATED_NODECOUNTS; ++idx)
        *repeatedCounterData[idx] = *subRepeatedCounterData[idx];

    clock_gettime(CLOCKID, &dataModificationTime);
#else
    /* Same accounting, measured against the counter seen in the previous cycle */
    if(*subCounterData > 0) {
        const UA_UInt64 expectedCounter = previousSubCounterData + 1;
        if(*subCounterData != expectedCounter) {
            const UA_UInt64 lostPackets = *subCounterData - expectedCounter;
            threadArgPubSub1->packetLossCount += lostPackets;
        }
    }
    previousSubCounterData = *subCounterData;
#endif

    /* Samples are only recorded when some kind of output was requested */
    const UA_Boolean recordSamples = (enableCsvLog || consolePrint);
    if(recordSamples && *subCounterData > 0)
        updateMeasurementsSubscriber(dataReceiveTime, *subCounterData, monotonicOffsetValue);
#ifdef TWO_WAY_COMMUNICATION
    if(recordSamples && *pubCounterData > 0)
        updateMeasurementsPublisher(dataModificationTime, *pubCounterData, monotonicOffsetValue);
#endif

    if(signalTerm != UA_TRUE)
        return;

    /* Termination requested: *runningPub is set to false and sent to the
     * publisher application running on another node, which closes that
     * application while it is blocked on its socket */
#ifdef TWO_WAY_COMMUNICATION
    *runningPub = UA_FALSE;
#endif
    *runningSub = UA_FALSE;
}
/**
* **PubSub thread routine**
*/
void *pubSubApp(void *arg) {
struct timespec nextnanosleeptimePubSubApplication;
//struct timespec currentTimeInTsCheck;
UA_Server* server;
UA_ReaderGroup* currentReaderGroup;
UA_ServerCallback subCallback;
#ifdef TWO_WAY_COMMUNICATION
UA_ServerCallback pubCallback;
UA_WriterGroup* currentWriterGroup;
#endif
UA_UInt64 interval_ms;
UA_UInt64 monotonicOffsetValue = 0;
server = threadArgPubSub1->server;
currentReaderGroup = (UA_ReaderGroup*)threadArgPubSub1->subData;
subCallback = threadArgPubSub1->subCallback;
#ifdef TWO_WAY_COMMUNICATION
currentWriterGroup = (UA_WriterGroup *)threadArgPubSub1->pubData;
pubCallback = threadArgPubSub1->pubCallback;
#endif
interval_ms = (UA_UInt64)(threadArgPubSub1->interval_ms * MILLI_SECONDS);
//To synchronize the application along with gating cycle the below calculations are made
//Below calculations are done for monotonic clock
struct timespec currentTimeInTs;
UA_UInt64 addingValueToStartTime;
UA_UInt64 timeToStart;
clock_gettime(CLOCKID, ¤tTimeInTs);
UA_UInt64 currentTimeInNs = (UA_UInt64)((currentTimeInTs.tv_sec * (SECONDS)) + currentTimeInTs.tv_nsec);
currentTimeInNs = currentTimeInNs + threadArgPubSub1->monotonicOffset;
timeToStart = currentTimeInNs + (SECONDS_SLEEP * (UA_UInt64)(SECONDS)); //Adding 5 seconds to start the cycle
if (threadArgPubSub1->operBaseTime != 0){
UA_UInt64 moduloValueOfOperBaseTime = timeToStart % threadArgPubSub1->operBaseTime;
if(moduloValueOfOperBaseTime > interval_ms)
addingValueToStartTime = interval_ms - (moduloValueOfOperBaseTime % interval_ms);
else
addingValueToStartTime = interval_ms - (moduloValueOfOperBaseTime);
timeToStart = timeToStart + addingValueToStartTime;
timeToStart = timeToStart - (threadArgPubSub1->monotonicOffset);
}
else{
timeToStart = timeToStart - timeToStart%interval_ms;
timeToStart = timeToStart - (threadArgPubSub1->monotonicOffset);
}
UA_UInt64 CycleStartTimeS = (UA_UInt64)(timeToStart / (UA_UInt64)(SECONDS));
UA_UInt64 CycleStartTimeNs = (UA_UInt64)(timeToStart - (CycleStartTimeS * (UA_UInt64)(SECONDS)));
nextnanosleeptimePubSubApplication.tv_sec = (__time_t )(CycleStartTimeS);
nextnanosleeptimePubSubApplication.tv_nsec = (__syscall_slong_t)(CycleStartTimeNs);
nanoSecondFieldConversion(&nextnanosleeptimePubSubApplication);
monotonicOffsetValue = threadArgPubSub1->monotonicOffset;
while (*runningSub) {
//Sleep for cycle time
clock_nanosleep(CLOCKID, TIMER_ABSTIME, &nextnanosleeptimePubSubApplication, NULL);
//Call the subscriber callback to receive the data
subCallback(server, currentReaderGroup);
userApplication(monotonicOffsetValue);
#ifdef TWO_WAY_COMMUNICATION
//ToDo:Handled only for without SO_TXTIME
//Call the publish callback to publish the data into the network
pubCallback(server, currentWriterGroup);
#endif
//Calculate nextwakeup time
nextnanosleeptimePubSubApplication.tv_nsec += (__syscall_slong_t)(cycleTimeInMsec * MILLI_SECONDS);
nanoSecondFieldConversion(&nextnanosleeptimePubSubApplication);
}
sleep(1);
runningServer = UA_FALSE;
return (void*)NULL;
}
/**
 * **Thread creation**
 *
 * Applies the SCHED_FIFO priority and the CPU core affinity to the calling
 * thread and then starts a new thread running the given routine with
 * serverConfig as its argument. The process exits if the priority or the
 * affinity cannot be set; a failing pthread_create is only logged. The
 * function returns the thread ID written by pthread_create.
 */
static pthread_t threadCreation(UA_Int16 threadPriority, size_t coreAffinity, void *(*thread) (void *), char *applicationName,
                                void *serverConfig) {
    /* Priority and affinity are set on the caller itself; presumably the new
     * thread is meant to inherit them - confirm */
    pthread_t tid = pthread_self();

    struct sched_param prio;
    prio.sched_priority = threadPriority;
    if(pthread_setschedparam(tid, SCHED_FIFO, &prio) != 0) {
        UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,"pthread_setschedparam: failed\n");
        exit(1);
    }
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                "\npthread_setschedparam:%s Thread priority is %d \n",
                applicationName, prio.sched_priority);

    /* Pin to exactly one core */
    cpu_set_t cores;
    CPU_ZERO(&cores);
    CPU_SET(coreAffinity, &cores);
    const UA_Int32 affinityError = pthread_setaffinity_np(tid, sizeof(cpu_set_t), &cores);
    if(affinityError != 0) {
        fprintf(stderr, "pthread_setaffinity_np: %s\n", strerror(affinityError));
        exit(1);
    }

    /* NOTE(review): when pthread_create fails only a warning is logged and
     * tid is still returned - verify how callers use the ID in that case */
    if(pthread_create(&tid, NULL, thread, serverConfig) != 0)
        UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,":%s Cannot create thread\n", applicationName);

    if(CPU_ISSET(coreAffinity, &cores))
        UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,"%s CPU CORE: %zu\n", applicationName, coreAffinity);

    return tid;
}
/**
* **Creation of nodes**
*
* The addServerNodes function is used to create the publisher and subscriber
* nodes.
*/
static void addServerNodes(UA_Server *server) {
UA_NodeId objectId;
UA_NodeId newNodeId;
UA_ObjectAttributes object = UA_ObjectAttributes_default;
object.displayName = UA_LOCALIZEDTEXT("en-US", "Counter Object");
UA_Server_addObjectNode(server, UA_NODEID_NULL,
UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES),
UA_QUALIFIEDNAME(1, "Counter Object"), UA_NODEID_NULL,
object, NULL, &objectId);
#ifdef TWO_WAY_COMMUNICATION
UA_VariableAttributes publisherAttr = UA_VariableAttributes_default;
UA_UInt64 publishValue = 0;
publisherAttr.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE;
publisherAttr.dataType = UA_TYPES[UA_TYPES_UINT64].typeId;
UA_Variant_setScalar(&publisherAttr.value, &publishValue, &UA_TYPES[UA_TYPES_UINT64]);