diff --git a/tests/DCPS/BuiltInTopicTest/DataReaderListener.cpp b/tests/DCPS/BuiltInTopicTest/DataReaderListener.cpp index fca0c488428..bd2f00ef194 100644 --- a/tests/DCPS/BuiltInTopicTest/DataReaderListener.cpp +++ b/tests/DCPS/BuiltInTopicTest/DataReaderListener.cpp @@ -14,12 +14,12 @@ using namespace std; DataReaderListenerImpl::DataReaderListenerImpl() : num_reads_(0), - publication_handle_ (::DDS::HANDLE_NIL), - post_restart_publication_handle_ (::DDS::HANDLE_NIL) + publication_handle_(::DDS::HANDLE_NIL), + post_restart_publication_handle_(::DDS::HANDLE_NIL) { } -DataReaderListenerImpl::~DataReaderListenerImpl () +DataReaderListenerImpl::~DataReaderListenerImpl() { } @@ -29,9 +29,8 @@ void DataReaderListenerImpl::on_data_available(DDS::DataReader_ptr reader) try { ::Messenger::MessageDataReader_var message_dr = ::Messenger::MessageDataReader::_narrow(reader); - if (CORBA::is_nil (message_dr.in ())) { - ACE_ERROR((LM_ERROR, ACE_TEXT( - "(%P|%t) DataReaderListener: read: _narrow failed.\n"))); + if (!message_dr) { + ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) DataReaderListener: read: _narrow failed.\n"))); exit(1); } @@ -41,12 +40,10 @@ void DataReaderListenerImpl::on_data_available(DDS::DataReader_ptr reader) if (status == DDS::RETCODE_OK) { - if (si.valid_data) - { + if (si.valid_data) { if (si.publication_handle == ::DDS::HANDLE_NIL - || (si.publication_handle != this->publication_handle_ - && si.publication_handle != this->post_restart_publication_handle_)) - { + || (si.publication_handle != this->publication_handle_ + && si.publication_handle != this->post_restart_publication_handle_)) { ACE_ERROR((LM_ERROR, ACE_TEXT( "(%P|%t) DataReaderListener: ERROR: publication_handle validate failed.\n"))); exit(1); @@ -54,13 +51,9 @@ void DataReaderListenerImpl::on_data_available(DDS::DataReader_ptr reader) ACE_DEBUG((LM_DEBUG, ACE_TEXT( "(%P|%t) DataReaderListener: Message count = %i\n"), message.count)); - } - else if (si.instance_state == DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE) - { + } else if (si.instance_state == DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE) { ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) instance is disposed\n"))); - } - else if (si.instance_state == DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE) - { + } else if (si.instance_state == DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE) { ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) instance is unregistered\n"))); } } else if (status == DDS::RETCODE_NO_DATA) { @@ -71,39 +64,38 @@ void DataReaderListenerImpl::on_data_available(DDS::DataReader_ptr reader) "(%P|%t) DataReaderListener: ERROR: read Message: Error: %i\n"), status)); } } catch (CORBA::Exception& e) { - e._tao_print_exception( - "DataReaderListener: Exception caught in read:", stderr); + e._tao_print_exception("DataReaderListener: Exception caught in read:", stderr); exit(1); } } -void DataReaderListenerImpl::on_requested_deadline_missed ( - DDS::DataReader_ptr, - const DDS::RequestedDeadlineMissedStatus &) +void DataReaderListenerImpl::on_requested_deadline_missed( + DDS::DataReader_ptr, + const DDS::RequestedDeadlineMissedStatus&) { ACE_ERROR((LM_ERROR, ACE_TEXT( "(%P|%t) DataReaderListenerImpl::on_requested_deadline_missed\n"))); } -void DataReaderListenerImpl::on_requested_incompatible_qos ( - DDS::DataReader_ptr, - const DDS::RequestedIncompatibleQosStatus &) +void DataReaderListenerImpl::on_requested_incompatible_qos( + DDS::DataReader_ptr, + const DDS::RequestedIncompatibleQosStatus&) { ACE_ERROR((LM_ERROR, ACE_TEXT( "(%P|%t) DataReaderListenerImpl::on_requested_incompatible_qos\n"))); } -void DataReaderListenerImpl::on_liveliness_changed ( - DDS::DataReader_ptr, - const DDS::LivelinessChangedStatus &) +void DataReaderListenerImpl::on_liveliness_changed( + DDS::DataReader_ptr, + const DDS::LivelinessChangedStatus&) { ACE_DEBUG((LM_DEBUG, ACE_TEXT( "(%P|%t) DataReaderListenerImpl::on_liveliness_changed\n"))); } -void DataReaderListenerImpl::on_subscription_matched ( - DDS::DataReader_ptr, - const DDS::SubscriptionMatchedStatus & status) +void DataReaderListenerImpl::on_subscription_matched( + DDS::DataReader_ptr, + const DDS::SubscriptionMatchedStatus& status) { ACE_DEBUG((LM_DEBUG, ACE_TEXT( "(%P|%t) DataReaderListenerImpl::on_subscription_matched handle=%i\n"), @@ -128,8 +120,8 @@ bool DataReaderListenerImpl::read_bit_instance() } else if (publication_handle_ != ::DDS::HANDLE_NIL) { handle = publication_handle_; } else { - ACE_DEBUG((LM_ERROR, ACE_TEXT( - "(%P|%t) Can't read bit instance, pre and post restart handles are invalid.\n"))); + ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: DataReaderListenerImpl::read_bit_instance:" + " Can't read bit instance, pre and post restart handles are invalid.\n")); return false; } DDS::PublicationBuiltinTopicDataSeq data; @@ -142,32 +134,32 @@ bool DataReaderListenerImpl::read_bit_instance() switch (ret) { case DDS::RETCODE_OK: ACE_DEBUG((LM_DEBUG, ACE_TEXT( - "(%P|%t) read bit instance returned ok\n"))); - return false; + "(%P|%t) DataReaderListenerImpl::read_bit_instance: returned ok\n"))); + return true; case DDS::RETCODE_NO_DATA: - ACE_ERROR((LM_WARNING, ACE_TEXT( - "(%P|%t) read bit instance returned no data\n"))); + ACE_ERROR((LM_NOTICE, ACE_TEXT( + "(%P|%t) NOTICE: DataReaderListenerImpl::read_bit_instance: returned no data. Retrying...\n"))); break; case DDS::RETCODE_BAD_PARAMETER: - ACE_ERROR((LM_ERROR, ACE_TEXT( - "(%P|%t) read bit instance returned bad parameter\n"))); + ACE_ERROR((LM_NOTICE, ACE_TEXT( + "(%P|%t) NOTICE: DataReaderListenerImpl::read_bit_instance: returned bad parameter. Retrying...\n"))); break; default: ACE_ERROR((LM_ERROR, ACE_TEXT( - "(%P|%t) ERROR read bit instance returned %i\n"), ret)); - return true; + "(%P|%t) ERROR: DataReaderListenerImpl::read_bit_instance: returned %i\n"), ret)); + return false; } ACE_OS::sleep(ACE_Time_Value(0, 100000)); } ACE_ERROR((LM_ERROR, ACE_TEXT( - "(%P|%t) ERROR read bit instance: giving up after retries\n"))); - return true; + "(%P|%t) ERROR: DataReaderListenerImpl::read_bit_instance: giving up after retries\n"))); + return false; } void DataReaderListenerImpl::on_sample_rejected( - DDS::DataReader_ptr, - const DDS::SampleRejectedStatus&) + DDS::DataReader_ptr, + const DDS::SampleRejectedStatus&) { ACE_ERROR((LM_ERROR, ACE_TEXT( "(%P|%t) DataReaderListenerImpl::on_sample_rejected\n"))); @@ -181,31 +173,31 @@ void DataReaderListenerImpl::on_sample_lost( "(%P|%t) DataReaderListenerImpl::on_sample_lost\n"))); } -void DataReaderListenerImpl::on_subscription_disconnected ( +void DataReaderListenerImpl::on_subscription_disconnected( DDS::DataReader_ptr, - const ::OpenDDS::DCPS::SubscriptionDisconnectedStatus &) + const ::OpenDDS::DCPS::SubscriptionDisconnectedStatus&) { ACE_ERROR((LM_WARNING, ACE_TEXT( "(%P|%t) DataReaderListenerImpl::on_subscription_disconnected\n"))); } -void DataReaderListenerImpl::on_subscription_reconnected ( +void DataReaderListenerImpl::on_subscription_reconnected( DDS::DataReader_ptr, - const ::OpenDDS::DCPS::SubscriptionReconnectedStatus &) + const ::OpenDDS::DCPS::SubscriptionReconnectedStatus&) { ACE_ERROR((LM_WARNING, ACE_TEXT( "(%P|%t) DataReaderListenerImpl::on_subscription_reconnected\n"))); } -void DataReaderListenerImpl::on_subscription_lost ( +void DataReaderListenerImpl::on_subscription_lost( DDS::DataReader_ptr, - const ::OpenDDS::DCPS::SubscriptionLostStatus &) + const ::OpenDDS::DCPS::SubscriptionLostStatus&) { ACE_ERROR((LM_WARNING, ACE_TEXT( "(%P|%t) DataReaderListenerImpl::on_subscription_lost\n"))); } -void DataReaderListenerImpl::set_builtin_datareader ( +void DataReaderListenerImpl::set_builtin_datareader( DDS::DataReader_ptr builtin) { builtin_ = DDS::DataReader::_duplicate(builtin); diff --git a/tests/DCPS/BuiltInTopicTest/DataReaderListener.h b/tests/DCPS/BuiltInTopicTest/DataReaderListener.h index 5ffa0d08c7c..4bc3db26aa7 100644 --- a/tests/DCPS/BuiltInTopicTest/DataReaderListener.h +++ b/tests/DCPS/BuiltInTopicTest/DataReaderListener.h @@ -11,28 +11,27 @@ #endif /* ACE_LACKS_PRAGMA_ONCE */ class DataReaderListenerImpl - : public virtual OpenDDS::DCPS::LocalObject -{ + : public virtual OpenDDS::DCPS::LocalObject { public: - DataReaderListenerImpl (); + DataReaderListenerImpl(); - virtual ~DataReaderListenerImpl (void); + virtual ~DataReaderListenerImpl(void); - virtual void on_requested_deadline_missed ( + virtual void on_requested_deadline_missed( DDS::DataReader_ptr reader, - const DDS::RequestedDeadlineMissedStatus & status); + const DDS::RequestedDeadlineMissedStatus& status); - virtual void on_requested_incompatible_qos ( + virtual void on_requested_incompatible_qos( DDS::DataReader_ptr reader, - const DDS::RequestedIncompatibleQosStatus & status); + const DDS::RequestedIncompatibleQosStatus& status); - virtual void on_liveliness_changed ( + virtual void on_liveliness_changed( DDS::DataReader_ptr reader, - const DDS::LivelinessChangedStatus & status); + const DDS::LivelinessChangedStatus& status); - virtual void on_subscription_matched ( + virtual void on_subscription_matched( DDS::DataReader_ptr reader, - const DDS::SubscriptionMatchedStatus & status); + const DDS::SubscriptionMatchedStatus& status); virtual void on_sample_rejected( DDS::DataReader_ptr reader, @@ -45,23 +44,23 @@ class DataReaderListenerImpl DDS::DataReader_ptr reader, const DDS::SampleLostStatus& status); - virtual void on_subscription_disconnected ( + virtual void on_subscription_disconnected( DDS::DataReader_ptr reader, - const ::OpenDDS::DCPS::SubscriptionDisconnectedStatus & status); + const ::OpenDDS::DCPS::SubscriptionDisconnectedStatus& status); - virtual void on_subscription_reconnected ( + virtual void on_subscription_reconnected( DDS::DataReader_ptr reader, - const ::OpenDDS::DCPS::SubscriptionReconnectedStatus & status); + const ::OpenDDS::DCPS::SubscriptionReconnectedStatus& status); - virtual void on_subscription_lost ( + virtual void on_subscription_lost( DDS::DataReader_ptr reader, - const ::OpenDDS::DCPS::SubscriptionLostStatus & status); + const ::OpenDDS::DCPS::SubscriptionLostStatus& status); long num_reads() const { return num_reads_; } - void set_builtin_datareader (DDS::DataReader_ptr builtin); + void set_builtin_datareader(DDS::DataReader_ptr builtin); bool read_bit_instance(); diff --git a/tests/DCPS/BuiltInTopicTest/Writer.cpp b/tests/DCPS/BuiltInTopicTest/Writer.cpp index 71320b1ea77..638fbfdd03e 100644 --- a/tests/DCPS/BuiltInTopicTest/Writer.cpp +++ b/tests/DCPS/BuiltInTopicTest/Writer.cpp @@ -2,9 +2,10 @@ // #include "Writer.h" #include "MessengerTypeSupportC.h" +#include "tests/Utils/ExceptionStreams.h" + #include #include -#include "tests/Utils/ExceptionStreams.h" using namespace std; @@ -12,56 +13,51 @@ const int num_instances_per_writer = 1; extern int num_messages; Writer::Writer(::DDS::DataWriter_ptr writer) -: writer_ (::DDS::DataWriter::_duplicate (writer)), - finished_instances_ (0), - timeout_writes_ (0) + : writer_(::DDS::DataWriter::_duplicate(writer)), + finished_instances_(0), + timeout_writes_(0) { } void -Writer::start () +Writer::start() { ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Writer::start\n"))); // Lanuch num_instances_per_writer threads. // Each thread writes one instance which uses the thread id as the // key value. - if (activate (THR_NEW_LWP | THR_JOINABLE, num_instances_per_writer) == -1) { - ACE_ERROR((LM_ERROR, - ACE_TEXT("(%P|%t) Writer::start: activate failed.\n"))); + if (activate(THR_NEW_LWP | THR_JOINABLE, num_instances_per_writer) == -1) { + ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) Writer::start: activate failed.\n"))); exit(1); } } void -Writer::end () +Writer::end() { - ACE_DEBUG((LM_DEBUG, - ACE_TEXT("(%P|%t) Writer::end\n"))); - wait (); + ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Writer::end\n"))); + wait(); } - int -Writer::svc () +Writer::svc() { - ACE_DEBUG((LM_DEBUG, - ACE_TEXT("(%P|%t) Writer::svc begins.\n"))); + ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Writer::svc begins.\n"))); ::DDS::InstanceHandleSeq handles; //Wait for fully association and then send. - while (1) - { - writer_->get_matched_subscriptions(handles); - if (handles.length() > 0) - break; - else - ACE_OS::sleep(1); - } + while (1) { + writer_->get_matched_subscriptions(handles); + if (handles.length() > 0) + break; + else + ACE_OS::sleep(1); + } try { ::Messenger::MessageDataWriter_var message_dw = ::Messenger::MessageDataWriter::_narrow(writer_.in()); - if (CORBA::is_nil (message_dw.in ())) { + if (CORBA::is_nil(message_dw.in())) { ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) Writer::svc: Data Writer could not be narrowed.\n"))); exit(1); @@ -77,20 +73,20 @@ Writer::svc () message.count = 0; ACE_DEBUG((LM_DEBUG, - ACE_TEXT("(%P|%t) %T Writer::svc starting to write.\n"))); - for (int i = 0; i< num_messages; i ++) { + ACE_TEXT("(%P|%t) %T Writer::svc starting to write.\n"))); + for (int i = 0; i < num_messages; ++i) { ::DDS::ReturnCode_t ret = message_dw->write(message, handle); if (ret != ::DDS::RETCODE_OK) { - ACE_ERROR ((LM_ERROR, - ACE_TEXT("(%P|%t) Writer::svc ERROR, ") - ACE_TEXT ("%dth write() returned %d.\n"), - i, ret)); + ACE_ERROR((LM_ERROR, + ACE_TEXT("(%P|%t) Writer::svc ERROR, ") + ACE_TEXT("%dth write() returned %d.\n"), + i, ret)); if (ret == ::DDS::RETCODE_TIMEOUT) { - timeout_writes_ ++; + ++timeout_writes_; } } - message.count++; + ++message.count; ACE_OS::sleep(1); } @@ -100,31 +96,29 @@ Writer::svc () } // wait for datareader finish. - while (1) - { - writer_->get_matched_subscriptions(handles); - if (handles.length() == 0) - break; - else - ACE_OS::sleep(1); - } + while (1) { + writer_->get_matched_subscriptions(handles); + if (handles.length() == 0) + break; + else + ACE_OS::sleep(1); + } ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Writer::svc finished.\n"))); - - finished_instances_ ++; + ++finished_instances_; return 0; } bool -Writer::is_finished () const +Writer::is_finished() const { return finished_instances_ == num_instances_per_writer; } int -Writer::get_timeout_writes () const +Writer::get_timeout_writes() const { - return timeout_writes_.value (); + return timeout_writes_.value(); } diff --git a/tests/DCPS/BuiltInTopicTest/Writer.h b/tests/DCPS/BuiltInTopicTest/Writer.h index ee7102af7f6..8673bfab7e8 100644 --- a/tests/DCPS/BuiltInTopicTest/Writer.h +++ b/tests/DCPS/BuiltInTopicTest/Writer.h @@ -4,23 +4,23 @@ #define WRITER_H #include + #include -class Writer : public ACE_Task_Base -{ +class Writer : public ACE_Task_Base { public: - Writer (::DDS::DataWriter_ptr writer); + Writer(::DDS::DataWriter_ptr writer); - void start (); + void start(); - void end (); + void end(); /** Lanch a thread to write. **/ - virtual int svc (); + virtual int svc(); - bool is_finished () const; + bool is_finished() const; - int get_timeout_writes () const; + int get_timeout_writes() const; private: ::DDS::DataWriter_var writer_; diff --git a/tests/DCPS/BuiltInTopicTest/common.h b/tests/DCPS/BuiltInTopicTest/common.h new file mode 100644 index 00000000000..e599a58843f --- /dev/null +++ b/tests/DCPS/BuiltInTopicTest/common.h @@ -0,0 +1,34 @@ +#ifndef COMMON_H +#define COMMON_H + +#include "tests/Utils/ExceptionStreams.h" + +#include +#include +#include +#include + +#include +#include + +const char* topic_name = "Movie Discussion List"; +const char* topic_type_name = "Messenger"; + +char PART_USER_DATA[] = "Initial DomainParticipant UserData"; +char DW_USER_DATA[] = "Initial DataWriter UserData"; +char DR_USER_DATA[] = "Initial DataReader UserData"; +char TOPIC_DATA[] = "Initial Topic TopicData"; +char GROUP_DATA[] = "Initial GroupData"; +char UPDATED_PART_USER_DATA[] = "Updated DomainParticipant UserData"; +char UPDATED_DW_USER_DATA[] = "Updated DataWriter UserData"; +char UPDATED_DR_USER_DATA[] = "Updated DataReader UserData"; +char UPDATED_TOPIC_DATA[] = "Updated Topic TopicData"; +char UPDATED_GROUP_DATA[] = "Updated GroupData"; + +ACE_TString synch_dir; +ACE_TCHAR mon1_fname[] = ACE_TEXT("monitor1_done"); +ACE_TCHAR mon2_fname[] = ACE_TEXT("monitor2_done"); + +int num_messages = 10; + +#endif /* COMMON_H */ diff --git a/tests/DCPS/BuiltInTopicTest/monitor.cpp b/tests/DCPS/BuiltInTopicTest/monitor.cpp index 65b87f17781..85d1f269f51 100644 --- a/tests/DCPS/BuiltInTopicTest/monitor.cpp +++ b/tests/DCPS/BuiltInTopicTest/monitor.cpp @@ -1,31 +1,13 @@ -// -*- C++ -*- -// ============================================================================ -/** - * @file subscriber.cpp - * - * - * - */ -// ============================================================================ - - -#include -#include -#include "dds/DdsDcpsInfrastructureC.h" -#include "dds/DdsDcpsInfoUtilsC.h" -#include "dds/DdsDcpsSubscriptionC.h" -#include "dds/DCPS/BuiltInTopicUtils.h" -#include "dds/DCPS/Discovery.h" -#include "dds/DCPS/Service_Participant.h" -#include "dds/DCPS/RTPS/RtpsDiscovery.h" - -#include "dds/DCPS/StaticIncludes.h" - -#include -#include "ace/Get_Opt.h" -#include "ace/OS_NS_unistd.h" -#include "tests/Utils/ExceptionStreams.h" #include "tests/Utils/WaitForSample.h" +#include "common.h" + +#include +#include +#include +#include +#include + +#include using namespace std; @@ -34,19 +16,7 @@ unsigned int num_parts = 3; unsigned int num_topics = 2; unsigned int num_subs = 1; unsigned int num_pubs = 1; -const char* topic_name = "Movie Discussion List"; -const char* topic_type_name = "Messenger"; - -char PART_USER_DATA[] = "Initial DomainParticipant UserData"; -char DW_USER_DATA[] = "Initial DataWriter UserData"; -char DR_USER_DATA[] = "Initial DataReader UserData"; -char TOPIC_DATA[] = "Initial Topic TopicData"; -char GROUP_DATA[] = "Initial GroupData"; -char UPDATED_PART_USER_DATA[] = "Updated DomainParticipant UserData"; -char UPDATED_DW_USER_DATA[] = "Updated DataWriter UserData"; -char UPDATED_DR_USER_DATA[] = "Updated DataReader UserData"; -char UPDATED_TOPIC_DATA[] = "Updated Topic TopicData"; -char UPDATED_GROUP_DATA[] = "Updated GroupData"; +unsigned int dps_with_user_data = 2; char* CUR_PART_USER_DATA = PART_USER_DATA; char* CUR_DW_USER_DATA = DW_USER_DATA; @@ -54,37 +24,30 @@ char* CUR_DR_USER_DATA = DR_USER_DATA; char* CUR_TOPIC_DATA = TOPIC_DATA; char* CUR_GROUP_DATA = GROUP_DATA; -unsigned int dps_with_user_data = 2; -ACE_TString synch_dir; -ACE_TCHAR synch_fname[] = ACE_TEXT("monitor1_done"); - -int -parse_args (int argc, ACE_TCHAR *argv[]) +int parse_args(int argc, ACE_TCHAR *argv[]) { - ACE_Get_Opt get_opts (argc, argv, ACE_TEXT("T:l:d:t:s:p:u")); + ACE_Get_Opt get_opts(argc, argv, ACE_TEXT("T:l:d:t:s:p:u")); int c; - while ((c = get_opts ()) != -1) - { - switch (c) - { + while ((c = get_opts()) != -1) { + switch (c) { case 'T': - synch_dir = get_opts.opt_arg (); + synch_dir = get_opts.opt_arg(); break; case 'l': - delay_before_read_sec = ACE_OS::atoi (get_opts.opt_arg ()); + delay_before_read_sec = ACE_OS::atoi(get_opts.opt_arg()); break; case 'd': - num_parts = ACE_OS::atoi (get_opts.opt_arg ()); + num_parts = ACE_OS::atoi(get_opts.opt_arg()); break; case 't': - num_topics = ACE_OS::atoi (get_opts.opt_arg ()); + num_topics = ACE_OS::atoi(get_opts.opt_arg()); break; case 's': - num_subs = ACE_OS::atoi (get_opts.opt_arg ()); + num_subs = ACE_OS::atoi(get_opts.opt_arg()); break; case 'p': - num_pubs = ACE_OS::atoi (get_opts.opt_arg ()); + num_pubs = ACE_OS::atoi(get_opts.opt_arg()); break; case 'u': // Check with reset qos data. CUR_PART_USER_DATA = UPDATED_PART_USER_DATA; @@ -95,7 +58,7 @@ parse_args (int argc, ACE_TCHAR *argv[]) break; case '?': default: - ACE_ERROR_RETURN ((LM_ERROR, + ACE_ERROR_RETURN((LM_ERROR, "usage: %s " "-l " "-d " @@ -105,7 +68,7 @@ parse_args (int argc, ACE_TCHAR *argv[]) "-u " "-T " "\n", - argv [0]), + argv[0]), -1); } } @@ -114,574 +77,482 @@ parse_args (int argc, ACE_TCHAR *argv[]) } -int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +int ACE_TMAIN(int argc, ACE_TCHAR *argv[]) { - try - { - ACE_DEBUG ((LM_DEBUG, "(%P|%t) monitor main\n")); + try { + ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor main\n")); - DDS::DomainParticipantFactory_var dpf; - DDS::DomainParticipant_var participant; + DDS::DomainParticipantFactory_var dpf; + DDS::DomainParticipant_var participant; - dpf = TheParticipantFactoryWithArgs(argc, argv); + dpf = TheParticipantFactoryWithArgs(argc, argv); + if (parse_args(argc, argv) == -1) { + return -1; + } - if (parse_args (argc, argv) == -1) { - return -1; + if (CUR_PART_USER_DATA == UPDATED_PART_USER_DATA) { + // wait for Monitor 1 done + FILE* fp = ACE_OS::fopen((synch_dir + mon1_fname).c_str(), ACE_TEXT("r")); + while (fp == 0) { + ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) waiting monitor1 done ...\n"))); + ACE_OS::sleep(1); + fp = ACE_OS::fopen((synch_dir + mon1_fname).c_str(), ACE_TEXT("r")); } - - if (CUR_PART_USER_DATA == UPDATED_PART_USER_DATA) - { - // wait for Monitor 1 done - FILE* fp = ACE_OS::fopen ((synch_dir + synch_fname).c_str (), ACE_TEXT("r")); - int i = 0; - while (fp == 0 && i < 15) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT("(%P|%t) waiting monitor1 done ...\n"))); - ACE_OS::sleep (1); - ++ i; - fp = ACE_OS::fopen ((synch_dir + synch_fname).c_str (), ACE_TEXT("r")); - } - - if (fp != 0) - ACE_OS::fclose (fp); + if (fp != 0) { + ACE_OS::fclose(fp); } + } - participant = dpf->create_participant(111, - PARTICIPANT_QOS_DEFAULT, - DDS::DomainParticipantListener::_nil(), - ::OpenDDS::DCPS::DEFAULT_STATUS_MASK); - if (CORBA::is_nil (participant.in ())) { - ACE_ERROR((LM_ERROR, "(%P|%t) monitor: create_participant failed.\n")); - return 1 ; - } + participant = dpf->create_participant(111, + PARTICIPANT_QOS_DEFAULT, + DDS::DomainParticipantListener::_nil(), + ::OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (!participant) { + ACE_ERROR((LM_ERROR, "(%P|%t) monitor: create_participant failed.\n")); + return 1; + } - OpenDDS::DCPS::Discovery_rch disc = - TheServiceParticipant->get_discovery(participant->get_domain_id()); + OpenDDS::DCPS::Discovery_rch disc = + TheServiceParticipant->get_discovery(participant->get_domain_id()); - OpenDDS::DCPS::DomainParticipantImpl* part_svt - = dynamic_cast(participant.in()); + OpenDDS::DCPS::DomainParticipantImpl* part_svt + = dynamic_cast(participant.in()); - if (!part_svt) { - ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) monitor: ") - ACE_TEXT("failed to obtain DomainParticipantImpl.\n"))); - return 1; - } - - const bool ignoredEntitiesAreInBIT = - !dynamic_cast(disc.in()); - const bool ownEntitiesAreInBIT = ignoredEntitiesAreInBIT; + if (!part_svt) { + ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) monitor: ") + ACE_TEXT("failed to obtain DomainParticipantImpl.\n"))); + return 1; + } - // give time for BIT datareader/datawriter fully association. - ACE_OS::sleep(3); + const bool ignoredEntitiesAreInBIT = !dynamic_cast(disc.in()); + const bool ownEntitiesAreInBIT = ignoredEntitiesAreInBIT; - if (delay_before_read_sec > 0) { - ACE_DEBUG((LM_DEBUG,"(%P|%t) monitor: SLEEPING BEFORE READING!\n")); - ACE_OS::sleep (delay_before_read_sec); - } + // give time for BIT datareader/datawriter fully association. + ACE_OS::sleep(3); - ::DDS::Subscriber_var bit_subscriber - = participant->get_builtin_subscriber () ; + if (delay_before_read_sec > 0) { + ACE_DEBUG((LM_DEBUG,"(%P|%t) monitor: SLEEPING BEFORE READING!\n")); + ACE_OS::sleep(delay_before_read_sec); + } - ::DDS::DataReader_var part_rdr = - bit_subscriber->lookup_datareader (OpenDDS::DCPS::BUILT_IN_PARTICIPANT_TOPIC) ; + ::DDS::Subscriber_var bit_subscriber = participant->get_builtin_subscriber(); + ::DDS::DataReader_var part_rdr = bit_subscriber->lookup_datareader(OpenDDS::DCPS::BUILT_IN_PARTICIPANT_TOPIC); - ::DDS::ParticipantBuiltinTopicDataDataReader_var part_reader - = ::DDS::ParticipantBuiltinTopicDataDataReader::_narrow (part_rdr.in ()); - if (CORBA::is_nil (part_reader.in ())) - { - ACE_ERROR((LM_ERROR, "(%P|%t) monitor: failed to get BUILT_IN_PARTICIPANT_TOPIC datareader.\n")); - return 1; - } + ::DDS::ParticipantBuiltinTopicDataDataReader_var part_reader + = ::DDS::ParticipantBuiltinTopicDataDataReader::_narrow(part_rdr.in()); + if (!part_reader) { + ACE_ERROR((LM_ERROR, "(%P|%t) monitor: failed to get BUILT_IN_PARTICIPANT_TOPIC datareader.\n")); + return 1; + } #ifndef OPENDDS_SAFETY_PROFILE - // BUILT_IN_TOPIC_TOPIC unsupported in SAFETY PROFILE with RTPS - ::DDS::DataReader_var topic_rdr = - bit_subscriber->lookup_datareader (OpenDDS::DCPS::BUILT_IN_TOPIC_TOPIC); - ::DDS::TopicBuiltinTopicDataDataReader_var topic_reader - = ::DDS::TopicBuiltinTopicDataDataReader::_narrow (topic_rdr.in ()); - if (CORBA::is_nil (topic_reader.in ())) - { - ACE_ERROR((LM_ERROR, "(%P|%t) monitor: failed to get BUILT_IN_TOPIC_TOPIC datareader.\n")); - return 1; - } + // BUILT_IN_TOPIC_TOPIC unsupported in SAFETY PROFILE with RTPS + ::DDS::DataReader_var topic_rdr = + bit_subscriber->lookup_datareader(OpenDDS::DCPS::BUILT_IN_TOPIC_TOPIC); + ::DDS::TopicBuiltinTopicDataDataReader_var topic_reader + = ::DDS::TopicBuiltinTopicDataDataReader::_narrow(topic_rdr.in()); + if (!topic_reader) { + ACE_ERROR((LM_ERROR, "(%P|%t) monitor: failed to get BUILT_IN_TOPIC_TOPIC datareader.\n")); + return 1; + } #endif - ::DDS::ReturnCode_t ret; - CORBA::ULong len = 0; + ::DDS::ReturnCode_t ret; + CORBA::ULong len = 0; - ::DDS::DataReader_var subscription_rdr = - bit_subscriber->lookup_datareader (OpenDDS::DCPS::BUILT_IN_SUBSCRIPTION_TOPIC) ; - ::DDS::SubscriptionBuiltinTopicDataDataReader_var sub_reader - = ::DDS::SubscriptionBuiltinTopicDataDataReader::_narrow (subscription_rdr.in ()); - if (CORBA::is_nil (sub_reader.in ())) - { - ACE_ERROR((LM_ERROR, "(%P|%t) monitor: failed to get BUILT_IN_SUBSCRIPTION_TOPIC datareader.\n")); - return 1; - } - - ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor: waiting for subscription sample\n")); - Utils::waitForSample(subscription_rdr); - - ::DDS::DataReader_var publication_rdr = - bit_subscriber->lookup_datareader (OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC) ; - ::DDS::PublicationBuiltinTopicDataDataReader_var pub_reader - = ::DDS::PublicationBuiltinTopicDataDataReader::_narrow (publication_rdr.in ()); - if (CORBA::is_nil (pub_reader.in ())) - { - ACE_ERROR((LM_ERROR, "(%P|%t) monitor: failed to get BUILT_IN_PUBLICATION_TOPIC datareader.\n")); - return 1; - } + ::DDS::DataReader_var subscription_rdr = + bit_subscriber->lookup_datareader(OpenDDS::DCPS::BUILT_IN_SUBSCRIPTION_TOPIC); + ::DDS::SubscriptionBuiltinTopicDataDataReader_var sub_reader + = ::DDS::SubscriptionBuiltinTopicDataDataReader::_narrow(subscription_rdr.in()); + if (!sub_reader) { + ACE_ERROR((LM_ERROR, "(%P|%t) monitor: failed to get BUILT_IN_SUBSCRIPTION_TOPIC datareader.\n")); + return 1; + } - ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor: waiting for publication sample\n")); - Utils::waitForSample(publication_rdr); - - ::DDS::SampleInfoSeq pubinfos(10); - ::DDS::PublicationBuiltinTopicDataSeq pubdata(10); - ret = pub_reader->read (pubdata, - pubinfos, - 10, - ::DDS::ANY_SAMPLE_STATE, - ::DDS::ANY_VIEW_STATE, - ::DDS::ALIVE_INSTANCE_STATE); - - if (ret != ::DDS::RETCODE_OK && ret != ::DDS::RETCODE_NO_DATA) - { - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) monitor: failed to read BIT publication data.\n"), - 1); - } + ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor: waiting for subscription sample\n")); + Utils::waitForSample(subscription_rdr); - len = pubdata.length (); - bool pubWasIgnored = false; - if (len != num_pubs) - { - if (!ignoredEntitiesAreInBIT && len == num_pubs - 1) - { - pubWasIgnored = true; - ACE_DEBUG((LM_INFO, "(%P|%t) monitor: pub assumed to be ignored\n")); - } - else - { - ACE_ERROR_RETURN((LM_ERROR, "(%P|%t) monitor: read %d BIT pub data," - "expected %d pubs.\n", len, num_pubs), - 1); - } - } + ::DDS::DataReader_var publication_rdr = + bit_subscriber->lookup_datareader(OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC); + ::DDS::PublicationBuiltinTopicDataDataReader_var pub_reader + = ::DDS::PublicationBuiltinTopicDataDataReader::_narrow(publication_rdr.in()); + if (!pub_reader) { + ACE_ERROR((LM_ERROR, "(%P|%t) monitor: failed to get BUILT_IN_PUBLICATION_TOPIC datareader.\n")); + return 1; + } - CORBA::ULong num_dws_with_data = 0; - for (CORBA::ULong i = 0; i < len; ++i) - { - if (ACE_OS::strcmp (pubdata[i].topic_name.in (), topic_name) != 0) - { - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) monitor: got datawriter topic name \"%C\", expected topic name \"%C\"\n", - pubdata[i].topic_name.in (), topic_name), - 1); - } - if (ACE_OS::strcmp (pubdata[i].type_name.in (), topic_type_name) != 0) - { - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) monitor: got datawriter topic type name \"%C\", expected topic type name \"%C\"\n", - pubdata[i].type_name.in (), topic_type_name), - 1); - } + ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor: waiting for publication sample\n")); + Utils::waitForSample(publication_rdr); + + ::DDS::SampleInfoSeq pubinfos(10); + ::DDS::PublicationBuiltinTopicDataSeq pubdata(10); + ret = pub_reader->read(pubdata, + pubinfos, + 10, + ::DDS::ANY_SAMPLE_STATE, + ::DDS::ANY_VIEW_STATE, + ::DDS::ALIVE_INSTANCE_STATE); + + if (ret != ::DDS::RETCODE_OK && ret != ::DDS::RETCODE_NO_DATA) { + ACE_ERROR_RETURN((LM_ERROR, "(%P|%t) monitor: failed to read BIT publication data.\n"), + 1); + } - ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor: DataWriter: key = %d, %x, %x.\n", - pubdata[i].key.value[0], pubdata[i].key.value[1], pubdata[i].key.value[2])); - - CORBA::ULong user_data_len = static_cast(ACE_OS::strlen (CUR_DW_USER_DATA)); - CORBA::ULong topic_data_len = static_cast(ACE_OS::strlen (CUR_TOPIC_DATA)); - CORBA::ULong group_data_len = static_cast(ACE_OS::strlen (CUR_GROUP_DATA)); - - if (pubdata[i].user_data.value.length () == user_data_len - && pubdata[i].topic_data.value.length () == topic_data_len - && pubdata[i].group_data.value.length () == group_data_len) - { - if (ACE_OS::strncmp (reinterpret_cast (pubdata[i].user_data.value.get_buffer()), CUR_DW_USER_DATA, user_data_len) == 0 - && ACE_OS::strncmp (reinterpret_cast (pubdata[i].topic_data.value.get_buffer()), CUR_TOPIC_DATA, topic_data_len) == 0 - && ACE_OS::strncmp (reinterpret_cast (pubdata[i].group_data.value.get_buffer()), CUR_GROUP_DATA, group_data_len) == 0) - { - ++ num_dws_with_data; - } - } + len = pubdata.length(); + bool pubWasIgnored = false; + if (len != num_pubs) { + if (!ignoredEntitiesAreInBIT && len == num_pubs - 1) { + pubWasIgnored = true; + ACE_DEBUG((LM_INFO, "(%P|%t) monitor: pub assumed to be ignored\n")); + } else { + ACE_ERROR_RETURN((LM_ERROR, "(%P|%t) monitor: read %d BIT pub data," + "expected %d pubs.\n", len, num_pubs), + 1); } + } - if (num_dws_with_data == len) - { - ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor: DataWriter changeable qos test PASSED.\n")); + CORBA::ULong num_dws_with_data = 0; + for (CORBA::ULong i = 0; i < len; ++i) { + if (ACE_OS::strcmp(pubdata[i].topic_name.in(), topic_name) != 0) { + ACE_ERROR_RETURN((LM_ERROR, + "(%P|%t) monitor: got datawriter topic name \"%C\", expected topic name \"%C\"\n", + pubdata[i].topic_name.in(), topic_name), 1); } - else - { - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) monitor: DataWriter changeable qos test FAILED.\n"), - 1); + if (ACE_OS::strcmp(pubdata[i].type_name.in(), topic_type_name) != 0) { + ACE_ERROR_RETURN((LM_ERROR, + "(%P|%t) monitor: got datawriter topic type name \"%C\", expected topic type name \"%C\"\n", + pubdata[i].type_name.in(), topic_type_name), 1); } - ::DDS::SampleInfoSeq subinfos(10); - ::DDS::SubscriptionBuiltinTopicDataSeq subdata(10); - ret = sub_reader->read (subdata, - subinfos, - 10, - ::DDS::ANY_SAMPLE_STATE, - ::DDS::ANY_VIEW_STATE, - ::DDS::ALIVE_INSTANCE_STATE); - - if (ret != ::DDS::RETCODE_OK && ret != ::DDS::RETCODE_NO_DATA) - { - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) monitor: failed to read BIT subsciption data.\n"), - 1); - } + ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor: DataWriter: key = %d, %x, %x.\n", + pubdata[i].key.value[0], pubdata[i].key.value[1], pubdata[i].key.value[2])); - len = subdata.length (); + CORBA::ULong user_data_len = static_cast(ACE_OS::strlen(CUR_DW_USER_DATA)); + CORBA::ULong topic_data_len = static_cast(ACE_OS::strlen(CUR_TOPIC_DATA)); + CORBA::ULong group_data_len = static_cast(ACE_OS::strlen(CUR_GROUP_DATA)); - if (len != num_subs) - { - if (!pubWasIgnored && !ignoredEntitiesAreInBIT && len == num_subs - 1) - { - ACE_DEBUG((LM_INFO, "(%P|%t) monitor: sub assumed to be ignored\n")); - } - else - { - ACE_ERROR_RETURN((LM_ERROR, - "(%P|%t) monitor: read %d BIT sub data, " - "expected %d subs.\n", len, num_subs), - 1); + if (pubdata[i].user_data.value.length() == user_data_len + && pubdata[i].topic_data.value.length() == topic_data_len + && pubdata[i].group_data.value.length() == group_data_len) { + if (ACE_OS::strncmp(reinterpret_cast(pubdata[i].user_data.value.get_buffer()), CUR_DW_USER_DATA, user_data_len) == 0 + && ACE_OS::strncmp(reinterpret_cast(pubdata[i].topic_data.value.get_buffer()), CUR_TOPIC_DATA, topic_data_len) == 0 + && ACE_OS::strncmp(reinterpret_cast(pubdata[i].group_data.value.get_buffer()), CUR_GROUP_DATA, group_data_len) == 0) { + ++num_dws_with_data; } } + } - CORBA::ULong num_drs_with_data = 0; - for (CORBA::ULong i = 0; i < len; ++i) - { - if (ACE_OS::strcmp (subdata[i].topic_name.in (), topic_name) != 0) - { - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) monitor: got datareader topic name \"%C\", expected topic name \"%C\"\n", - subdata[i].topic_name.in (), topic_name), - 1); - } - if (ACE_OS::strcmp (subdata[i].type_name.in (), topic_type_name) != 0) - { - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) monitor: got datareader topic type name \"%C\", expected topic type name \"%C\"\n", - subdata[i].type_name.in (), topic_type_name), - 1); - } + if (num_dws_with_data == len) { + ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor: DataWriter changeable qos test PASSED.\n")); + } else { + ACE_ERROR_RETURN((LM_ERROR, "(%P|%t) monitor: DataWriter changeable qos test FAILED.\n"), 1); + } - ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor: DataReader: key = %d, %x, %x\n", - subdata[i].key.value[0], subdata[i].key.value[1], subdata[i].key.value[2])); - - CORBA::ULong user_data_len = static_cast(ACE_OS::strlen (CUR_DR_USER_DATA)); - CORBA::ULong topic_data_len = static_cast(ACE_OS::strlen (CUR_TOPIC_DATA)); - CORBA::ULong group_data_len = static_cast(ACE_OS::strlen (CUR_GROUP_DATA)); - - if (subdata[i].user_data.value.length () == user_data_len - && subdata[i].topic_data.value.length () == topic_data_len - && subdata[i].group_data.value.length () == group_data_len) - { - if (ACE_OS::strncmp (reinterpret_cast (subdata[i].user_data.value.get_buffer()), CUR_DR_USER_DATA, user_data_len) == 0 - && ACE_OS::strncmp (reinterpret_cast (subdata[i].topic_data.value.get_buffer()), CUR_TOPIC_DATA, topic_data_len) == 0 - && ACE_OS::strncmp (reinterpret_cast (subdata[i].group_data.value.get_buffer()), CUR_GROUP_DATA, group_data_len) == 0) - { - ++ num_drs_with_data; - } - } + ::DDS::SampleInfoSeq subinfos(10); + ::DDS::SubscriptionBuiltinTopicDataSeq subdata(10); + ret = sub_reader->read(subdata, + subinfos, + 10, + ::DDS::ANY_SAMPLE_STATE, + ::DDS::ANY_VIEW_STATE, + ::DDS::ALIVE_INSTANCE_STATE); + + if (ret != ::DDS::RETCODE_OK && ret != ::DDS::RETCODE_NO_DATA) { + ACE_ERROR_RETURN((LM_ERROR, "(%P|%t) monitor: failed to read BIT subsciption data.\n"), 1); + } + + len = subdata.length(); + + if (len != num_subs) { + if (!pubWasIgnored && !ignoredEntitiesAreInBIT && len == num_subs - 1) { + ACE_DEBUG((LM_INFO, "(%P|%t) monitor: sub assumed to be ignored\n")); + } else { + ACE_ERROR_RETURN((LM_ERROR, + "(%P|%t) monitor: read %d BIT sub data, " + "expected %d subs.\n", len, num_subs), 1); } + } - if (num_drs_with_data == len) - { - ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor: DataReader changeable qos test PASSED.\n")); + CORBA::ULong num_drs_with_data = 0; + for (CORBA::ULong i = 0; i < len; ++i) { + if (ACE_OS::strcmp(subdata[i].topic_name.in(), topic_name) != 0) { + ACE_ERROR_RETURN((LM_ERROR, + "(%P|%t) monitor: got datareader topic name \"%C\", expected topic name \"%C\"\n", + subdata[i].topic_name.in(), topic_name), 1); } - else - { - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) monitor: DataReader changeable qos test FAILED.\n"), - 1); + if (ACE_OS::strcmp(subdata[i].type_name.in(), topic_type_name) != 0) { + ACE_ERROR_RETURN((LM_ERROR, + "(%P|%t) monitor: got datareader topic type name \"%C\", expected topic type name \"%C\"\n", + subdata[i].type_name.in(), topic_type_name), 1); } - // wait before checking discovered participants - ACE_OS::sleep(1); + ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor: DataReader: key = %d, %x, %x\n", + subdata[i].key.value[0], subdata[i].key.value[1], subdata[i].key.value[2])); - { - ::DDS::InstanceHandleSeq handles; - const DDS::ReturnCode_t discpart_error = - participant->get_discovered_participants(handles); - if (discpart_error) { - ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: monitor: get_discovered_participants failed: %C\n", - OpenDDS::DCPS::retcode_to_string(discpart_error))); - return 1; - } - if (handles.length() == 0) { - ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: monitor: get_discovered_participants gave no handles, " - "but we expected some\n")); - return 1; - } + CORBA::ULong user_data_len = static_cast(ACE_OS::strlen(CUR_DR_USER_DATA)); + CORBA::ULong topic_data_len = static_cast(ACE_OS::strlen(CUR_TOPIC_DATA)); + CORBA::ULong group_data_len = static_cast(ACE_OS::strlen(CUR_GROUP_DATA)); - CORBA::ULong len = handles.length (); - if (len != num_parts - 1) - { - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) ERROR: monitor: get_discovered_participants expected %d got %d.\n", - num_parts, len), - 1); + if (subdata[i].user_data.value.length() == user_data_len + && subdata[i].topic_data.value.length() == topic_data_len + && subdata[i].group_data.value.length() == group_data_len) { + if (ACE_OS::strncmp(reinterpret_cast(subdata[i].user_data.value.get_buffer()), CUR_DR_USER_DATA, user_data_len) == 0 + && ACE_OS::strncmp(reinterpret_cast(subdata[i].topic_data.value.get_buffer()), CUR_TOPIC_DATA, topic_data_len) == 0 + && ACE_OS::strncmp(reinterpret_cast(subdata[i].group_data.value.get_buffer()), CUR_GROUP_DATA, group_data_len) == 0) { + ++num_drs_with_data; } + } + } - for (CORBA::ULong i = 0; i < len; ++ i) - { - ACE_DEBUG((LM_DEBUG, - ACE_TEXT("(%P|%t) monitor: participant %d examining participant handle %d.\n"), - participant->get_instance_handle(), - handles[i] - )); - ::DDS::ParticipantBuiltinTopicData data; - participant->get_discovered_participant_data(data, handles[i]); - - OpenDDS::DCPS::RepoId id = OpenDDS::DCPS::bit_key_to_repo_id(data.key); - - if (part_svt->lookup_handle(id) != handles[i]) - { - ACE_ERROR((LM_ERROR, "(%P|%t) monitor: get_discovered_participant_data test failed.\n")); - return 1; - } - } + if (num_drs_with_data == len) { + ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor: DataReader changeable qos test PASSED.\n")); + } else { + ACE_ERROR_RETURN((LM_ERROR, "(%P|%t) monitor: DataReader changeable qos test FAILED.\n"), 1); + } - if (participant->ignore_participant (handles[0]) != ::DDS::RETCODE_OK) - { - ACE_ERROR((LM_ERROR, "(%P|%t) monitor: ignore_participant failed.\n")); + { + // Wait for the expected number of participants to be discovered. + ::DDS::InstanceHandleSeq handles; + while (handles.length() < num_parts - 1) { + ACE_OS::sleep(1); + handles.length(0); + const DDS::ReturnCode_t discpart_error = participant->get_discovered_participants(handles); + if (discpart_error != DDS::RETCODE_OK) { + ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: monitor: get_discovered_participants failed: %C\n", + OpenDDS::DCPS::retcode_to_string(discpart_error))); return 1; } + } + + for (CORBA::ULong i = 0; i < handles.length(); ++i) { + ACE_DEBUG((LM_DEBUG, + ACE_TEXT("(%P|%t) monitor: participant %d examining participant handle %d.\n"), + participant->get_instance_handle(), handles[i])); + ::DDS::ParticipantBuiltinTopicData data; + participant->get_discovered_participant_data(data, handles[i]); - handles.length (0); - if (participant->get_discovered_participants (handles) != ::DDS::RETCODE_OK - || handles.length () != num_parts - 2) - { - ACE_ERROR((LM_ERROR, ACE_TEXT ("(%P|%t) monitor: get_discovered_participant ") - ACE_TEXT ("skip ignored participant test failed.\n"))); + OpenDDS::DCPS::RepoId id = OpenDDS::DCPS::bit_key_to_repo_id(data.key); + + if (part_svt->lookup_handle(id) != handles[i]) { + ACE_ERROR((LM_ERROR, "(%P|%t) monitor: get_discovered_participant_data test failed.\n")); return 1; } + } - if (!ignoredEntitiesAreInBIT) --num_parts; + if (participant->ignore_participant(handles[0]) != ::DDS::RETCODE_OK) { + ACE_ERROR((LM_ERROR, "(%P|%t) monitor: ignore_participant failed.\n")); + return 1; + } - ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor: discover participants test PASSED.\n")); + handles.length(0); + if (participant->get_discovered_participants(handles) != ::DDS::RETCODE_OK + || handles.length() != num_parts - 2) { + ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) monitor: get_discovered_participant ") + ACE_TEXT("skip ignored participant test failed.\n"))); + return 1; } - ::DDS::ParticipantBuiltinTopicDataSeq partdata; - const CORBA::ULong expected_part_count = num_parts - !ownEntitiesAreInBIT; - - while (partdata.length() != expected_part_count) { - // Do not constantly poll. - const ACE_Time_Value delay(0, 100000); // 100ms - ACE_OS::sleep(delay); - ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor: waiting for participant sample expected %u have %u\n", expected_part_count, partdata.length())); - Utils::waitForSample(part_rdr); - ::DDS::SampleInfoSeq pinfos(10); - ::DDS::ParticipantBuiltinTopicDataSeq pdata(10); - ret = part_reader->read(pdata, - pinfos, - 10, - ::DDS::ANY_SAMPLE_STATE, - ::DDS::ANY_VIEW_STATE, - ::DDS::ALIVE_INSTANCE_STATE); - - if (ret != ::DDS::RETCODE_OK && ret != ::DDS::RETCODE_NO_DATA) { - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) monitor: failed to read BIT participant data.\n"), - 1); - } + if (!ignoredEntitiesAreInBIT) { + --num_parts; + } - for (CORBA::ULong i = 0; i < pdata.length(); ++i) { - const CORBA::ULong idx = partdata.length(); - partdata.length(idx + 1); - partdata[idx] = pdata[i]; - } + ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor: discover participants test PASSED.\n")); + } + + ::DDS::ParticipantBuiltinTopicDataSeq partdata; + const CORBA::ULong expected_part_count = num_parts - !ownEntitiesAreInBIT; + + while (partdata.length() != expected_part_count) { + // Do not constantly poll. + const ACE_Time_Value delay(0, 100000); // 100ms + ACE_OS::sleep(delay); + ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor: waiting for participant sample expected %u have %u\n", + expected_part_count, partdata.length())); + Utils::waitForSample(part_rdr); + ::DDS::SampleInfoSeq pinfos(10); + ::DDS::ParticipantBuiltinTopicDataSeq pdata(10); + ret = part_reader->read(pdata, + pinfos, + 10, + ::DDS::ANY_SAMPLE_STATE, + ::DDS::ANY_VIEW_STATE, + ::DDS::ALIVE_INSTANCE_STATE); + + if (ret != ::DDS::RETCODE_OK && ret != ::DDS::RETCODE_NO_DATA) { + ACE_ERROR_RETURN((LM_ERROR, + "(%P|%t) monitor: failed to read BIT participant data.\n"), + 1); } - CORBA::ULong cur_dps_with_user_data = 0; - CORBA::ULong user_data_len = static_cast(ACE_OS::strlen (CUR_PART_USER_DATA)); + for (CORBA::ULong i = 0; i < pdata.length(); ++i) { + const CORBA::ULong idx = partdata.length(); + partdata.length(idx + 1); + partdata[idx] = pdata[i]; + } + } - for (CORBA::ULong i = 0; i < partdata.length(); ++i) - { - ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor: Participant: key = %d, %x, %x\n", - partdata[i].key.value[0], partdata[i].key.value[1], partdata[i].key.value[2])); + CORBA::ULong cur_dps_with_user_data = 0; + CORBA::ULong user_data_len = static_cast(ACE_OS::strlen(CUR_PART_USER_DATA)); - CORBA::ULong cur_len = partdata[i].user_data.value.length (); + for (CORBA::ULong i = 0; i < partdata.length(); ++i) { + ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor: Participant: key = %d, %x, %x\n", + partdata[i].key.value[0], partdata[i].key.value[1], partdata[i].key.value[2])); - if ((cur_len == user_data_len) - && (ACE_OS::strncmp (reinterpret_cast (partdata[i].user_data.value.get_buffer()), - CUR_PART_USER_DATA, - user_data_len) == 0)) - { - ++cur_dps_with_user_data; - } - } + CORBA::ULong cur_len = partdata[i].user_data.value.length(); - if (cur_dps_with_user_data == dps_with_user_data - !ignoredEntitiesAreInBIT) - { - ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor: DomainParticipant changeable qos test PASSED.\n")); - } - else - { - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) monitor: DomainParticipant changeable qos test FAILED.\n"), - 1); + if (cur_len == user_data_len && + ACE_OS::strncmp(reinterpret_cast(partdata[i].user_data.value.get_buffer()), + CUR_PART_USER_DATA, + user_data_len) == 0) { + ++cur_dps_with_user_data; } + } -#ifndef OPENDDS_SAFETY_PROFILE - { - ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor: waiting for topic sample\n")); - Utils::waitForSample(topic_rdr); - ::DDS::InstanceHandleSeq handles; - if (participant->get_discovered_topics (handles) != ::DDS::RETCODE_OK) - { - ACE_ERROR((LM_ERROR, "(%P|%t) monitor: get_discovered_topics test failed.\n")); - return 1; - } + if (cur_dps_with_user_data == dps_with_user_data - !ignoredEntitiesAreInBIT) { + ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor: DomainParticipant changeable qos test PASSED.\n")); + } else { + ACE_ERROR_RETURN((LM_ERROR, + "(%P|%t) monitor: DomainParticipant changeable qos test FAILED.\n"), + 1); + } - CORBA::ULong len = handles.length (); - if (len != num_topics) - { - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) monitor: ERROR: get_discovered_topics expected %d got %d.\n", - num_topics, len), - 1); - } +#ifndef OPENDDS_SAFETY_PROFILE + { + ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor: waiting for topic sample\n")); + Utils::waitForSample(topic_rdr); + ::DDS::InstanceHandleSeq handles; + if (participant->get_discovered_topics(handles) != ::DDS::RETCODE_OK) { + ACE_ERROR((LM_ERROR, "(%P|%t) monitor: get_discovered_topics test failed.\n")); + return 1; + } - for (CORBA::ULong i = 0; i < len; ++ i) - { - ::DDS::TopicBuiltinTopicData data; - participant->get_discovered_topic_data(data, handles[i]); + CORBA::ULong len = handles.length(); + if (len != num_topics) { + ACE_ERROR_RETURN((LM_ERROR, + "(%P|%t) monitor: ERROR: get_discovered_topics expected %d got %d.\n", + num_topics, len), 1); + } - OpenDDS::DCPS::Discovery_rch disc = - TheServiceParticipant->get_discovery(participant->get_domain_id()); - OpenDDS::DCPS::RepoId id = OpenDDS::DCPS::bit_key_to_repo_id(data.key); + for (CORBA::ULong i = 0; i < len; ++i) { + ::DDS::TopicBuiltinTopicData data; + participant->get_discovered_topic_data(data, handles[i]); - if (part_svt->lookup_handle(id) != handles[i]) - { - ACE_ERROR((LM_ERROR, "(%P|%t) monitor: get_discovered_topic_data test failed.\n")); - return 1; - } - } + OpenDDS::DCPS::Discovery_rch disc = + TheServiceParticipant->get_discovery(participant->get_domain_id()); + OpenDDS::DCPS::RepoId id = OpenDDS::DCPS::bit_key_to_repo_id(data.key); - if (len && participant->ignore_topic(handles[0]) != ::DDS::RETCODE_OK) - { - ACE_ERROR((LM_ERROR, "(%P|%t) monitor: ignore_topic failed.\n")); + if (part_svt->lookup_handle(id) != handles[i]) { + ACE_ERROR((LM_ERROR, "(%P|%t) monitor: get_discovered_topic_data test failed.\n")); return 1; } + } - handles.length (0); - if (len && - (participant->get_discovered_topics(handles) != ::DDS::RETCODE_OK - || handles.length() != num_topics - 1)) - { - ACE_ERROR((LM_ERROR, "(%P|%t) monitor: get_discovered_topics " - "skip ignored topic test failed with len = %d, " - "handles.length() = %d, num_topics = %d\n", - len, handles.length(), num_topics)); - return 1; - } + if (len && participant->ignore_topic(handles[0]) != ::DDS::RETCODE_OK) { + ACE_ERROR((LM_ERROR, "(%P|%t) monitor: ignore_topic failed.\n")); + return 1; + } - ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor: discover topics test PASSED.\n")); + handles.length(0); + if (len && (participant->get_discovered_topics(handles) != ::DDS::RETCODE_OK + || handles.length() != num_topics - 1)) { + ACE_ERROR((LM_ERROR, "(%P|%t) monitor: get_discovered_topics " + "skip ignored topic test failed with len = %d, " + "handles.length() = %d, num_topics = %d\n", + len, handles.length(), num_topics)); + return 1; } - ::DDS::SampleInfoSeq topicinfos(10); - ::DDS::TopicBuiltinTopicDataSeq topicdata(10); - ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor: waiting for topic sample\n")); - Utils::waitForSample(topic_rdr); - ret = topic_reader->read (topicdata, - topicinfos, - 10, - ::DDS::ANY_SAMPLE_STATE, - ::DDS::ANY_VIEW_STATE, - ::DDS::ANY_INSTANCE_STATE); - - if (ret != ::DDS::RETCODE_OK && ret != ::DDS::RETCODE_NO_DATA) - { - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) monitor: failed to read BIT topic data.\n"), - 1); - } + ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor: discover topics test PASSED.\n")); + } + + ::DDS::SampleInfoSeq topicinfos(10); + ::DDS::TopicBuiltinTopicDataSeq topicdata(10); + ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor: waiting for topic sample\n")); + Utils::waitForSample(topic_rdr); + ret = topic_reader->read(topicdata, + topicinfos, + 10, + ::DDS::ANY_SAMPLE_STATE, + ::DDS::ANY_VIEW_STATE, + ::DDS::ANY_INSTANCE_STATE); + + if (ret != ::DDS::RETCODE_OK && ret != ::DDS::RETCODE_NO_DATA) { + ACE_ERROR_RETURN((LM_ERROR, "(%P|%t) monitor: failed to read BIT topic data.\n"), + 1); + } - len = topicdata.length (); + len = topicdata.length(); + if (len != num_topics) { + ACE_ERROR_RETURN((LM_ERROR, + "(%P|%t) monitor: ERROR: read %d BIT topic data, expected %d topics.\n", + len, num_topics), 1); + } - if (len != num_topics) - { - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) monitor: ERROR: read %d BIT topic data, expected %d topics.\n", len, num_topics), - 1); + CORBA::ULong num_topics_with_data = 0; + for (CORBA::ULong i = 0; i < len; ++i) { + if (ACE_OS::strcmp(topicdata[i].name.in(), topic_name) != 0) { + ACE_ERROR_RETURN((LM_ERROR, + "(%P|%t) monitor: ERROR: got topic name \"%C\", expected topic name \"%C\"\n", + topicdata[i].name.in(), topic_name), 1); + } + if (ACE_OS::strcmp(topicdata[i].type_name.in(), topic_type_name) != 0) { + ACE_ERROR_RETURN((LM_ERROR, + "(%P|%t) monitor: got topic type name \"%C\", expected topic type name \"%C\"\n", + topicdata[i].type_name.in(), topic_type_name), 1); } - CORBA::ULong num_topics_with_data = 0; - for (CORBA::ULong i = 0; i < len; ++i) - { - if (ACE_OS::strcmp (topicdata[i].name.in (), topic_name) != 0) - { - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) monitor: ERROR: got topic name \"%C\", expected topic name \"%C\"\n", - topicdata[i].name.in (), topic_name), - 1); - } - if (ACE_OS::strcmp (topicdata[i].type_name.in (), topic_type_name) != 0) - { - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) monitor: got topic type name \"%C\", expected topic type name \"%C\"\n", - topicdata[i].type_name.in (), topic_type_name), - 1); - } - - ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor: Topic: key = %d, %x, %x, name = %C, " - "type_name=%C\n", - topicdata[i].key.value[0], topicdata[i].key.value[1], topicdata[i].key.value[2], - topicdata[i].name.in (), topicdata[i].type_name.in ())); + ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor: Topic: key = %d, %x, %x, name = %C, " + "type_name=%C\n", + topicdata[i].key.value[0], topicdata[i].key.value[1], topicdata[i].key.value[2], + topicdata[i].name.in(), topicdata[i].type_name.in())); - CORBA::ULong topic_data_len = static_cast(ACE_OS::strlen (CUR_TOPIC_DATA)); + CORBA::ULong topic_data_len = static_cast(ACE_OS::strlen(CUR_TOPIC_DATA)); - if ((topicdata[i].topic_data.value.length () == topic_data_len) - && (ACE_OS::strncmp (reinterpret_cast (topicdata[i].topic_data.value.get_buffer()), - CUR_TOPIC_DATA, - topic_data_len) == 0)) - { - ++ num_topics_with_data; - } + if (topicdata[i].topic_data.value.length() == topic_data_len && + ACE_OS::strncmp(reinterpret_cast(topicdata[i].topic_data.value.get_buffer()), + CUR_TOPIC_DATA, + topic_data_len) == 0) { + ++num_topics_with_data; } + } - if (num_topics_with_data == num_topics) - { - ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor: Topic changeable qos test PASSED.\n")); - } - else - { - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) monitor: Topic changeable qos test FAILED.\n"), - 1); - } + if (num_topics_with_data == num_topics) { + ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor: Topic changeable qos test PASSED.\n")); + } else { + ACE_ERROR_RETURN((LM_ERROR, "(%P|%t) monitor: Topic changeable qos test FAILED.\n"), + 1); + } #endif - participant->delete_contained_entities(); - dpf->delete_participant(participant.in ()); - TheServiceParticipant->shutdown (); - - if (CUR_PART_USER_DATA == PART_USER_DATA) - { - // Create synch file. - ACE_TString path = synch_dir + synch_fname; - ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor1 creating %C\n", path.c_str())); - FILE* fp = ACE_OS::fopen (path.c_str (), ACE_TEXT("w")); - if (fp != 0) - { - // Write one byte so that PerlACE::waitforfile_timed works. - const char c = 'c'; - ACE_OS::fwrite(&c, 1, 1, fp); - ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) monitor1 is done\n"))); - ACE_OS::fclose (fp); - } - } + participant->delete_contained_entities(); + dpf->delete_participant(participant.in()); + TheServiceParticipant->shutdown(); - ACE_DEBUG ((LM_DEBUG, "(%P|%t) monitor main done\n")); + const ACE_TCHAR* synch_fname = 0; + if (CUR_PART_USER_DATA == PART_USER_DATA) { + synch_fname = mon1_fname; + } else if (CUR_PART_USER_DATA == UPDATED_PART_USER_DATA) { + synch_fname = mon2_fname; } - catch (CORBA::Exception& e) { - e._tao_print_exception( - "monitor: SUB: Exception caught in main ():", stderr); + // Create synch file. + ACE_TString path = synch_dir + synch_fname; + ACE_DEBUG((LM_DEBUG, "(%P|%t) %C creating %C\n", + CUR_PART_USER_DATA == PART_USER_DATA ? "monitor1" : "monitor2", + path.c_str())); + FILE* fp = ACE_OS::fopen(path.c_str(), ACE_TEXT("w")); + if (fp != 0) { + // Write one byte so that PerlACE::waitforfile_timed works. + const char c = 'c'; + ACE_OS::fwrite(&c, 1, 1, fp); + ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) %C is done\n"), + CUR_PART_USER_DATA == PART_USER_DATA ? "monitor1" : "monitor2")); + ACE_OS::fclose(fp); + } + + ACE_DEBUG((LM_DEBUG, "(%P|%t) monitor main done\n")); + + } catch (CORBA::Exception& e) { + e._tao_print_exception("monitor: SUB: Exception caught in main():", stderr); return 1; } diff --git a/tests/DCPS/BuiltInTopicTest/publisher.cpp b/tests/DCPS/BuiltInTopicTest/publisher.cpp index b614ba22a2a..49183fb1574 100644 --- a/tests/DCPS/BuiltInTopicTest/publisher.cpp +++ b/tests/DCPS/BuiltInTopicTest/publisher.cpp @@ -1,64 +1,32 @@ -// -*- C++ -*- -// ============================================================================ -/** - * @file publisher.cpp - * - * - * - */ -// ============================================================================ - -#include "MessengerTypeSupportImpl.h" #include "Writer.h" -#include -#include -#include - -#include "dds/DCPS/StaticIncludes.h" +#include "MessengerTypeSupportImpl.h" +#include "common.h" -#include -#include "ace/Get_Opt.h" -#include "tests/Utils/ExceptionStreams.h" +#include using namespace std; -char PART_USER_DATA[] = "Initial DomainParticipant UserData"; -char DW_USER_DATA[] = "Initial DataWriter UserData"; -char TOPIC_DATA[] = "Initial Topic TopicData"; -char GROUP_DATA[] = "Initial GroupData"; -char UPDATED_PART_USER_DATA[] = "Updated DomainParticipant UserData"; -char UPDATED_DW_USER_DATA[] = "Updated DataWriter UserData"; -char UPDATED_TOPIC_DATA[] = "Updated Topic TopicData"; -char UPDATED_GROUP_DATA[] = "Updated GroupData"; - -ACE_TString synch_dir; -ACE_TCHAR synch_fname[] = ACE_TEXT("monitor1_done"); - -int num_messages = 10; - -int parse_args (int argc, ACE_TCHAR *argv[]) +int parse_args(int argc, ACE_TCHAR *argv[]) { - ACE_Get_Opt get_opts (argc, argv, "T:n:"); + ACE_Get_Opt get_opts(argc, argv, "T:n:"); int c; - while ((c = get_opts ()) != -1) - { - switch (c) - { + while ((c = get_opts()) != -1) { + switch (c) { case 'T': - synch_dir = get_opts.opt_arg (); + synch_dir = get_opts.opt_arg(); break; case 'n': - num_messages = ACE_OS::atoi (get_opts.opt_arg ()); + num_messages = ACE_OS::atoi(get_opts.opt_arg()); break; case '?': default: - ACE_ERROR_RETURN ((LM_ERROR, + ACE_ERROR_RETURN((LM_ERROR, "usage: %s " "-n " "-T " "\n", - argv [0]), + argv[0]), -1); } } @@ -66,183 +34,180 @@ int parse_args (int argc, ACE_TCHAR *argv[]) return 0; } -int ACE_TMAIN(int argc, ACE_TCHAR *argv[]) { - try - { - ACE_DEBUG ((LM_DEBUG, "(%P|%t) publisher main\n")); - - DDS::DomainParticipantFactory_var dpf = - TheParticipantFactoryWithArgs(argc, argv); - - if (parse_args (argc, argv) == -1) { - return -1; - } - - DDS::DomainParticipantQos partQos; - dpf->get_default_participant_qos(partQos); - - // set up user data in DP qos - CORBA::ULong part_user_data_len - = static_cast(ACE_OS::strlen (PART_USER_DATA)); - partQos.user_data.value.length (part_user_data_len); - partQos.user_data.value.replace (part_user_data_len, - part_user_data_len, - reinterpret_cast(PART_USER_DATA)); - - DDS::DomainParticipant_var participant = - dpf->create_participant(111, - partQos, - DDS::DomainParticipantListener::_nil(), - ::OpenDDS::DCPS::DEFAULT_STATUS_MASK); - if (CORBA::is_nil (participant.in ())) { - ACE_ERROR((LM_ERROR, ACE_TEXT( - "(%P|%t) publisher: create_participant failed."))); - return 1; - } - - ::Messenger::MessageTypeSupport_var ts = new ::Messenger::MessageTypeSupportImpl(); - - if (DDS::RETCODE_OK != ts->register_type(participant.in (), "Messenger")) { - ACE_ERROR((LM_ERROR, ACE_TEXT( - "(%P|%t) publisher: register_type failed.\n"))); - exit(1); - } - - DDS::TopicQos topic_qos; - participant->get_default_topic_qos(topic_qos); - - // set up topic data in topic qos - CORBA::ULong topic_data_len = static_cast(ACE_OS::strlen (TOPIC_DATA)); - topic_qos.topic_data.value.length (topic_data_len); - topic_qos.topic_data.value.replace (topic_data_len, topic_data_len, reinterpret_cast(TOPIC_DATA)); - - DDS::Topic_var topic = - participant->create_topic ("Movie Discussion List", - "Messenger", - topic_qos, - DDS::TopicListener::_nil(), - ::OpenDDS::DCPS::DEFAULT_STATUS_MASK); - if (CORBA::is_nil (topic.in ())) { - ACE_ERROR((LM_ERROR, ACE_TEXT( - "(%P|%t) publisher: create_topic failed.\n"))); - exit(1); - } - - DDS::PublisherQos pub_qos; - participant->get_default_publisher_qos (pub_qos); - - // set up group data in group qos - CORBA::ULong group_data_len = static_cast (ACE_OS::strlen (GROUP_DATA)); - pub_qos.group_data.value.length (group_data_len); - pub_qos.group_data.value.replace (group_data_len, group_data_len, reinterpret_cast(GROUP_DATA)); - - DDS::Publisher_var pub = - participant->create_publisher(pub_qos, - DDS::PublisherListener::_nil(), - ::OpenDDS::DCPS::DEFAULT_STATUS_MASK); - if (CORBA::is_nil (pub.in ())) { - ACE_ERROR((LM_ERROR, ACE_TEXT( - "(%P|%t) publisher: create_publisher failed.\n"))); - exit(1); - } - - // Create the datawriter - DDS::DataWriterQos dw_qos; - pub->get_default_datawriter_qos (dw_qos); - dw_qos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS; - dw_qos.reliability.kind = ::DDS::RELIABLE_RELIABILITY_QOS; - dw_qos.resource_limits.max_samples_per_instance = 1000; - dw_qos.history.kind = ::DDS::KEEP_ALL_HISTORY_QOS; - dw_qos.representation.value.length(1); +int ACE_TMAIN(int argc, ACE_TCHAR *argv[]) +{ + try { + ACE_DEBUG((LM_DEBUG, "(%P|%t) publisher main\n")); + + DDS::DomainParticipantFactory_var dpf = TheParticipantFactoryWithArgs(argc, argv); + if (parse_args(argc, argv) == -1) { + return -1; + } + + DDS::DomainParticipantQos partQos; + dpf->get_default_participant_qos(partQos); + + // set up user data in DP qos + CORBA::ULong part_user_data_len + = static_cast(ACE_OS::strlen(PART_USER_DATA)); + partQos.user_data.value.length(part_user_data_len); + partQos.user_data.value.replace(part_user_data_len, + part_user_data_len, + reinterpret_cast(PART_USER_DATA)); + + DDS::DomainParticipant_var participant = + dpf->create_participant(111, + partQos, + DDS::DomainParticipantListener::_nil(), + ::OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (!participant) { + ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) publisher: create_participant failed."))); + return 1; + } + + ::Messenger::MessageTypeSupport_var ts = new ::Messenger::MessageTypeSupportImpl(); + + if (DDS::RETCODE_OK != ts->register_type(participant.in(), topic_type_name)) { + ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) publisher: register_type failed.\n"))); + exit(1); + } + + DDS::TopicQos topic_qos; + participant->get_default_topic_qos(topic_qos); + + // set up topic data in topic qos + CORBA::ULong topic_data_len = static_cast(ACE_OS::strlen(TOPIC_DATA)); + topic_qos.topic_data.value.length(topic_data_len); + topic_qos.topic_data.value.replace(topic_data_len, topic_data_len, reinterpret_cast(TOPIC_DATA)); + + DDS::Topic_var topic = participant->create_topic(topic_name, + topic_type_name, + topic_qos, + DDS::TopicListener::_nil(), + ::OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (!topic) { + ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) publisher: create_topic failed.\n"))); + exit(1); + } + + DDS::PublisherQos pub_qos; + participant->get_default_publisher_qos(pub_qos); + + // set up group data in group qos + CORBA::ULong group_data_len = static_cast(ACE_OS::strlen(GROUP_DATA)); + pub_qos.group_data.value.length(group_data_len); + pub_qos.group_data.value.replace(group_data_len, group_data_len, reinterpret_cast(GROUP_DATA)); + + DDS::Publisher_var pub = participant->create_publisher(pub_qos, + DDS::PublisherListener::_nil(), + ::OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (!pub) { + ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) publisher: create_publisher failed.\n"))); + exit(1); + } + + // Create the datawriter + DDS::DataWriterQos dw_qos; + pub->get_default_datawriter_qos(dw_qos); + dw_qos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS; + dw_qos.reliability.kind = ::DDS::RELIABLE_RELIABILITY_QOS; + dw_qos.resource_limits.max_samples_per_instance = 1000; + dw_qos.history.kind = ::DDS::KEEP_ALL_HISTORY_QOS; + dw_qos.representation.value.length(1); #if defined(OPENDDS_SAFETY_PROFILE) - dw_qos.representation.value[0] = DDS::XCDR2_DATA_REPRESENTATION; + dw_qos.representation.value[0] = DDS::XCDR2_DATA_REPRESENTATION; #else - dw_qos.representation.value[0] = OpenDDS::DCPS::UNALIGNED_CDR_DATA_REPRESENTATION; + dw_qos.representation.value[0] = OpenDDS::DCPS::UNALIGNED_CDR_DATA_REPRESENTATION; #endif - // set up user data in DW qos - CORBA::ULong dw_user_data_len = static_cast(ACE_OS::strlen (DW_USER_DATA)); - dw_qos.user_data.value.length (dw_user_data_len); - dw_qos.user_data.value.replace (dw_user_data_len, - dw_user_data_len, - reinterpret_cast(DW_USER_DATA)); - - DDS::DataWriter_var dw = - pub->create_datawriter(topic.in (), - dw_qos, - DDS::DataWriterListener::_nil(), - ::OpenDDS::DCPS::DEFAULT_STATUS_MASK); - if (CORBA::is_nil (dw.in ())) { - ACE_ERROR((LM_ERROR, ACE_TEXT( - "(%P|%t) publisher: create_datawriter failed.\n"))); - exit(1); - } - - // wait for Monitor 1 done - FILE* fp = ACE_OS::fopen ((synch_dir + synch_fname).c_str (), ACE_TEXT("r")); - int i = 0; - while (fp == 0 && i < 15) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT("(%P|%t) waiting monitor1 done ...\n"))); - ACE_OS::sleep (1); - ++ i; - fp = ACE_OS::fopen ((synch_dir + synch_fname).c_str (), ACE_TEXT("r")); - } - - if (fp != 0) - ACE_OS::fclose (fp); - - // Now change the changeable qos. The second monitor should get the updated qos from BIT. - - part_user_data_len = static_cast(ACE_OS::strlen (UPDATED_PART_USER_DATA)); - partQos.user_data.value.length (part_user_data_len); - partQos.user_data.value.replace (part_user_data_len, - part_user_data_len, - reinterpret_cast(UPDATED_PART_USER_DATA)); - participant->set_qos (partQos); - - dw_user_data_len = static_cast(ACE_OS::strlen (UPDATED_DW_USER_DATA)); - dw_qos.user_data.value.length (dw_user_data_len); - dw_qos.user_data.value.replace (dw_user_data_len, - dw_user_data_len, - reinterpret_cast(UPDATED_DW_USER_DATA)); - dw->set_qos (dw_qos); - - group_data_len = static_cast (ACE_OS::strlen (UPDATED_GROUP_DATA)); - pub_qos.group_data.value.length (group_data_len); - pub_qos.group_data.value.replace (group_data_len, - group_data_len, - reinterpret_cast(UPDATED_GROUP_DATA)); - pub->set_qos (pub_qos); - - topic_data_len = static_cast(ACE_OS::strlen (UPDATED_TOPIC_DATA)); - topic_qos.topic_data.value.length (topic_data_len); - topic_qos.topic_data.value.replace (topic_data_len, - topic_data_len, - reinterpret_cast(UPDATED_TOPIC_DATA)); - topic->set_qos (topic_qos); - - Writer* writer = new Writer(dw.in()); - - writer->start (); - while ( !writer->is_finished()) { - ACE_Time_Value small_time(0,250000); - ACE_OS::sleep (small_time); - } - - // Cleanup - writer->end (); - delete writer; - participant->delete_contained_entities(); - dpf->delete_participant(participant.in ()); - TheServiceParticipant->shutdown (); - - ACE_DEBUG ((LM_DEBUG, "(%P|%t) publisher main done\n")); - } - catch (CORBA::Exception& e) { - e._tao_print_exception("publisher: PUB: Exception caught in main.cpp:"); + // set up user data in DW qos + CORBA::ULong dw_user_data_len = static_cast(ACE_OS::strlen(DW_USER_DATA)); + dw_qos.user_data.value.length(dw_user_data_len); + dw_qos.user_data.value.replace(dw_user_data_len, + dw_user_data_len, + reinterpret_cast(DW_USER_DATA)); + + DDS::DataWriter_var dw = pub->create_datawriter(topic.in(), + dw_qos, + DDS::DataWriterListener::_nil(), + ::OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (!dw) { + ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) publisher: create_datawriter failed.\n"))); + exit(1); + } + + // wait for Monitor 1 done + FILE* fp = ACE_OS::fopen((synch_dir + mon1_fname).c_str(), ACE_TEXT("r")); + while (fp == 0) { + ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) waiting monitor1 done ...\n"))); + ACE_OS::sleep(1); + fp = ACE_OS::fopen((synch_dir + mon1_fname).c_str(), ACE_TEXT("r")); + } + if (fp != 0) { + ACE_OS::fclose(fp); + } + + // Now change the changeable qos. The second monitor should get the updated qos from BIT. + part_user_data_len = static_cast(ACE_OS::strlen(UPDATED_PART_USER_DATA)); + partQos.user_data.value.length(part_user_data_len); + partQos.user_data.value.replace(part_user_data_len, + part_user_data_len, + reinterpret_cast(UPDATED_PART_USER_DATA)); + participant->set_qos(partQos); + + dw_user_data_len = static_cast(ACE_OS::strlen(UPDATED_DW_USER_DATA)); + dw_qos.user_data.value.length(dw_user_data_len); + dw_qos.user_data.value.replace(dw_user_data_len, + dw_user_data_len, + reinterpret_cast(UPDATED_DW_USER_DATA)); + dw->set_qos(dw_qos); + + group_data_len = static_cast(ACE_OS::strlen(UPDATED_GROUP_DATA)); + pub_qos.group_data.value.length(group_data_len); + pub_qos.group_data.value.replace(group_data_len, + group_data_len, + reinterpret_cast(UPDATED_GROUP_DATA)); + pub->set_qos(pub_qos); + + topic_data_len = static_cast(ACE_OS::strlen(UPDATED_TOPIC_DATA)); + topic_qos.topic_data.value.length(topic_data_len); + topic_qos.topic_data.value.replace(topic_data_len, + topic_data_len, + reinterpret_cast(UPDATED_TOPIC_DATA)); + topic->set_qos(topic_qos); + + Writer* writer = new Writer(dw.in()); + + writer->start(); + while (!writer->is_finished()) { + ACE_Time_Value small_time(0,250000); + ACE_OS::sleep(small_time); + } + + // Cleanup + writer->end(); + delete writer; + + // Wait for monitor2 to finish + fp = ACE_OS::fopen((synch_dir + mon2_fname).c_str(), ACE_TEXT("r")); + while (fp == 0) { + ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) waiting monitor2 done ...\n"))); + ACE_OS::sleep(1); + fp = ACE_OS::fopen((synch_dir + mon2_fname).c_str(), ACE_TEXT("r")); + } + if (fp != 0) { + ACE_OS::fclose(fp); + } + + participant->delete_contained_entities(); + dpf->delete_participant(participant.in()); + TheServiceParticipant->shutdown(); + + ACE_DEBUG((LM_DEBUG, "(%P|%t) publisher main done\n")); + + } catch (CORBA::Exception& e) { + e._tao_print_exception("publisher: PUB: Exception caught in main():", stderr); exit(1); } diff --git a/tests/DCPS/BuiltInTopicTest/run_test.pl b/tests/DCPS/BuiltInTopicTest/run_test.pl index 62d1b8971d5..30f13ac873f 100755 --- a/tests/DCPS/BuiltInTopicTest/run_test.pl +++ b/tests/DCPS/BuiltInTopicTest/run_test.pl @@ -33,16 +33,13 @@ $test->process("monitor2", "monitor", "-u $rtps_mon $rtps_cfg"); $test->add_temporary_file("monitor1", "monitor1_done"); +$test->add_temporary_file("monitor2", "monitor2_done"); $test->start_process("monitor1", "-T"); - $test->start_process("publisher", "-T"); $test->start_process("subscriber", "-T"); - -sleep (15); - $test->start_process("monitor2", "-T"); -my $status = $test->finish(300, "monitor1"); +my $status = $test->finish(180, "monitor1"); exit $status; diff --git a/tests/DCPS/BuiltInTopicTest/subscriber.cpp b/tests/DCPS/BuiltInTopicTest/subscriber.cpp index 4cf00d77e43..87fedfee313 100644 --- a/tests/DCPS/BuiltInTopicTest/subscriber.cpp +++ b/tests/DCPS/BuiltInTopicTest/subscriber.cpp @@ -1,66 +1,32 @@ -// -*- C++ -*- -// ============================================================================ -/** - * @file subscriber.cpp - * - * - * - */ -// ============================================================================ - - #include "DataReaderListener.h" #include "MessengerTypeSupportImpl.h" -#include -#include -#include -#include "dds/DCPS/BuiltInTopicUtils.h" +#include "common.h" -#include "dds/DCPS/StaticIncludes.h" - -#include -#include "ace/Get_Opt.h" -#include "tests/Utils/ExceptionStreams.h" +#include using namespace std; -char PART_USER_DATA[] = "Initial DomainParticipant UserData"; -char DR_USER_DATA[] = "Initial DataReader UserData"; -char TOPIC_DATA[] = "Initial Topic TopicData"; -char GROUP_DATA[] = "Initial GroupData"; -char UPDATED_PART_USER_DATA[] = "Updated DomainParticipant UserData"; -char UPDATED_DR_USER_DATA[] = "Updated DataReader UserData"; -char UPDATED_TOPIC_DATA[] = "Updated Topic TopicData"; -char UPDATED_GROUP_DATA[] = "Updated GroupData"; - -ACE_TString synch_dir; -ACE_TCHAR synch_fname[] = ACE_TEXT("monitor1_done"); - -int num_messages = 10; - -int parse_args (int argc, ACE_TCHAR *argv[]) +int parse_args(int argc, ACE_TCHAR *argv[]) { - ACE_Get_Opt get_opts (argc, argv, "T:n:"); + ACE_Get_Opt get_opts(argc, argv, "T:n:"); int c; - while ((c = get_opts ()) != -1) - { - switch (c) - { + while ((c = get_opts()) != -1) { + switch (c) { case 'T': - synch_dir = get_opts.opt_arg (); + synch_dir = get_opts.opt_arg(); break; case 'n': - num_messages = ACE_OS::atoi (get_opts.opt_arg ()); + num_messages = ACE_OS::atoi(get_opts.opt_arg()); break; case '?': default: - ACE_ERROR_RETURN ((LM_ERROR, + ACE_ERROR_RETURN((LM_ERROR, "usage: %s " "-n " "-T " "\n", - argv [0]), + argv[0]), -1); } } @@ -71,213 +37,199 @@ int parse_args (int argc, ACE_TCHAR *argv[]) int ACE_TMAIN(int argc, ACE_TCHAR *argv[]) { int result = 0; - ACE_DEBUG ((LM_DEBUG, "(%P|%t) subscriber main\n")); - try - { - DDS::DomainParticipantFactory_var dpf; - DDS::DomainParticipant_var participant; - - dpf = TheParticipantFactoryWithArgs(argc, argv); - - if (parse_args (argc, argv) == -1) { - return -1; - } - - DDS::DomainParticipantQos partQos; - dpf->get_default_participant_qos(partQos); - - // set up user data in DP qos - CORBA::ULong part_user_data_len - = static_cast(ACE_OS::strlen (PART_USER_DATA)); - partQos.user_data.value.length (part_user_data_len); - partQos.user_data.value.replace (part_user_data_len, - part_user_data_len, - reinterpret_cast(PART_USER_DATA)); - - participant = dpf->create_participant(111, - partQos, - DDS::DomainParticipantListener::_nil(), - ::OpenDDS::DCPS::DEFAULT_STATUS_MASK); - if (CORBA::is_nil (participant.in ())) { - ACE_ERROR((LM_ERROR, ACE_TEXT( - "(%P|%t) subscriber: create_participant failed.\n"))); - return 1 ; - } - - ::Messenger::MessageTypeSupport_var mts = new ::Messenger::MessageTypeSupportImpl(); - - if (DDS::RETCODE_OK != mts->register_type(participant.in (), "Messenger")) { - ACE_ERROR((LM_ERROR, ACE_TEXT( - "(%P|%t) subscriber: Failed to register the MessageTypeTypeSupport.\n"))); - exit(1); - } - - DDS::TopicQos topic_qos; - participant->get_default_topic_qos(topic_qos); - - // set up topic data in topic qos - CORBA::ULong topic_data_len = static_cast(ACE_OS::strlen (TOPIC_DATA)); - topic_qos.topic_data.value.length (topic_data_len); - topic_qos.topic_data.value.replace (topic_data_len, topic_data_len, reinterpret_cast(TOPIC_DATA)); - - DDS::Topic_var topic = participant->create_topic("Movie Discussion List", - "Messenger", - topic_qos, - DDS::TopicListener::_nil(), - ::OpenDDS::DCPS::DEFAULT_STATUS_MASK); - if (CORBA::is_nil (topic.in ())) { - ACE_ERROR((LM_ERROR, ACE_TEXT( - "(%P|%t) subscriber: Failed to create_topic.\n"))); - exit(1); - } - - // Create the subscriber - - DDS::SubscriberQos sub_qos; - participant->get_default_subscriber_qos (sub_qos); - - // set up group data in subscriber qos - CORBA::ULong group_data_len = static_cast (ACE_OS::strlen (GROUP_DATA)); - sub_qos.group_data.value.length (group_data_len); - sub_qos.group_data.value.replace (group_data_len, group_data_len, reinterpret_cast(GROUP_DATA)); - - DDS::Subscriber_var sub = - participant->create_subscriber(sub_qos, - DDS::SubscriberListener::_nil(), - ::OpenDDS::DCPS::DEFAULT_STATUS_MASK); - if (CORBA::is_nil (sub.in ())) { - ACE_ERROR((LM_ERROR, ACE_TEXT( - "(%P|%t) subscriber: Failed to create_subscriber.\n"))); - exit(1); - } - - // activate the listener - DDS::DataReaderListener_var listener (new DataReaderListenerImpl); - DataReaderListenerImpl* listener_servant = - dynamic_cast(listener.in()); - if (!listener_servant) - { - ACE_ERROR_RETURN((LM_ERROR, - ACE_TEXT("(%P|%t) %N:%l main()") - ACE_TEXT(" ERROR: failed to obtain DataReaderListenerImpl!\n")), -1); - } - - DDS::Subscriber_var builtin = participant->get_builtin_subscriber(); - DDS::DataReader_var bitdr = - builtin->lookup_datareader(OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC); - listener_servant->set_builtin_datareader(bitdr.in()); - - if (CORBA::is_nil (listener.in ())) { - ACE_ERROR((LM_ERROR, ACE_TEXT( - "(%P|%t) subscriber: listener is nil.\n"))); - exit(1); - } - if (!listener_servant) { - ACE_ERROR_RETURN((LM_ERROR, - ACE_TEXT("(%P|%t) %N:%l main()") - ACE_TEXT(" ERROR: listener_servant is nil (dynamic_cast failed)!\n")), -1); - } - - // Create the Datareaders - DDS::DataReaderQos dr_qos; - sub->get_default_datareader_qos (dr_qos); - dr_qos.representation.value.length(1); + ACE_DEBUG((LM_DEBUG, "(%P|%t) subscriber main\n")); + try { + DDS::DomainParticipantFactory_var dpf; + DDS::DomainParticipant_var participant; + + dpf = TheParticipantFactoryWithArgs(argc, argv); + if (parse_args(argc, argv) == -1) { + return -1; + } + + DDS::DomainParticipantQos partQos; + dpf->get_default_participant_qos(partQos); + + // set up user data in DP qos + CORBA::ULong part_user_data_len = static_cast(ACE_OS::strlen(PART_USER_DATA)); + partQos.user_data.value.length(part_user_data_len); + partQos.user_data.value.replace(part_user_data_len, + part_user_data_len, + reinterpret_cast(PART_USER_DATA)); + + participant = dpf->create_participant(111, + partQos, + DDS::DomainParticipantListener::_nil(), + ::OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (!participant) { + ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) subscriber: create_participant failed.\n"))); + return 1; + } + + ::Messenger::MessageTypeSupport_var mts = new ::Messenger::MessageTypeSupportImpl(); + + if (DDS::RETCODE_OK != mts->register_type(participant.in(), topic_type_name)) { + ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) subscriber: Failed to register the MessageTypeTypeSupport.\n"))); + exit(1); + } + + DDS::TopicQos topic_qos; + participant->get_default_topic_qos(topic_qos); + + // set up topic data in topic qos + CORBA::ULong topic_data_len = static_cast(ACE_OS::strlen(TOPIC_DATA)); + topic_qos.topic_data.value.length(topic_data_len); + topic_qos.topic_data.value.replace(topic_data_len, topic_data_len, reinterpret_cast(TOPIC_DATA)); + + DDS::Topic_var topic = participant->create_topic(topic_name, + topic_type_name, + topic_qos, + DDS::TopicListener::_nil(), + ::OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (!topic) { + ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) subscriber: Failed to create_topic.\n"))); + exit(1); + } + + // Create the subscriber + DDS::SubscriberQos sub_qos; + participant->get_default_subscriber_qos(sub_qos); + + // set up group data in subscriber qos + CORBA::ULong group_data_len = static_cast(ACE_OS::strlen(GROUP_DATA)); + sub_qos.group_data.value.length(group_data_len); + sub_qos.group_data.value.replace(group_data_len, group_data_len, reinterpret_cast(GROUP_DATA)); + + DDS::Subscriber_var sub = participant->create_subscriber(sub_qos, + DDS::SubscriberListener::_nil(), + ::OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (!sub) { + ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) subscriber: Failed to create_subscriber.\n"))); + exit(1); + } + + // activate the listener + DDS::DataReaderListener_var listener(new DataReaderListenerImpl); + if (!listener) { + ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) subscriber: listener is nil.\n"))); + exit(1); + } + + DataReaderListenerImpl* listener_servant = dynamic_cast(listener.in()); + if (!listener_servant) { + ACE_ERROR_RETURN((LM_ERROR, + ACE_TEXT("(%P|%t) %N:%l main()") + ACE_TEXT(" ERROR: failed to obtain DataReaderListenerImpl!\n")), -1); + } + + DDS::Subscriber_var builtin = participant->get_builtin_subscriber(); + DDS::DataReader_var bitdr = builtin->lookup_datareader(OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC); + listener_servant->set_builtin_datareader(bitdr.in()); + + // Create the Datareaders + DDS::DataReaderQos dr_qos; + sub->get_default_datareader_qos(dr_qos); + dr_qos.representation.value.length(1); #if defined(OPENDDS_SAFETY_PROFILE) - dr_qos.representation.value[0] = DDS::XCDR2_DATA_REPRESENTATION; + dr_qos.representation.value[0] = DDS::XCDR2_DATA_REPRESENTATION; #else - dr_qos.representation.value[0] = OpenDDS::DCPS::UNALIGNED_CDR_DATA_REPRESENTATION; + dr_qos.representation.value[0] = OpenDDS::DCPS::UNALIGNED_CDR_DATA_REPRESENTATION; #endif - // set up user data in DR qos - CORBA::ULong dr_user_data_len = static_cast(ACE_OS::strlen (DR_USER_DATA)); - dr_qos.user_data.value.length (dr_user_data_len); - dr_qos.user_data.value.replace (dr_user_data_len, - dr_user_data_len, - reinterpret_cast(DR_USER_DATA)); - - DDS::DataReader_var dr = sub->create_datareader(topic.in (), - dr_qos, - listener.in (), - ::OpenDDS::DCPS::DEFAULT_STATUS_MASK); - if (CORBA::is_nil (dr.in ())) { - ACE_ERROR((LM_ERROR, ACE_TEXT( - "(%P|%t) subscriber: create_datareader failed.\n"))); - exit(1); - } - - // Wait for Monitor 1 done. - FILE* fp = ACE_OS::fopen ((synch_dir + synch_fname).c_str (), ACE_TEXT("r")); - int i = 0; - while (fp == 0 && i < 15) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT("(%P|%t) waiting monitor1 done ...\n"))); - ACE_OS::sleep (1); - ++i; - fp = ACE_OS::fopen ((synch_dir + synch_fname).c_str (), ACE_TEXT("r")); - } - if (fp) { - ACE_OS::fclose (fp); - } - - // Now change the changeable qos. The second monitor should get the updated qos from BIT. - part_user_data_len = static_cast(ACE_OS::strlen (UPDATED_PART_USER_DATA)); - partQos.user_data.value.length (part_user_data_len); - partQos.user_data.value.replace (part_user_data_len, - part_user_data_len, - reinterpret_cast(UPDATED_PART_USER_DATA)); - participant->set_qos (partQos); - - dr_user_data_len = static_cast(ACE_OS::strlen (UPDATED_DR_USER_DATA)); - dr_qos.user_data.value.length (dr_user_data_len); - dr_qos.user_data.value.replace (dr_user_data_len, - dr_user_data_len, - reinterpret_cast(UPDATED_DR_USER_DATA)); - dr->set_qos (dr_qos); - - group_data_len = static_cast (ACE_OS::strlen (UPDATED_GROUP_DATA)); - sub_qos.group_data.value.length (group_data_len); - sub_qos.group_data.value.replace (group_data_len, - group_data_len, - reinterpret_cast(UPDATED_GROUP_DATA)); - sub->set_qos (sub_qos); - - topic_data_len = static_cast(ACE_OS::strlen (UPDATED_TOPIC_DATA)); - topic_qos.topic_data.value.length (topic_data_len); - topic_qos.topic_data.value.replace (topic_data_len, - topic_data_len, - reinterpret_cast(UPDATED_TOPIC_DATA)); - topic->set_qos (topic_qos); - - while ( listener_servant->num_reads() < num_messages) { - ACE_OS::sleep (1); - } - - if (listener_servant->read_bit_instance()) { - ACE_ERROR((LM_ERROR, ACE_TEXT( - "(%P|%t) subscriber: Built in topic read failure.\n"))); - result = 1; - } - - // Detach DataReaderListener to prevent it being called during - // delete_contained_entities() - dr->set_listener(0, 0); - - if (!CORBA::is_nil (participant.in ())) { - participant->delete_contained_entities(); - } - if (!CORBA::is_nil (dpf.in ())) { - dpf->delete_participant(participant.in ()); - } - - TheServiceParticipant->shutdown (); - - ACE_DEBUG ((LM_DEBUG, "(%P|%t) subscriber main done\n")); + // set up user data in DR qos + CORBA::ULong dr_user_data_len = static_cast(ACE_OS::strlen(DR_USER_DATA)); + dr_qos.user_data.value.length(dr_user_data_len); + dr_qos.user_data.value.replace(dr_user_data_len, + dr_user_data_len, + reinterpret_cast(DR_USER_DATA)); + + DDS::DataReader_var dr = sub->create_datareader(topic.in(), + dr_qos, + listener.in(), + ::OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (!dr) { + ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) subscriber: create_datareader failed.\n"))); + exit(1); + } + + // Wait for Monitor 1 done. + FILE* fp = ACE_OS::fopen((synch_dir + mon1_fname).c_str(), ACE_TEXT("r")); + while (fp == 0) { + ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) waiting monitor1 done ...\n"))); + ACE_OS::sleep(1); + fp = ACE_OS::fopen((synch_dir + mon1_fname).c_str(), ACE_TEXT("r")); + } + if (fp) { + ACE_OS::fclose(fp); + } + + // Now change the changeable qos. The second monitor should get the updated qos from BIT. + part_user_data_len = static_cast(ACE_OS::strlen(UPDATED_PART_USER_DATA)); + partQos.user_data.value.length(part_user_data_len); + partQos.user_data.value.replace(part_user_data_len, + part_user_data_len, + reinterpret_cast(UPDATED_PART_USER_DATA)); + participant->set_qos(partQos); + + dr_user_data_len = static_cast(ACE_OS::strlen(UPDATED_DR_USER_DATA)); + dr_qos.user_data.value.length(dr_user_data_len); + dr_qos.user_data.value.replace(dr_user_data_len, + dr_user_data_len, + reinterpret_cast(UPDATED_DR_USER_DATA)); + dr->set_qos(dr_qos); + + group_data_len = static_cast(ACE_OS::strlen(UPDATED_GROUP_DATA)); + sub_qos.group_data.value.length(group_data_len); + sub_qos.group_data.value.replace(group_data_len, + group_data_len, + reinterpret_cast(UPDATED_GROUP_DATA)); + sub->set_qos(sub_qos); + + topic_data_len = static_cast(ACE_OS::strlen(UPDATED_TOPIC_DATA)); + topic_qos.topic_data.value.length(topic_data_len); + topic_qos.topic_data.value.replace(topic_data_len, + topic_data_len, + reinterpret_cast(UPDATED_TOPIC_DATA)); + topic->set_qos(topic_qos); + + while (listener_servant->num_reads() < num_messages) { + ACE_OS::sleep(1); + } + + if (!listener_servant->read_bit_instance()) { + ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) subscriber: Built in topic read failure.\n"))); + result = 1; } - catch (CORBA::Exception& e) { - e._tao_print_exception( - "subscriber: SUB: Exception caught in main ():", stderr); + + // Detach DataReaderListener to prevent it being called during + // delete_contained_entities() + dr->set_listener(0, 0); + + // Wait for monitor2 to finish + fp = ACE_OS::fopen((synch_dir + mon2_fname).c_str(), ACE_TEXT("r")); + while (fp == 0) { + ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) waiting monitor2 done ...\n"))); + ACE_OS::sleep(1); + fp = ACE_OS::fopen((synch_dir + mon2_fname).c_str(), ACE_TEXT("r")); + } + if (fp != 0) { + ACE_OS::fclose(fp); + } + + if (participant) { + participant->delete_contained_entities(); + } + if (dpf) { + dpf->delete_participant(participant.in()); + } + + TheServiceParticipant->shutdown(); + + ACE_DEBUG((LM_DEBUG, "(%P|%t) subscriber main done\n")); + + } catch (CORBA::Exception& e) { + e._tao_print_exception("subscriber: SUB: Exception caught in main():", stderr); return 1; }