From 1c13ce2997034756cc8cb31e806ef3b7a227a933 Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Mon, 3 Dec 2018 15:44:20 -0600 Subject: [PATCH 01/10] Fix crash on exit, update label on load, use RAII in thread --- .../Plugins/NetworkEvents/NetworkEvents.cpp | 31 +++++++++++-------- Source/Plugins/NetworkEvents/NetworkEvents.h | 5 ++- .../NetworkEvents/NetworkEventsEditor.cpp | 4 +++ .../NetworkEvents/NetworkEventsEditor.h | 1 + 4 files changed, 27 insertions(+), 14 deletions(-) diff --git a/Source/Plugins/NetworkEvents/NetworkEvents.cpp b/Source/Plugins/NetworkEvents/NetworkEvents.cpp index d61ddc987..fd51cae94 100644 --- a/Source/Plugins/NetworkEvents/NetworkEvents.cpp +++ b/Source/Plugins/NetworkEvents/NetworkEvents.cpp @@ -169,7 +169,6 @@ NetworkEvents::NetworkEvents() createZmqContext(); firstTime = true; - responder = nullptr; urlport = 5556; threadRunning = false; @@ -213,7 +212,6 @@ bool NetworkEvents::closesocket() if (threadRunning) { lock.enter(); - zmq_close (responder); zmq_ctx_destroy (zmqcontext); // this will cause the thread to exit zmqcontext = nullptr; lock.exit(); @@ -557,24 +555,24 @@ void NetworkEvents::opensocket() void NetworkEvents::run() { #ifdef ZEROMQ - responder = zmq_socket (zmqcontext, ZMQ_REP); + SocketPtr responder(zmq_socket(zmqcontext, ZMQ_REP), &closeZmqSocket); String url= String ("tcp://*:") + String (urlport); - int rc = zmq_bind (responder, url.toRawUTF8()); + int rc = zmq_bind (responder.get(), url.toRawUTF8()); if (rc != 0) { - // failed to open socket? + // failed to open or bind socket? std::cout << "Failed to open socket: " << zmq_strerror (zmq_errno()) << std::endl; return; } threadRunning = true; - unsigned char* buffer = new unsigned char[MAX_MESSAGE_LENGTH]; + HeapBlock buffer(MAX_MESSAGE_LENGTH); int result = -1; while (threadRunning) { - result = zmq_recv (responder, buffer, MAX_MESSAGE_LENGTH - 1, 0); // blocking + result = zmq_recv (responder.get(), buffer, MAX_MESSAGE_LENGTH - 1, 0); // blocking juce::int64 timestamp_software = timer.getHighResolutionTicks(); @@ -592,20 +590,17 @@ void NetworkEvents::run() // handle special messages String response = handleSpecialMessages (Msg); - zmq_send (responder, response.getCharPointer(), response.length(), 0); + zmq_send (responder.get(), response.getCharPointer(), response.length(), 0); } else { String zeroMessageError = "Recieved Zero Message?!?!?"; //std::cout << "Received Zero Message!" << std::endl; - zmq_send (responder, zeroMessageError.getCharPointer(), zeroMessageError.length(), 0); + zmq_send (responder.get(), zeroMessageError.getCharPointer(), zeroMessageError.length(), 0); } } - zmq_close (responder); - - delete[] buffer; threadRunning = false; return; @@ -656,7 +651,9 @@ void NetworkEvents::loadCustomParametersFromXml() { if (mainNode->hasTagName ("NETWORKEVENTS")) { - setNewListeningPort (mainNode->getIntAttribute("port")); + auto ed = static_cast(getEditor()); + if (!ed) { return; } + ed->setPortString(mainNode->getStringAttribute("port")); } } } @@ -674,6 +671,14 @@ void NetworkEvents::createZmqContext() } +void NetworkEvents::closeZmqSocket(void* socket) +{ +#ifdef ZEROMQ + zmq_close(socket); +#endif +} + + StringPairArray NetworkEvents::parseNetworkMessage (String msg) { StringArray splitted; diff --git a/Source/Plugins/NetworkEvents/NetworkEvents.h b/Source/Plugins/NetworkEvents/NetworkEvents.h index 1f92a2822..e11718f63 100644 --- a/Source/Plugins/NetworkEvents/NetworkEvents.h +++ b/Source/Plugins/NetworkEvents/NetworkEvents.h @@ -39,6 +39,7 @@ #include #include +#include class StringTS { @@ -123,13 +124,15 @@ class NetworkEvents : public GenericProcessor private: void createZmqContext(); + static void closeZmqSocket(void* socket); + typedef std::unique_ptr SocketPtr; + //* Split network message into name/value pairs (name1=val1 name2=val2 etc) */ StringPairArray parseNetworkMessage (String msg); StringTS createStringTS (String S, int64 t); static void* zmqcontext; - void* responder; float threshold; float bufferZone; diff --git a/Source/Plugins/NetworkEvents/NetworkEventsEditor.cpp b/Source/Plugins/NetworkEvents/NetworkEventsEditor.cpp index c096881c2..e36ca8e0e 100644 --- a/Source/Plugins/NetworkEvents/NetworkEventsEditor.cpp +++ b/Source/Plugins/NetworkEvents/NetworkEventsEditor.cpp @@ -115,6 +115,10 @@ void NetworkEventsEditor::setLabelColor(juce::Colour color) labelPort->setColour(Label::backgroundColourId, color); } +void NetworkEventsEditor::setPortString(const String& port) +{ + labelPort->setText(port, sendNotification); +} void NetworkEventsEditor::labelTextChanged(juce::Label *label) { diff --git a/Source/Plugins/NetworkEvents/NetworkEventsEditor.h b/Source/Plugins/NetworkEvents/NetworkEventsEditor.h index f96f16796..99dd91622 100644 --- a/Source/Plugins/NetworkEvents/NetworkEventsEditor.h +++ b/Source/Plugins/NetworkEvents/NetworkEventsEditor.h @@ -47,6 +47,7 @@ class NetworkEventsEditor : public GenericEditor,public Label::Listener void buttonEvent(Button* button); void labelTextChanged(juce::Label *); void setLabelColor(juce::Colour color); + void setPortString(const String& port); private: ScopedPointer restartConnection; From 877677ee28d6516b9c103c0675e44d6a54526af0 Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Mon, 3 Dec 2018 17:56:25 -0600 Subject: [PATCH 02/10] Revert to last good port if a connection fails --- .../Plugins/NetworkEvents/NetworkEvents.cpp | 61 +++++++++++++++---- Source/Plugins/NetworkEvents/NetworkEvents.h | 21 ++++++- .../NetworkEvents/NetworkEventsEditor.cpp | 2 +- 3 files changed, 70 insertions(+), 14 deletions(-) diff --git a/Source/Plugins/NetworkEvents/NetworkEvents.cpp b/Source/Plugins/NetworkEvents/NetworkEvents.cpp index fd51cae94..76a23ec81 100644 --- a/Source/Plugins/NetworkEvents/NetworkEvents.cpp +++ b/Source/Plugins/NetworkEvents/NetworkEvents.cpp @@ -163,9 +163,13 @@ NetworkEvents::NetworkEvents() , threshold (200.0) , bufferZone (5.0f) , state (false) + , connectionErr (var(false)) + , lastGoodPort (0) { setProcessorType (PROCESSOR_TYPE_SOURCE); + connectionErr.addListener(this); + createZmqContext(); firstTime = true; @@ -193,7 +197,15 @@ void NetworkEvents::setNewListeningPort (int port) #endif urlport = port; + connectionErr = false; opensocket(); + + // update editor + auto ed = static_cast(getEditor()); + if (ed) + { + ed->setPortString(String(port)); + } } @@ -204,6 +216,19 @@ NetworkEvents::~NetworkEvents() } +void NetworkEvents::valueChanged(Value& value) +{ + if (value.refersToSameSourceAs(connectionErr)) + { + if ((bool)value.getValue() && lastGoodPort > 0 && lastGoodPort != urlport) + { + // try to connect to the last good port + setNewListeningPort(lastGoodPort); + } + } +} + + bool NetworkEvents::closesocket() { std::cout << "Disabling network node" << std::endl; @@ -555,24 +580,28 @@ void NetworkEvents::opensocket() void NetworkEvents::run() { #ifdef ZEROMQ - SocketPtr responder(zmq_socket(zmqcontext, ZMQ_REP), &closeZmqSocket); + Responder responder(zmqcontext); String url= String ("tcp://*:") + String (urlport); - int rc = zmq_bind (responder.get(), url.toRawUTF8()); + int rc = zmq_bind (responder, url.toRawUTF8()); if (rc != 0) { // failed to open or bind socket? - std::cout << "Failed to open socket: " << zmq_strerror (zmq_errno()) << std::endl; + String msg = String("Network Events failed to open socket: ") + zmq_strerror(zmq_errno()); + std::cout << msg << std::endl; + CoreServices::sendStatusMessage(msg); + connectionErr = true; return; } threadRunning = true; + lastGoodPort = urlport; HeapBlock buffer(MAX_MESSAGE_LENGTH); int result = -1; while (threadRunning) { - result = zmq_recv (responder.get(), buffer, MAX_MESSAGE_LENGTH - 1, 0); // blocking + result = zmq_recv (responder, buffer, MAX_MESSAGE_LENGTH - 1, 0); // blocking juce::int64 timestamp_software = timer.getHighResolutionTicks(); @@ -590,19 +619,18 @@ void NetworkEvents::run() // handle special messages String response = handleSpecialMessages (Msg); - zmq_send (responder.get(), response.getCharPointer(), response.length(), 0); + zmq_send (responder, response.getCharPointer(), response.length(), 0); } else { String zeroMessageError = "Recieved Zero Message?!?!?"; //std::cout << "Received Zero Message!" << std::endl; - zmq_send (responder.get(), zeroMessageError.getCharPointer(), zeroMessageError.length(), 0); + zmq_send (responder, zeroMessageError.getCharPointer(), zeroMessageError.length(), 0); } } threadRunning = false; - return; #endif } @@ -651,9 +679,7 @@ void NetworkEvents::loadCustomParametersFromXml() { if (mainNode->hasTagName ("NETWORKEVENTS")) { - auto ed = static_cast(getEditor()); - if (!ed) { return; } - ed->setPortString(mainNode->getStringAttribute("port")); + setNewListeningPort(mainNode->getIntAttribute("port")); } } } @@ -671,7 +697,14 @@ void NetworkEvents::createZmqContext() } -void NetworkEvents::closeZmqSocket(void* socket) +NetworkEvents::Responder::Responder(void* context) +#ifdef ZEROMQ + : socket(zmq_socket(context, ZMQ_REP)) +#endif +{} + + +NetworkEvents::Responder::~Responder() { #ifdef ZEROMQ zmq_close(socket); @@ -679,6 +712,12 @@ void NetworkEvents::closeZmqSocket(void* socket) } +NetworkEvents::Responder::operator void*() +{ + return socket; +} + + StringPairArray NetworkEvents::parseNetworkMessage (String msg) { StringArray splitted; diff --git a/Source/Plugins/NetworkEvents/NetworkEvents.h b/Source/Plugins/NetworkEvents/NetworkEvents.h index e11718f63..a380deafe 100644 --- a/Source/Plugins/NetworkEvents/NetworkEvents.h +++ b/Source/Plugins/NetworkEvents/NetworkEvents.h @@ -70,6 +70,7 @@ class StringTS */ class NetworkEvents : public GenericProcessor , public Thread + , public Value::Listener { public: NetworkEvents(); @@ -116,6 +117,10 @@ class NetworkEvents : public GenericProcessor void postTimestamppedStringToMidiBuffer (StringTS s); void setNewListeningPort (int port); + // to monitor responderStatus and connect to last good port if necessary + void valueChanged(Value& value) override; + + int urlport; String socketStatus; std::atomic threadRunning; @@ -124,8 +129,20 @@ class NetworkEvents : public GenericProcessor private: void createZmqContext(); - static void closeZmqSocket(void* socket); - typedef std::unique_ptr SocketPtr; + // RAII wrapper for ZMQ socket + class Responder + { + public: + Responder(void* context); + ~Responder(); + operator void*(); + private: + void* socket; + }; + + // allow reconnecting to last good port if connection fails + Value connectionErr; + int lastGoodPort; //* Split network message into name/value pairs (name1=val1 name2=val2 etc) */ StringPairArray parseNetworkMessage (String msg); diff --git a/Source/Plugins/NetworkEvents/NetworkEventsEditor.cpp b/Source/Plugins/NetworkEvents/NetworkEventsEditor.cpp index e36ca8e0e..fb924666e 100644 --- a/Source/Plugins/NetworkEvents/NetworkEventsEditor.cpp +++ b/Source/Plugins/NetworkEvents/NetworkEventsEditor.cpp @@ -117,7 +117,7 @@ void NetworkEventsEditor::setLabelColor(juce::Colour color) void NetworkEventsEditor::setPortString(const String& port) { - labelPort->setText(port, sendNotification); + labelPort->setText(port, dontSendNotification); } void NetworkEventsEditor::labelTextChanged(juce::Label *label) From daedf2a9436e8a9db2f3992b7d0df35882252cbe Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Tue, 4 Dec 2018 18:23:22 -0600 Subject: [PATCH 03/10] Context local to NetworkEvents instance + other changes for memory safety --- .../Plugins/NetworkEvents/NetworkEvents.cpp | 431 ++++++++++++------ Source/Plugins/NetworkEvents/NetworkEvents.h | 92 ++-- .../NetworkEvents/NetworkEventsEditor.cpp | 10 +- 3 files changed, 358 insertions(+), 175 deletions(-) diff --git a/Source/Plugins/NetworkEvents/NetworkEvents.cpp b/Source/Plugins/NetworkEvents/NetworkEvents.cpp index 76a23ec81..2fae4496f 100644 --- a/Source/Plugins/NetworkEvents/NetworkEvents.cpp +++ b/Source/Plugins/NetworkEvents/NetworkEvents.cpp @@ -38,7 +38,6 @@ const int MAX_MESSAGE_LENGTH = 64000; StringTS::StringTS() { - str = nullptr; len= 0; timestamp = 0; } @@ -46,7 +45,7 @@ StringTS::StringTS() std::vector StringTS::splitString (char sep) { - String S ((const char*)str, len); + String S ((const char*)str.getData(), len); String curr; std::list ls; @@ -85,16 +84,15 @@ StringTS::StringTS (MidiMessage& event) len = bufferSize - 6 - 8; // -6 for initial event prefix, -8 for timestamp at the end memcpy (×tamp, dataptr + 6 + len, 8); // remember to skip first six bytes - str = new uint8[len]; + str.malloc(len); memcpy (str,dataptr + 6, len); } StringTS& StringTS::operator= (const StringTS& rhs) { - delete (str); len = rhs.len; - str = new uint8[len]; + str.malloc(len); memcpy (str,rhs.str,len); timestamp = rhs.timestamp; @@ -104,14 +102,14 @@ StringTS& StringTS::operator= (const StringTS& rhs) String StringTS::getString() { - return String ((const char*)str,len); + return String ((const char*)str.getData(),len); } StringTS::StringTS (String S) { Time t; - str = new uint8[S.length()]; + str.malloc(S.length()); memcpy (str, S.toRawUTF8(), S.length()); timestamp = t.getHighResolutionTicks(); @@ -121,7 +119,7 @@ StringTS::StringTS (String S) StringTS::StringTS (String S, int64 ts_software) { - str = new uint8[S.length()]; + str.malloc(S.length()); memcpy (str, S.toRawUTF8(), S.length()); timestamp = ts_software; @@ -131,7 +129,7 @@ StringTS::StringTS (String S, int64 ts_software) StringTS::StringTS (const StringTS& s) { - str = new uint8[s.len]; + str.malloc(s.len); memcpy (str, s.str, s.len); timestamp = s.timestamp; len = s.len; @@ -142,20 +140,13 @@ StringTS::StringTS (unsigned char* buf, int _len, int64 ts_software) : len (_len) ,timestamp (ts_software) { - str = new juce::uint8[len]; + str.malloc(len); for (int k = 0; k < len; ++k) str[k] = buf[k]; } -StringTS::~StringTS() -{ - delete str; -} - - /*********************************************/ -void* NetworkEvents::zmqcontext = nullptr; NetworkEvents::NetworkEvents() : GenericProcessor ("Network Events") @@ -163,95 +154,53 @@ NetworkEvents::NetworkEvents() , threshold (200.0) , bufferZone (5.0f) , state (false) - , connectionErr (var(false)) + , zmqcontext (new ZMQContext()) , lastGoodPort (0) { setProcessorType (PROCESSOR_TYPE_SOURCE); - connectionErr.addListener(this); - - createZmqContext(); - firstTime = true; urlport = 5556; - threadRunning = false; opensocket(); sendSampleCount = false; // disable updating the continuous buffer sample counts, // since this processor only sends events - shutdown = false; } -void NetworkEvents::setNewListeningPort (int port) +void NetworkEvents::setNewListeningPort (uint16 port) { - // first, close existing thread. closesocket(); - - // allow some time for thread to quit -#ifdef WIN32 - Sleep (300); -#else - usleep (300 * 1000); -#endif - - urlport = port; - connectionErr = false; + updatePort(port); opensocket(); - - // update editor - auto ed = static_cast(getEditor()); - if (ed) - { - ed->setPortString(String(port)); - } } NetworkEvents::~NetworkEvents() { - shutdown = true; - closesocket(); -} - - -void NetworkEvents::valueChanged(Value& value) -{ - if (value.refersToSameSourceAs(connectionErr)) - { - if ((bool)value.getValue() && lastGoodPort > 0 && lastGoodPort != urlport) - { - // try to connect to the last good port - setNewListeningPort(lastGoodPort); - } - } + closesocket(true); } -bool NetworkEvents::closesocket() +void NetworkEvents::closesocket(bool shutdown) { std::cout << "Disabling network node" << std::endl; #ifdef ZEROMQ - if (threadRunning) { - lock.enter(); - zmq_ctx_destroy (zmqcontext); // this will cause the thread to exit - zmqcontext = nullptr; - lock.exit(); - - if (!stopThread(500)) - { - std::cerr << "Network thread timeout. Forcing thread termination, system could be lefr in an unstable state" << std::endl; - } - - if (! shutdown) - createZmqContext();// and this will take care that processor graph doesn't attempt to delete the context again - + ScopedLock lock(contextLock); + // reassign to destroy the existing context, which will close the socket and exit the thread. + // must destroy the context to close the socket since the socket has a blocking call. + zmqcontext = shutdown ? nullptr : new ZMQContext(); } + + if (!stopThread(500)) + { + jassertfalse; + std::cerr << "Network thread timeout. Forcing thread termination, system could be left in an unstable state" << std::endl; + } #endif - return true; } @@ -332,7 +281,7 @@ void NetworkEvents::postTimestamppedStringToMidiBuffer (StringTS s) { MetaDataValueArray md; md.add(new MetaDataValue(MetaDataDescriptor::INT64, 1, &s.timestamp)); - TextEventPtr event = TextEvent::createTextEvent(messageChannel, CoreServices::getGlobalTimestamp(), String::fromUTF8(reinterpret_cast(s.str), s.len), md); + TextEventPtr event = TextEvent::createTextEvent(messageChannel, CoreServices::getGlobalTimestamp(), s.getString(), md); addEvent(messageChannel, event, 0); } @@ -558,7 +507,7 @@ void NetworkEvents::process (AudioSampleBuffer& buffer) { setTimestampAndSamples(CoreServices::getGlobalTimestamp(),0); - lock.enter(); + ScopedLock lock(queueLock); while (! networkMessagesQueue.empty()) { StringTS msg = networkMessagesQueue.front(); @@ -566,8 +515,6 @@ void NetworkEvents::process (AudioSampleBuffer& buffer) CoreServices::sendStatusMessage ( ("Network event received: " + msg.getString()).toRawUTF8()); networkMessagesQueue.pop(); } - - lock.exit(); } @@ -580,58 +527,114 @@ void NetworkEvents::opensocket() void NetworkEvents::run() { #ifdef ZEROMQ - Responder responder(zmqcontext); - String url= String ("tcp://*:") + String (urlport); - int rc = zmq_bind (responder, url.toRawUTF8()); - - if (rc != 0) + Responder responder; + bool success; + { + ScopedLock lock(contextLock); // zmqcontext should not become null within this block + if (zmqcontext == nullptr) + { + // must be shutting down + return; + } + success = responder.initialize(*zmqcontext, urlport, lastGoodPort); + } + + if (!success) { // failed to open or bind socket? - String msg = String("Network Events failed to open socket: ") + zmq_strerror(zmq_errno()); + String msg = String("Network Events failed to open socket: ") + zmq_strerror(responder.getErr()); std::cout << msg << std::endl; CoreServices::sendStatusMessage(msg); - connectionErr = true; return; } + + uint16 boundPort = responder.getBoundPort(); + if (urlport != boundPort) + { + // update urlport and the editor. this would happen if a new port couldn't be + // bound to or if 0 was entered, resulting in an ephemeral port being chosen. + + if (urlport == 0) + { + CoreServices::sendStatusMessage("Selecting Network Events port automatically"); + } + else if (boundPort == lastGoodPort) + { + CoreServices::sendStatusMessage("Could not bind to port " + String(urlport) + " (" + + zmq_strerror(responder.getErr()) + "); reverting to port " + String(lastGoodPort)); + } + else + { + jassertfalse; + } + + const MessageManagerLock mmLock; // to update the editor safely from a thread + updatePort(boundPort); + } - threadRunning = true; lastGoodPort = urlport; HeapBlock buffer(MAX_MESSAGE_LENGTH); int result = -1; - while (threadRunning) + while (true) { - result = zmq_recv (responder, buffer, MAX_MESSAGE_LENGTH - 1, 0); // blocking + result = responder.receive(buffer); // blocking juce::int64 timestamp_software = timer.getHighResolutionTicks(); - if (result < 0) // will only happen when responder dies. - break; + if (result < 0) + { + int err = responder.getErr(); + if (err == ETERM || err == ENOTSOCK) + { + // context has been terminated + return; + } + if (err == EINTR) + { + continue; + } + } StringTS Msg (buffer, result, timestamp_software); + String response; if (result > 0) { - lock.enter(); - networkMessagesQueue.push (Msg); - lock.exit(); + { + ScopedLock lock(queueLock); + networkMessagesQueue.push(Msg); + } //std::cout << "Received message!" << std::endl; // handle special messages - String response = handleSpecialMessages (Msg); - - zmq_send (responder, response.getCharPointer(), response.length(), 0); + response = handleSpecialMessages (Msg); } else { - String zeroMessageError = "Recieved Zero Message?!?!?"; + response = "Recieved Zero Message?!?!?"; //std::cout << "Received Zero Message!" << std::endl; - - zmq_send (responder, zeroMessageError.getCharPointer(), zeroMessageError.length(), 0); } + + bool retry; + do + { + retry = false; + if (responder.send(response) < 0) + { + int err = responder.getErr(); + if (err == ETERM || err == ENOTSOCK) + { + // context has been terminated + return; + } + if (err == EINTR) + { + retry = true; + } + } + } while (retry); } - threadRunning = false; - return; #endif } @@ -679,52 +682,20 @@ void NetworkEvents::loadCustomParametersFromXml() { if (mainNode->hasTagName ("NETWORKEVENTS")) { - setNewListeningPort(mainNode->getIntAttribute("port")); + setNewListeningPort(static_cast(mainNode->getIntAttribute("port"))); } } } } -void NetworkEvents::createZmqContext() -{ -#ifdef ZEROMQ - lock.enter(); - if (zmqcontext == nullptr) - zmqcontext = zmq_ctx_new(); //<-- this is only available in version 3+ - lock.exit(); -#endif -} - - -NetworkEvents::Responder::Responder(void* context) -#ifdef ZEROMQ - : socket(zmq_socket(context, ZMQ_REP)) -#endif -{} - - -NetworkEvents::Responder::~Responder() -{ -#ifdef ZEROMQ - zmq_close(socket); -#endif -} - - -NetworkEvents::Responder::operator void*() -{ - return socket; -} - - -StringPairArray NetworkEvents::parseNetworkMessage (String msg) +StringPairArray NetworkEvents::parseNetworkMessage(String msg) { StringArray splitted; - splitted.addTokens (msg, "=", ""); + splitted.addTokens(msg, "=", ""); StringPairArray dict = StringPairArray(); - String key = ""; + String key = ""; String value = ""; for (int i = 0; i < splitted.size() - 1; ++i) @@ -733,12 +704,12 @@ StringPairArray NetworkEvents::parseNetworkMessage (String msg) String s2 = splitted[i + 1]; /** Get key */ - if (! key.isEmpty()) + if (!key.isEmpty()) { - if (s1.contains (" ")) + if (s1.contains(" ")) { - int i1 = s1.lastIndexOf (" "); - key = s1.substring (i1 + 1); + int i1 = s1.lastIndexOf(" "); + key = s1.substring(i1 + 1); } else { @@ -753,16 +724,188 @@ StringPairArray NetworkEvents::parseNetworkMessage (String msg) /** Get value */ if (i < splitted.size() - 2) { - int i1 = s2.lastIndexOf (" "); - value = s2.substring (0, i1); + int i1 = s2.lastIndexOf(" "); + value = s2.substring(0, i1); } else { value = s2; } - dict.set (key, value); + dict.set(key, value); } return dict; } + + +void NetworkEvents::updatePort(uint16 port) +{ + urlport = port; + + auto ed = static_cast(getEditor()); + if (ed) + { + ed->setPortString(String(port)); + } +} + + +/*** ZMQContext ***/ + +NetworkEvents::ZMQContext::ZMQContext() +#ifdef ZEROMQ + : context(zmq_ctx_new()) +#endif +{} + + +NetworkEvents::ZMQContext::~ZMQContext() +{ +#ifdef ZEROMQ + while (zmq_ctx_term(context) == -1 && zmq_errno() == EINTR); +#endif +} + + +void* NetworkEvents::ZMQContext::makeReplySocket() +{ +#ifdef ZEROMQ + return zmq_socket(context, ZMQ_REP); +#else + return nullptr; +#endif +} + + +/*** Responder ***/ + +NetworkEvents::Responder::Responder() + : socket (nullptr) + , boundPort (0) + , lastErrno (0) + , initialized (false) +{} + + +NetworkEvents::Responder::~Responder() +{ +#ifdef ZEROMQ + if (socket) + { + zmq_close(socket); + } +#endif +} + + +bool NetworkEvents::Responder::initialize(ZMQContext& context, uint16 port, uint16 lastGoodPort) +{ + if (initialized) + { + jassertfalse; // should only be initialized once! + return false; + } + initialized = true; + +#ifdef ZEROMQ + socket = context.makeReplySocket(); + if (!socket) + { + lastErrno = zmq_errno(); + return false; + } + + String url("tcp://*:" + (port == 0 ? "*" : String(port))); + int rc = zmq_bind(socket, url.toRawUTF8()); + if (rc == -1) + { + lastErrno = zmq_errno(); + + // try again with last good port, if any + if (lastGoodPort != 0 && lastGoodPort != port) + { + port = lastGoodPort; + url = "tcp://*:" + String(port); + if (zmq_bind(socket, url.toRawUTF8()) == -1) + { + // don't set errno, since the error for the original port is more relevant + return false; + } + } + else + { + return false; + } + } + + // bound successfully! + + // if requested port was 0, find out which port was actually used + if (port == 0) + { + const size_t BUF_LEN = 32; + size_t len = BUF_LEN; + char endpoint[BUF_LEN]; + if (zmq_getsockopt(socket, ZMQ_LAST_ENDPOINT, endpoint, &len) == -1) + { + lastErrno = zmq_errno(); + return false; + } + + port = String(endpoint).getTrailingIntValue(); + jassert(port > 0); + } + + boundPort = port; +#endif + return true; +} + + +int NetworkEvents::Responder::getErr() +{ + int err = lastErrno; + lastErrno = 0; + return err; +} + + +uint16 NetworkEvents::Responder::getBoundPort() const +{ + return boundPort; +} + + +int NetworkEvents::Responder::receive(void* buf) +{ +#ifdef ZEROMQ + int res = zmq_recv(socket, buf, MAX_MESSAGE_LENGTH, 0); // blocking + if (res == -1) + { + lastErrno = zmq_errno(); + } + else + { + res = jmin(res, MAX_MESSAGE_LENGTH); + } + return res; +#else + return -1; +#endif +} + + +int NetworkEvents::Responder::send(StringRef response) +{ +#ifdef ZEROMQ + int res = zmq_send(socket, response, response.length(), 0); + if (res == -1) + { + lastErrno = zmq_errno(); + } + return res; +#else + return -1; +#endif +} diff --git a/Source/Plugins/NetworkEvents/NetworkEvents.h b/Source/Plugins/NetworkEvents/NetworkEvents.h index a380deafe..834a6a232 100644 --- a/Source/Plugins/NetworkEvents/NetworkEvents.h +++ b/Source/Plugins/NetworkEvents/NetworkEvents.h @@ -39,7 +39,6 @@ #include #include -#include class StringTS { @@ -50,14 +49,13 @@ class StringTS StringTS(String S, int64 ts_software); StringTS(const StringTS& s); StringTS(unsigned char* buf, int _len, int64 ts_software); - ~StringTS(); std::vector splitString(char sep); String getString(); StringTS& operator= (const StringTS& rhs); - juce::uint8* str; + HeapBlock str; int len; juce::int64 timestamp; }; @@ -70,7 +68,6 @@ class StringTS */ class NetworkEvents : public GenericProcessor , public Thread - , public Value::Listener { public: NetworkEvents(); @@ -112,50 +109,85 @@ class NetworkEvents : public GenericProcessor void simulateStopRecord(); void run(); void opensocket(); - bool closesocket(); + void closesocket(bool shutdown = false); void postTimestamppedStringToMidiBuffer (StringTS s); - void setNewListeningPort (int port); + void setNewListeningPort (uint16 port); - // to monitor responderStatus and connect to last good port if necessary - void valueChanged(Value& value) override; + uint16 urlport; +private: - int urlport; - String socketStatus; - std::atomic threadRunning; + //* Split network message into name/value pairs (name1=val1 name2=val2 etc) */ + StringPairArray parseNetworkMessage(String msg); + + // Set the urlport and reflect it on the editor + void updatePort(uint16 port); + // RAII wrapper for ZMQ context + class ZMQContext + { + public: + ZMQContext(); + ~ZMQContext(); + void* makeReplySocket(); + private: + void* context; + }; -private: - void createZmqContext(); - // RAII wrapper for ZMQ socket + // RAII wrapper for responder socket class Responder { - public: - Responder(void* context); + public: + Responder(); ~Responder(); - operator void*(); - private: - void* socket; - }; - // allow reconnecting to last good port if connection fails - Value connectionErr; - int lastGoodPort; + // creates socket from given context and tries to bind to port, then lastGoodPort (if nonzero). + // if port is 0, chooses an available ephemeral port. + // returns true on success. + bool initialize(ZMQContext& context, uint16 port, uint16 lastGoodPort); - //* Split network message into name/value pairs (name1=val1 name2=val2 etc) */ - StringPairArray parseNetworkMessage (String msg); + // returns the latest errno value and resets it to 0. + int getErr(); + + // returns the port if the socket was successfully bound to one. + // if not, or if the socket is invalid, returns 0. + uint16 getBoundPort() const; - StringTS createStringTS (String S, int64 t); + // receives message into buf (blocking call). + // returns the number of bytes actually received, or -1 if there is an error. + int receive(void* buf); - static void* zmqcontext; + // sends a message. returns the same as zmq_send. + int send(StringRef response); + + private: + void* socket; + uint16 boundPort; // 0 indicates not bound + int lastErrno; + bool initialized; + }; + + + //class Responder + //{ + //public: + // Responder(void* context); + // ~Responder(); + // operator void*() const; + //private: + // ScopedPointer socket; + // uint16 lastGoodPort; // 0 indicates none + //}; + + ScopedPointer zmqcontext; + uint16 lastGoodPort; float threshold; float bufferZone; bool state; - bool shutdown; bool firstTime; Time timer; @@ -163,7 +195,9 @@ class NetworkEvents : public GenericProcessor std::queue networkMessagesQueue; std::queue simulation; - CriticalSection lock; + CriticalSection queueLock; + CriticalSection contextLock; + int64 simulationStartTime; const EventChannel* messageChannel{ nullptr }; diff --git a/Source/Plugins/NetworkEvents/NetworkEventsEditor.cpp b/Source/Plugins/NetworkEvents/NetworkEventsEditor.cpp index fb924666e..7638870ab 100644 --- a/Source/Plugins/NetworkEvents/NetworkEventsEditor.cpp +++ b/Source/Plugins/NetworkEvents/NetworkEventsEditor.cpp @@ -124,10 +124,16 @@ void NetworkEventsEditor::labelTextChanged(juce::Label *label) { if (label == labelPort) { - Value val = label->getTextValue(); + int32 portInput = label->getText().getIntValue(); + if (portInput < 0 || portInput > (1 << 16) - 1) + { + CoreServices::sendStatusMessage("Warning: port out of range; selecting one automatically"); + portInput = 0; + } + auto port = static_cast(portInput); NetworkEvents *p= (NetworkEvents *)getProcessor(); - p->setNewListeningPort(val.getValue()); + p->setNewListeningPort(port); } } From 1a415044f7515563f50cdb348b903399cc36861e Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Thu, 6 Dec 2018 11:54:31 -0600 Subject: [PATCH 04/10] Simplify StringTS, go back to void* for context, etc. --- .../Plugins/NetworkEvents/NetworkEvents.cpp | 374 ++++++------------ Source/Plugins/NetworkEvents/NetworkEvents.h | 62 +-- 2 files changed, 133 insertions(+), 303 deletions(-) diff --git a/Source/Plugins/NetworkEvents/NetworkEvents.cpp b/Source/Plugins/NetworkEvents/NetworkEvents.cpp index 2fae4496f..81b29ecec 100644 --- a/Source/Plugins/NetworkEvents/NetworkEvents.cpp +++ b/Source/Plugins/NetworkEvents/NetworkEvents.cpp @@ -37,38 +37,55 @@ const int MAX_MESSAGE_LENGTH = 64000; StringTS::StringTS() + : timestamp(0) +{} + + +StringTS::StringTS(String S, int64 ts_software) + : str(S) + , timestamp(ts_software) +{} + + +StringTS::StringTS(MidiMessage& event) + : timestamp(EventBase::getTimestamp(event)) { - len= 0; - timestamp = 0; + if (Event::getEventType(event) != EventChannel::EventChannelTypes::TEXT) + { + return; // only handles text events + } + + const uint8* dataptr = event.getRawData(); + // relying on null terminator to get end of string... + str = String::fromUTF8(reinterpret_cast(dataptr + EVENT_BASE_SIZE)); } -std::vector StringTS::splitString (char sep) +std::vector StringTS::splitString (char sep) const { - String S ((const char*)str.getData(), len); String curr; std::list ls; - for (int k = 0; k < S.length(); ++k) + for (int k = 0; k < str.length(); ++k) { - if (S[k] != sep) + if (str[k] != sep) { - curr+=S[k]; + curr+=str[k]; } else { ls.push_back (curr); - while (S[k] == sep && k < S.length()) + while (str[k] == sep && k < str.length()) ++k; curr = ""; - if (S[k] != sep && k < S.length()) - curr += S[k]; + if (str[k] != sep && k < str.length()) + curr += str[k]; } } - if (S.length() > 0) + if (str.length() > 0) { - if (S[S.length() - 1] != sep) + if (str[str.length() - 1] != sep) ls.push_back (curr); } @@ -77,75 +94,6 @@ std::vector StringTS::splitString (char sep) } -StringTS::StringTS (MidiMessage& event) -{ - const uint8* dataptr = event.getRawData(); - const int bufferSize = event.getRawDataSize(); - len = bufferSize - 6 - 8; // -6 for initial event prefix, -8 for timestamp at the end - - memcpy (×tamp, dataptr + 6 + len, 8); // remember to skip first six bytes - str.malloc(len); - memcpy (str,dataptr + 6, len); -} - - -StringTS& StringTS::operator= (const StringTS& rhs) -{ - len = rhs.len; - str.malloc(len); - memcpy (str,rhs.str,len); - timestamp = rhs.timestamp; - - return *this; -} - - -String StringTS::getString() -{ - return String ((const char*)str.getData(),len); -} - - -StringTS::StringTS (String S) -{ - Time t; - str.malloc(S.length()); - memcpy (str, S.toRawUTF8(), S.length()); - timestamp = t.getHighResolutionTicks(); - - len = S.length(); -} - - -StringTS::StringTS (String S, int64 ts_software) -{ - str.malloc(S.length()); - memcpy (str, S.toRawUTF8(), S.length()); - timestamp = ts_software; - - len = S.length(); -} - - -StringTS::StringTS (const StringTS& s) -{ - str.malloc(s.len); - memcpy (str, s.str, s.len); - timestamp = s.timestamp; - len = s.len; -} - - -StringTS::StringTS (unsigned char* buf, int _len, int64 ts_software) - : len (_len) - ,timestamp (ts_software) -{ - str.malloc(len); - for (int k = 0; k < len; ++k) - str[k] = buf[k]; -} - - /*********************************************/ NetworkEvents::NetworkEvents() @@ -154,11 +102,13 @@ NetworkEvents::NetworkEvents() , threshold (200.0) , bufferZone (5.0f) , state (false) - , zmqcontext (new ZMQContext()) + , zmqcontext (nullptr) , lastGoodPort (0) { setProcessorType (PROCESSOR_TYPE_SOURCE); + createZmqContext(); + firstTime = true; urlport = 5556; @@ -188,18 +138,20 @@ void NetworkEvents::closesocket(bool shutdown) std::cout << "Disabling network node" << std::endl; #ifdef ZEROMQ + void* oldContext = zmqcontext; + zmqcontext = nullptr; // this will cause the thread to exit if socket hasn't been created + zmq_ctx_destroy(oldContext); // this will cause the thread to exit if socket has been created + + if (!stopThread(500)) { - ScopedLock lock(contextLock); - // reassign to destroy the existing context, which will close the socket and exit the thread. - // must destroy the context to close the socket since the socket has a blocking call. - zmqcontext = shutdown ? nullptr : new ZMQContext(); + jassertfalse; + std::cerr << "Network thread timeout. Forcing thread termination, system could be left in an unstable state" << std::endl; } - if (!stopThread(500)) - { - jassertfalse; - std::cerr << "Network thread timeout. Forcing thread termination, system could be left in an unstable state" << std::endl; - } + if (!shutdown) + { + createZmqContext(); // so another socket can be opened later + } #endif } @@ -263,7 +215,7 @@ void NetworkEvents::simulateDesignAndTrials () if (currenttime > S.timestamp) { // handle special messages - handleSpecialMessages (S); + handleSpecialMessages (S.str); postTimestamppedStringToMidiBuffer (S); //getUIComponent()->getLogWindow()->addLineToLog(S.getString()); @@ -281,7 +233,7 @@ void NetworkEvents::postTimestamppedStringToMidiBuffer (StringTS s) { MetaDataValueArray md; md.add(new MetaDataValue(MetaDataDescriptor::INT64, 1, &s.timestamp)); - TextEventPtr event = TextEvent::createTextEvent(messageChannel, CoreServices::getGlobalTimestamp(), s.getString(), md); + TextEventPtr event = TextEvent::createTextEvent(messageChannel, CoreServices::getGlobalTimestamp(), s.str, md); addEvent(messageChannel, event, 0); } @@ -341,7 +293,7 @@ void NetworkEvents::simulateSingleTrial() } -String NetworkEvents::handleSpecialMessages (StringTS msg) +String NetworkEvents::handleSpecialMessages (const String& s) { /* std::vector input = msg.splitString(' '); @@ -394,12 +346,8 @@ String NetworkEvents::handleSpecialMessages (StringTS msg) */ - /** Start/stop data acquisition */ - String s = msg.getString(); - /** Command is first substring */ - StringArray inputs = StringArray::fromTokens (s, " "); - String cmd = String (inputs[0]); + String cmd = s.initialSectionNotContaining(" "); const MessageManagerLock mmLock; if (cmd.compareIgnoreCase ("StartAcquisition") == 0) @@ -420,8 +368,7 @@ String NetworkEvents::handleSpecialMessages (StringTS msg) } else if (String ("StartRecord").compareIgnoreCase (cmd) == 0) { - if (! CoreServices::getRecordingStatus() - && CoreServices::getAcquisitionStatus()) + if (! CoreServices::getRecordingStatus()) { /** First set optional parameters (name/value pairs)*/ if (s.contains ("=")) @@ -512,7 +459,6 @@ void NetworkEvents::process (AudioSampleBuffer& buffer) { StringTS msg = networkMessagesQueue.front(); postTimestamppedStringToMidiBuffer (msg); - CoreServices::sendStatusMessage ( ("Network event received: " + msg.getString()).toRawUTF8()); networkMessagesQueue.pop(); } } @@ -527,28 +473,22 @@ void NetworkEvents::opensocket() void NetworkEvents::run() { #ifdef ZEROMQ - Responder responder; - bool success; - { - ScopedLock lock(contextLock); // zmqcontext should not become null within this block - if (zmqcontext == nullptr) - { - // must be shutting down - return; - } - success = responder.initialize(*zmqcontext, urlport, lastGoodPort); - } + Responder responder(zmqcontext, urlport, lastGoodPort); - if (!success) + uint16 boundPort = responder.getBoundPort(); + if (boundPort == 0) { // failed to open or bind socket? - String msg = String("Network Events failed to open socket: ") + zmq_strerror(responder.getErr()); + int err = responder.getErr(); + String msg = String("Network Events failed to open socket: ") + zmq_strerror(err); std::cout << msg << std::endl; - CoreServices::sendStatusMessage(msg); + if (err != EFAULT && err != ETERM) // errors that could occur normally when context is being terminated + { + CoreServices::sendStatusMessage(msg); + } return; } - uint16 boundPort = responder.getBoundPort(); if (urlport != boundPort) { // update urlport and the editor. this would happen if a new port couldn't be @@ -573,68 +513,43 @@ void NetworkEvents::run() } lastGoodPort = urlport; - HeapBlock buffer(MAX_MESSAGE_LENGTH); + char buffer[MAX_MESSAGE_LENGTH]; int result = -1; while (true) { result = responder.receive(buffer); // blocking - juce::int64 timestamp_software = timer.getHighResolutionTicks(); + juce::int64 timestamp = CoreServices::getGlobalTimestamp(); if (result < 0) { - int err = responder.getErr(); - if (err == ETERM || err == ENOTSOCK) - { - // context has been terminated - return; - } - if (err == EINTR) - { - continue; - } + // context has been terminated + break; } - StringTS Msg (buffer, result, timestamp_software); - String response; - if (result > 0) - { - { - ScopedLock lock(queueLock); - networkMessagesQueue.push(Msg); - } + String msgStr = String::fromUTF8(buffer, result); - //std::cout << "Received message!" << std::endl; - // handle special messages - response = handleSpecialMessages (Msg); - } - else { - response = "Recieved Zero Message?!?!?"; - //std::cout << "Received Zero Message!" << std::endl; + StringTS Msg(msgStr, timestamp); + ScopedLock lock(queueLock); + networkMessagesQueue.push(Msg); } - bool retry; - do + CoreServices::sendStatusMessage("Network event received: " + msgStr); + + //std::cout << "Received message!" << std::endl; + // handle special messages + String response = handleSpecialMessages (msgStr); + + if (responder.send(response) < 0) { - retry = false; - if (responder.send(response) < 0) - { - int err = responder.getErr(); - if (err == ETERM || err == ENOTSOCK) - { - // context has been terminated - return; - } - if (err == EINTR) - { - retry = true; - } - } - } while (retry); + // context has been terminated + break; + } } + jassert(responder.getErr() == ETERM); #endif } @@ -689,50 +604,32 @@ void NetworkEvents::loadCustomParametersFromXml() } -StringPairArray NetworkEvents::parseNetworkMessage(String msg) +void NetworkEvents::createZmqContext() { - StringArray splitted; - splitted.addTokens(msg, "=", ""); - - StringPairArray dict = StringPairArray(); - String key = ""; - String value = ""; - - for (int i = 0; i < splitted.size() - 1; ++i) +#ifdef ZEROMQ + if (zmqcontext == nullptr) { - String s1 = splitted[i]; - String s2 = splitted[i + 1]; + zmqcontext = zmq_ctx_new(); //<-- this is only available in version 3+ + } +#endif +} - /** Get key */ - if (!key.isEmpty()) - { - if (s1.contains(" ")) - { - int i1 = s1.lastIndexOf(" "); - key = s1.substring(i1 + 1); - } - else - { - key = s1; - } - } - else - { - key = s1.trim(); - } - /** Get value */ - if (i < splitted.size() - 2) - { - int i1 = s2.lastIndexOf(" "); - value = s2.substring(0, i1); - } - else +StringPairArray NetworkEvents::parseNetworkMessage(StringRef msg) +{ + StringArray args = StringArray::fromTokens(msg, " ", "'\""); + args.removeEmptyStrings(); + + StringPairArray dict; + for (const String& arg : args) + { + int iEq = arg.indexOfChar('='); + if (iEq >= 0) { - value = s2; + String key = arg.substring(0, iEq); + String val = arg.substring(iEq + 1).unquoted(); + dict.set(key, val); } - - dict.set(key, value); } return dict; @@ -751,69 +648,20 @@ void NetworkEvents::updatePort(uint16 port) } -/*** ZMQContext ***/ - -NetworkEvents::ZMQContext::ZMQContext() -#ifdef ZEROMQ - : context(zmq_ctx_new()) -#endif -{} - - -NetworkEvents::ZMQContext::~ZMQContext() -{ -#ifdef ZEROMQ - while (zmq_ctx_term(context) == -1 && zmq_errno() == EINTR); -#endif -} - - -void* NetworkEvents::ZMQContext::makeReplySocket() -{ -#ifdef ZEROMQ - return zmq_socket(context, ZMQ_REP); -#else - return nullptr; -#endif -} - - /*** Responder ***/ -NetworkEvents::Responder::Responder() +NetworkEvents::Responder::Responder(void* context, uint16 port, uint16 backupPort) : socket (nullptr) , boundPort (0) , lastErrno (0) , initialized (false) -{} - - -NetworkEvents::Responder::~Responder() -{ -#ifdef ZEROMQ - if (socket) - { - zmq_close(socket); - } -#endif -} - - -bool NetworkEvents::Responder::initialize(ZMQContext& context, uint16 port, uint16 lastGoodPort) { - if (initialized) - { - jassertfalse; // should only be initialized once! - return false; - } - initialized = true; - #ifdef ZEROMQ - socket = context.makeReplySocket(); + socket = zmq_socket(context, ZMQ_REP); if (!socket) { lastErrno = zmq_errno(); - return false; + return; } String url("tcp://*:" + (port == 0 ? "*" : String(port))); @@ -823,19 +671,19 @@ bool NetworkEvents::Responder::initialize(ZMQContext& context, uint16 port, uint lastErrno = zmq_errno(); // try again with last good port, if any - if (lastGoodPort != 0 && lastGoodPort != port) + if (backupPort != 0 && backupPort != port) { - port = lastGoodPort; + port = backupPort; url = "tcp://*:" + String(port); if (zmq_bind(socket, url.toRawUTF8()) == -1) { // don't set errno, since the error for the original port is more relevant - return false; + return; } } else { - return false; + return; } } @@ -850,7 +698,7 @@ bool NetworkEvents::Responder::initialize(ZMQContext& context, uint16 port, uint if (zmq_getsockopt(socket, ZMQ_LAST_ENDPOINT, endpoint, &len) == -1) { lastErrno = zmq_errno(); - return false; + return; } port = String(endpoint).getTrailingIntValue(); @@ -859,7 +707,19 @@ bool NetworkEvents::Responder::initialize(ZMQContext& context, uint16 port, uint boundPort = port; #endif - return true; +} + + +NetworkEvents::Responder::~Responder() +{ +#ifdef ZEROMQ + if (socket) + { + int linger = 0; + zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger)); + zmq_close(socket); + } +#endif } diff --git a/Source/Plugins/NetworkEvents/NetworkEvents.h b/Source/Plugins/NetworkEvents/NetworkEvents.h index 834a6a232..7d27ab644 100644 --- a/Source/Plugins/NetworkEvents/NetworkEvents.h +++ b/Source/Plugins/NetworkEvents/NetworkEvents.h @@ -44,19 +44,12 @@ class StringTS { public: StringTS(); - StringTS(MidiMessage& event); - StringTS(String S); - StringTS(String S, int64 ts_software); - StringTS(const StringTS& s); - StringTS(unsigned char* buf, int _len, int64 ts_software); + StringTS(String S, int64 ts_software = CoreServices::getGlobalTimestamp()); + StringTS(MidiMessage& event); - std::vector splitString(char sep); - String getString(); + std::vector splitString(char sep) const; - StringTS& operator= (const StringTS& rhs); - - HeapBlock str; - int len; + String str; juce::int64 timestamp; }; @@ -99,7 +92,6 @@ class NetworkEvents : public GenericProcessor //int64 getExtrapolatedHardwareTimestamp (int64 softwareTS) const; - String handleSpecialMessages (StringTS msg); std::vector splitString (String S, char sep); void initSimulation(); @@ -111,42 +103,33 @@ class NetworkEvents : public GenericProcessor void opensocket(); void closesocket(bool shutdown = false); - void postTimestamppedStringToMidiBuffer (StringTS s); void setNewListeningPort (uint16 port); uint16 urlport; private: + void createZmqContext(); + + void postTimestamppedStringToMidiBuffer(StringTS s); + + String handleSpecialMessages(const String& s); //* Split network message into name/value pairs (name1=val1 name2=val2 etc) */ - StringPairArray parseNetworkMessage(String msg); + StringPairArray parseNetworkMessage(StringRef msg); // Set the urlport and reflect it on the editor void updatePort(uint16 port); - // RAII wrapper for ZMQ context - class ZMQContext - { - public: - ZMQContext(); - ~ZMQContext(); - void* makeReplySocket(); - private: - void* context; - }; - - // RAII wrapper for responder socket class Responder { - public: - Responder(); + public: + // creates socket from given context and tries to bind to port, then backupPort (if nonzero). + // if port is 0, chooses an available ephemeral port. + Responder(void* context, uint16 port, uint16 backupPort); ~Responder(); - // creates socket from given context and tries to bind to port, then lastGoodPort (if nonzero). - // if port is 0, chooses an available ephemeral port. - // returns true on success. - bool initialize(ZMQContext& context, uint16 port, uint16 lastGoodPort); + bool initialize(); // returns the latest errno value and resets it to 0. int getErr(); @@ -169,19 +152,7 @@ class NetworkEvents : public GenericProcessor bool initialized; }; - - //class Responder - //{ - //public: - // Responder(void* context); - // ~Responder(); - // operator void*() const; - //private: - // ScopedPointer socket; - // uint16 lastGoodPort; // 0 indicates none - //}; - - ScopedPointer zmqcontext; + void* zmqcontext; uint16 lastGoodPort; float threshold; @@ -196,7 +167,6 @@ class NetworkEvents : public GenericProcessor std::queue simulation; CriticalSection queueLock; - CriticalSection contextLock; int64 simulationStartTime; From 1b32519946be406f40fe70d8e893c3bbb8a80c0f Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Thu, 6 Dec 2018 12:02:52 -0600 Subject: [PATCH 05/10] Make StringTS private nested class --- .../Plugins/NetworkEvents/NetworkEvents.cpp | 121 +++++++++--------- Source/Plugins/NetworkEvents/NetworkEvents.h | 28 ++-- 2 files changed, 75 insertions(+), 74 deletions(-) diff --git a/Source/Plugins/NetworkEvents/NetworkEvents.cpp b/Source/Plugins/NetworkEvents/NetworkEvents.cpp index 81b29ecec..0585ef4db 100644 --- a/Source/Plugins/NetworkEvents/NetworkEvents.cpp +++ b/Source/Plugins/NetworkEvents/NetworkEvents.cpp @@ -35,67 +35,6 @@ const int MAX_MESSAGE_LENGTH = 64000; #include #endif - -StringTS::StringTS() - : timestamp(0) -{} - - -StringTS::StringTS(String S, int64 ts_software) - : str(S) - , timestamp(ts_software) -{} - - -StringTS::StringTS(MidiMessage& event) - : timestamp(EventBase::getTimestamp(event)) -{ - if (Event::getEventType(event) != EventChannel::EventChannelTypes::TEXT) - { - return; // only handles text events - } - - const uint8* dataptr = event.getRawData(); - // relying on null terminator to get end of string... - str = String::fromUTF8(reinterpret_cast(dataptr + EVENT_BASE_SIZE)); -} - - -std::vector StringTS::splitString (char sep) const -{ - String curr; - - std::list ls; - for (int k = 0; k < str.length(); ++k) - { - if (str[k] != sep) - { - curr+=str[k]; - } - else - { - ls.push_back (curr); - while (str[k] == sep && k < str.length()) - ++k; - - curr = ""; - if (str[k] != sep && k < str.length()) - curr += str[k]; - } - } - if (str.length() > 0) - { - if (str[str.length() - 1] != sep) - ls.push_back (curr); - } - - std::vector Svec (ls.begin(), ls.end()); - return Svec; -} - - -/*********************************************/ - NetworkEvents::NetworkEvents() : GenericProcessor ("Network Events") , Thread ("NetworkThread") @@ -648,6 +587,66 @@ void NetworkEvents::updatePort(uint16 port) } +/*** StringTS ***/ + +NetworkEvents::StringTS::StringTS() + : timestamp(0) +{} + + +NetworkEvents::StringTS::StringTS(String S, int64 ts_software) + : str(S) + , timestamp(ts_software) +{} + + +NetworkEvents::StringTS::StringTS(MidiMessage& event) + : timestamp(EventBase::getTimestamp(event)) +{ + if (Event::getEventType(event) != EventChannel::EventChannelTypes::TEXT) + { + return; // only handles text events + } + + const uint8* dataptr = event.getRawData(); + // relying on null terminator to get end of string... + str = String::fromUTF8(reinterpret_cast(dataptr + EVENT_BASE_SIZE)); +} + + +std::vector NetworkEvents::StringTS::splitString(char sep) const +{ + String curr; + + std::list ls; + for (int k = 0; k < str.length(); ++k) + { + if (str[k] != sep) + { + curr += str[k]; + } + else + { + ls.push_back(curr); + while (str[k] == sep && k < str.length()) + ++k; + + curr = ""; + if (str[k] != sep && k < str.length()) + curr += str[k]; + } + } + if (str.length() > 0) + { + if (str[str.length() - 1] != sep) + ls.push_back(curr); + } + + std::vector Svec(ls.begin(), ls.end()); + return Svec; +} + + /*** Responder ***/ NetworkEvents::Responder::Responder(void* context, uint16 port, uint16 backupPort) diff --git a/Source/Plugins/NetworkEvents/NetworkEvents.h b/Source/Plugins/NetworkEvents/NetworkEvents.h index 7d27ab644..a920938f9 100644 --- a/Source/Plugins/NetworkEvents/NetworkEvents.h +++ b/Source/Plugins/NetworkEvents/NetworkEvents.h @@ -37,22 +37,10 @@ #include +#include #include #include -class StringTS -{ -public: - StringTS(); - StringTS(String S, int64 ts_software = CoreServices::getGlobalTimestamp()); - StringTS(MidiMessage& event); - - std::vector splitString(char sep) const; - - String str; - juce::int64 timestamp; -}; - /** Sends incoming TCP/IP messages from 0MQ to the events buffer @@ -108,6 +96,20 @@ class NetworkEvents : public GenericProcessor uint16 urlport; private: + // combines a string and a timestamp + class StringTS + { + public: + StringTS(); + StringTS(String S, int64 ts_software = CoreServices::getGlobalTimestamp()); + StringTS(MidiMessage& event); + + std::vector splitString(char sep) const; + + String str; + juce::int64 timestamp; + }; + void createZmqContext(); void postTimestamppedStringToMidiBuffer(StringTS s); From 4d38f450e7b5eab924c7d4ecf50dfea656302b22 Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Fri, 7 Dec 2018 12:32:31 -0600 Subject: [PATCH 06/10] Let thread restart/port change without exiting --- .../Plugins/NetworkEvents/NetworkEvents.cpp | 526 ++++++++++-------- Source/Plugins/NetworkEvents/NetworkEvents.h | 93 +++- .../NetworkEvents/NetworkEventsEditor.cpp | 56 +- .../NetworkEvents/NetworkEventsEditor.h | 8 +- 4 files changed, 409 insertions(+), 274 deletions(-) diff --git a/Source/Plugins/NetworkEvents/NetworkEvents.cpp b/Source/Plugins/NetworkEvents/NetworkEvents.cpp index 0585ef4db..87b564bc6 100644 --- a/Source/Plugins/NetworkEvents/NetworkEvents.cpp +++ b/Source/Plugins/NetworkEvents/NetworkEvents.cpp @@ -35,63 +35,81 @@ const int MAX_MESSAGE_LENGTH = 64000; #include #endif +NetworkEvents::ZMQContext* NetworkEvents::sharedContext = nullptr; +CriticalSection NetworkEvents::sharedContextLock{}; + NetworkEvents::NetworkEvents() : GenericProcessor ("Network Events") , Thread ("NetworkThread") , threshold (200.0) , bufferZone (5.0f) , state (false) - , zmqcontext (nullptr) - , lastGoodPort (0) + , urlport (0) + , firstTime (true) + , restart (0) + , portString (getPortString()) { setProcessorType (PROCESSOR_TYPE_SOURCE); - createZmqContext(); - - firstTime = true; - urlport = 5556; + if (!setNewListeningPort(5556)) + { + // resort to choosing a port automatically + setNewListeningPort(0); + } - opensocket(); + portString.addListener(this); sendSampleCount = false; // disable updating the continuous buffer sample counts, // since this processor only sends events -} - -void NetworkEvents::setNewListeningPort (uint16 port) -{ - closesocket(); - updatePort(port); - opensocket(); + startThread(); } NetworkEvents::~NetworkEvents() { - closesocket(true); + if (!stopThread(1000)) + { + jassertfalse; // shouldn't block for more than 100 ms, something's wrong + std::cerr << "Network thread timeout. Forcing thread termination, system could be left in an unstable state" << std::endl; + } } -void NetworkEvents::closesocket(bool shutdown) +bool NetworkEvents::setNewListeningPort(uint16 port) { - std::cout << "Disabling network node" << std::endl; - -#ifdef ZEROMQ - void* oldContext = zmqcontext; - zmqcontext = nullptr; // this will cause the thread to exit if socket hasn't been created - zmq_ctx_destroy(oldContext); // this will cause the thread to exit if socket has been created + if (port == 0) + { + CoreServices::sendStatusMessage("NetworkEvents: Selecting port automatically"); + } - if (!stopThread(500)) + ScopedPointer newResponder = Responder::makeResponder(port); + if (newResponder == nullptr) { - jassertfalse; - std::cerr << "Network thread timeout. Forcing thread termination, system could be left in an unstable state" << std::endl; + return false; } - if (!shutdown) + ScopedLock nrLock(nextResponderLock); + nextResponder = newResponder; + return true; +} + + +String NetworkEvents::getPortString() const +{ + uint16 port = urlport; + if (port == 0) { - createZmqContext(); // so another socket can be opened later + return ""; } -#endif + + return String(port); +} + + +void NetworkEvents::restartConnection() +{ + restart = 1; } @@ -232,160 +250,141 @@ void NetworkEvents::simulateSingleTrial() } -String NetworkEvents::handleSpecialMessages (const String& s) +String NetworkEvents::handleSpecialMessages(const String& s) { /* std::vector input = msg.splitString(' '); if (input[0] == "StartRecord") { - getUIComponent()->getLogWindow()->addLineToLog("Remote triggered start recording"); - - if (input.size() > 1) - { - getUIComponent()->getLogWindow()->addLineToLog("Remote setting session name to "+input[1]); - // session name was also given. - getProcessorGraph()->getRecordNode()->setDirectoryName(input[1]); - } - const MessageManagerLock mmLock; - getControlPanel()->recordButton->setToggleState(true,true); - return String("OK"); + getUIComponent()->getLogWindow()->addLineToLog("Remote triggered start recording"); + + if (input.size() > 1) + { + getUIComponent()->getLogWindow()->addLineToLog("Remote setting session name to "+input[1]); + // session name was also given. + getProcessorGraph()->getRecordNode()->setDirectoryName(input[1]); + } + const MessageManagerLock mmLock; + getControlPanel()->recordButton->setToggleState(true,true); + return String("OK"); // getControlPanel()->placeMessageInQueue("StartRecord"); } if (input[0] == "SetSessionName") { - getProcessorGraph()->getRecordNode()->setDirectoryName(input[1]); + getProcessorGraph()->getRecordNode()->setDirectoryName(input[1]); } else if (input[0] == "StopRecord") { - const MessageManagerLock mmLock; - //getControlPanel()->placeMessageInQueue("StopRecord"); - getControlPanel()->recordButton->setToggleState(false,true); - return String("OK"); + const MessageManagerLock mmLock; + //getControlPanel()->placeMessageInQueue("StopRecord"); + getControlPanel()->recordButton->setToggleState(false,true); + return String("OK"); } else if (input[0] == "ProcessorCommunication") { - ProcessorGraph *g = getProcessorGraph(); - Array p = g->getListOfProcessors(); - for (int k=0;kgetName().toLowerCase() == input[1].toLowerCase()) - { - String Query=""; - for (int i=2;i p = g->getListOfProcessors(); + for (int k=0;kgetName().toLowerCase() == input[1].toLowerCase()) + { + String Query=""; + for (int i=2;iinterProcessorCommunication(Query); - } - } + return p[k]->interProcessorCommunication(Query); + } + } - return String("OK"); + return String("OK"); } */ /** Command is first substring */ String cmd = s.initialSectionNotContaining(" "); + String params = s.substring(cmd.length()).trim(); const MessageManagerLock mmLock; - if (cmd.compareIgnoreCase ("StartAcquisition") == 0) + if (cmd.equalsIgnoreCase("StartAcquisition")) { - if (! CoreServices::getAcquisitionStatus()) - { - CoreServices::setAcquisitionStatus (true); - } - return String ("StartedAcquisition"); + CoreServices::setAcquisitionStatus(true); + return "StartedAcquisition"; } - else if (cmd.compareIgnoreCase ("StopAcquisition") == 0) + else if (cmd.equalsIgnoreCase("StopAcquisition")) { - if (CoreServices::getAcquisitionStatus()) - { - CoreServices::setAcquisitionStatus (false); - } - return String ("StoppedAcquisition"); + CoreServices::setAcquisitionStatus(false); + return "StoppedAcquisition"; } - else if (String ("StartRecord").compareIgnoreCase (cmd) == 0) + else if (cmd.equalsIgnoreCase("StartRecord")) { - if (! CoreServices::getRecordingStatus()) + if (!CoreServices::getRecordingStatus()) { /** First set optional parameters (name/value pairs)*/ - if (s.contains ("=")) + StringPairArray dict = parseNetworkMessage(params); + + StringArray keys = dict.getAllKeys(); + for (int i = 0; i < keys.size(); ++i) { - String params = s.substring (cmd.length()); - StringPairArray dict = parseNetworkMessage (params); + String key = keys[i]; + String value = dict[key]; - StringArray keys = dict.getAllKeys(); - for (int i = 0; i < keys.size(); ++i) + if (key.equalsIgnoreCase("CreateNewDir")) { - String key = keys[i]; - String value = dict[key]; - - if (key.compareIgnoreCase ("CreateNewDir") == 0) + if (value.equalsIgnoreCase("1")) { - if (value.compareIgnoreCase ("1") == 0) - { - CoreServices::createNewRecordingDir(); - } - } - else if (key.compareIgnoreCase ("RecDir") == 0) - { - CoreServices::setRecordingDirectory (value); - } - else if (key.compareIgnoreCase ("PrependText") == 0) - { - CoreServices::setPrependTextToRecordingDir (value); - } - else if (key.compareIgnoreCase ("AppendText") == 0) - { - CoreServices::setAppendTextToRecordingDir (value); + CoreServices::createNewRecordingDir(); } } + else if (key.equalsIgnoreCase("RecDir")) + { + CoreServices::setRecordingDirectory(value); + } + else if (key.equalsIgnoreCase("PrependText")) + { + CoreServices::setPrependTextToRecordingDir(value); + } + else if (key.equalsIgnoreCase("AppendText")) + { + CoreServices::setAppendTextToRecordingDir(value); + } } /** Start recording */ - CoreServices::setRecordingStatus (true); - return String ("StartedRecording"); + CoreServices::setRecordingStatus(true); + return "StartedRecording"; } } - else if (String ("StopRecord").compareIgnoreCase (cmd) == 0) + else if (cmd.equalsIgnoreCase("StopRecord")) { - if (CoreServices::getRecordingStatus()) - { - CoreServices::setRecordingStatus (false); - return String ("StoppedRecording"); - } + CoreServices::setRecordingStatus(false); + return "StoppedRecording"; } - else if (cmd.compareIgnoreCase ("IsAcquiring") == 0) + else if (cmd.equalsIgnoreCase("IsAcquiring")) { - String status = CoreServices::getAcquisitionStatus() ? String ("1") : String ("0"); - return status; + return CoreServices::getAcquisitionStatus() ? String("1") : String("0"); } - else if (cmd.compareIgnoreCase ("IsRecording") == 0) + else if (cmd.equalsIgnoreCase("IsRecording")) { - String status = CoreServices::getRecordingStatus() ? String ("1") : String ("0"); - return status; + return CoreServices::getRecordingStatus() ? String("1") : String("0"); } - else if (cmd.compareIgnoreCase ("GetRecordingPath") == 0) + else if (cmd.equalsIgnoreCase("GetRecordingPath")) { File file = CoreServices::RecordNode::getRecordingPath(); - String msg (file.getFullPathName()); - return msg; + return file.getFullPathName(); } - else if (cmd.compareIgnoreCase ("GetRecordingNumber") == 0) + else if (cmd.equalsIgnoreCase("GetRecordingNumber")) { - String status; - status += (CoreServices::RecordNode::getRecordingNumber() + 1); - return status; + return String(CoreServices::RecordNode::getRecordingNumber() + 1); } - else if (cmd.compareIgnoreCase ("GetExperimentNumber") == 0) + else if (cmd.equalsIgnoreCase("GetExperimentNumber")) { - String status; - status += CoreServices::RecordNode::getExperimentNumber(); - return status; + return String(CoreServices::RecordNode::getExperimentNumber()); } - return String ("NotHandled"); + return "NotHandled"; } @@ -403,92 +402,82 @@ void NetworkEvents::process (AudioSampleBuffer& buffer) } -void NetworkEvents::opensocket() -{ - startThread(); -} - - void NetworkEvents::run() { #ifdef ZEROMQ - Responder responder(zmqcontext, urlport, lastGoodPort); - - uint16 boundPort = responder.getBoundPort(); - if (boundPort == 0) + ScopedPointer responder; + char buffer[MAX_MESSAGE_LENGTH]; + bool connected = false; + + while (!threadShouldExit()) { - // failed to open or bind socket? - int err = responder.getErr(); - String msg = String("Network Events failed to open socket: ") + zmq_strerror(err); - std::cout << msg << std::endl; - if (err != EFAULT && err != ETERM) // errors that could occur normally when context is being terminated + // reopen connection if necessary + if (restart.compareAndSetBool(0, 1)) { - CoreServices::sendStatusMessage(msg); - } - return; - } - - if (urlport != boundPort) - { - // update urlport and the editor. this would happen if a new port couldn't be - // bound to or if 0 was entered, resulting in an ephemeral port being chosen. + responder = nullptr; // destroy old one, which frees the port + responder = Responder::makeResponder(urlport); - if (urlport == 0) - { - CoreServices::sendStatusMessage("Selecting Network Events port automatically"); + if (responder != nullptr) + { + connected = true; + updatePort(responder->getBoundPort()); + } } - else if (boundPort == lastGoodPort) + + // switch to responder with new port if necessary + // unfortunately, atomic operations aren't available for smart pointers { - CoreServices::sendStatusMessage("Could not bind to port " + String(urlport) + " (" + - zmq_strerror(responder.getErr()) + "); reverting to port " + String(lastGoodPort)); + const ScopedTryLock nrLock(nextResponderLock); + if (nrLock.isLocked() && nextResponder != nullptr) + { + responder = nextResponder; + connected = true; + updatePort(responder->getBoundPort()); + } } - else + + // if we don't have a vaild (connected) socket, keep looping until we do + if (responder == nullptr) { - jassertfalse; + if (connected) + { + connected = false; + updatePort(0); + } + wait(100); + continue; } - const MessageManagerLock mmLock; // to update the editor safely from a thread - updatePort(boundPort); - } - - lastGoodPort = urlport; - char buffer[MAX_MESSAGE_LENGTH]; - int result = -1; - - while (true) - { - result = responder.receive(buffer); // blocking + int result = responder->receive(buffer); // times out after RECV_TIMEOUT_MS ms - juce::int64 timestamp = CoreServices::getGlobalTimestamp(); + juce::int64 timestamp_software = Time::getHighResolutionTicks(); - if (result < 0) + if (result == -1) { - // context has been terminated - break; + jassert(responder->getErr() == EAGAIN); // if not, figure out why! + continue; } + // received message. read string from the buffer. String msgStr = String::fromUTF8(buffer, result); { - StringTS Msg(msgStr, timestamp); + StringTS Msg(msgStr, timestamp_software); ScopedLock lock(queueLock); networkMessagesQueue.push(Msg); } CoreServices::sendStatusMessage("Network event received: " + msgStr); - //std::cout << "Received message!" << std::endl; - // handle special messages - String response = handleSpecialMessages (msgStr); - - if (responder.send(response) < 0) + + String response = handleSpecialMessages(msgStr); + + if (responder->send(response) == -1) { - // context has been terminated - break; + jassertfalse; // figure out why this is failing! } } - jassert(responder.getErr() == ETERM); #endif } @@ -536,24 +525,17 @@ void NetworkEvents::loadCustomParametersFromXml() { if (mainNode->hasTagName ("NETWORKEVENTS")) { - setNewListeningPort(static_cast(mainNode->getIntAttribute("port"))); + auto port = static_cast(mainNode->getIntAttribute("port")); + if (port != 0) + { + setNewListeningPort(port); + } } } } } -void NetworkEvents::createZmqContext() -{ -#ifdef ZEROMQ - if (zmqcontext == nullptr) - { - zmqcontext = zmq_ctx_new(); //<-- this is only available in version 3+ - } -#endif -} - - StringPairArray NetworkEvents::parseNetworkMessage(StringRef msg) { StringArray args = StringArray::fromTokens(msg, " ", "'\""); @@ -575,15 +557,29 @@ StringPairArray NetworkEvents::parseNetworkMessage(StringRef msg) } +void NetworkEvents::valueChanged(Value& value) +{ + if (value.refersToSameSourceAs(portString)) + { + auto ed = static_cast(getEditor()); + if (ed) + { + ed->setPortText(value.toString()); + } + } +} + + void NetworkEvents::updatePort(uint16 port) { urlport = port; + portString = getPortString(); +} - auto ed = static_cast(getEditor()); - if (ed) - { - ed->setPortString(String(port)); - } + +String NetworkEvents::getEndpoint(uint16 port) +{ + return "tcp://*:" + (port == 0 ? "*" : String(port)); } @@ -647,46 +643,78 @@ std::vector NetworkEvents::StringTS::splitString(char sep) const } +/*** ZMQContext ***/ + +NetworkEvents::ZMQContext::ZMQContext(const ScopedLock& lock) +#ifdef ZEROMQ + : context(zmq_ctx_new()) +#endif +{ + sharedContext = this; +} + +// ZMQContext is a ReferenceCountedObject with a pointer in each instance's +// socket pointer, so this only happens when the last instance is destroyed. +NetworkEvents::ZMQContext::~ZMQContext() +{ + ScopedLock lock(sharedContextLock); + sharedContext = nullptr; +#ifdef ZEROMQ + zmq_ctx_destroy(context); +#endif +} + +void* NetworkEvents::ZMQContext::createSocket() +{ +#ifdef ZEROMQ + jassert(context != nullptr); + return zmq_socket(context, ZMQ_REP); +#endif +} + + /*** Responder ***/ -NetworkEvents::Responder::Responder(void* context, uint16 port, uint16 backupPort) - : socket (nullptr) - , boundPort (0) - , lastErrno (0) - , initialized (false) +NetworkEvents::Responder::Responder(uint16 port) + : socket(nullptr) + , boundPort(0) + , lastErrno(0) { + { + ScopedLock lock(sharedContextLock); + if (sharedContext == nullptr) + { + // first one, create the context + context = new ZMQContext(lock); + } + else + { + // use already-created context + context = sharedContext; + } + } + #ifdef ZEROMQ - socket = zmq_socket(context, ZMQ_REP); + socket = context->createSocket(); if (!socket) { lastErrno = zmq_errno(); return; } - String url("tcp://*:" + (port == 0 ? "*" : String(port))); - int rc = zmq_bind(socket, url.toRawUTF8()); - if (rc == -1) + // set socket to timeout when receiving rather than blocking forever + if (zmq_setsockopt(socket, ZMQ_RCVTIMEO, &RECV_TIMEOUT_MS, sizeof(RECV_TIMEOUT_MS)) == -1) { lastErrno = zmq_errno(); - - // try again with last good port, if any - if (backupPort != 0 && backupPort != port) - { - port = backupPort; - url = "tcp://*:" + String(port); - if (zmq_bind(socket, url.toRawUTF8()) == -1) - { - // don't set errno, since the error for the original port is more relevant - return; - } - } - else - { - return; - } + return; } - // bound successfully! + // bind to endpoint + if (zmq_bind(socket, getEndpoint(port).toRawUTF8()) == -1) + { + lastErrno = zmq_errno(); + return; + } // if requested port was 0, find out which port was actually used if (port == 0) @@ -709,11 +737,37 @@ NetworkEvents::Responder::Responder(void* context, uint16 port, uint16 backupPor } +NetworkEvents::Responder* NetworkEvents::Responder::makeResponder(uint16 port) +{ + ScopedPointer socketPtr = new Responder(port); + uint16 boundPort = socketPtr->getBoundPort(); + if (boundPort == 0) + { + socketPtr->reportErr("Failed to connect to port " + String(port)); + return nullptr; + } + + if (port != 0 && boundPort != port) + { + jassertfalse; // huh? + return nullptr; + } + return socketPtr.release(); +} + + + NetworkEvents::Responder::~Responder() { #ifdef ZEROMQ if (socket) { + if (boundPort != 0) + { + // unbind/disconnect to free the port (critical for restarts) + zmq_unbind(socket, getEndpoint(boundPort).toRawUTF8()); + } + int linger = 0; zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger)); zmq_close(socket); @@ -722,14 +776,22 @@ NetworkEvents::Responder::~Responder() } -int NetworkEvents::Responder::getErr() +int NetworkEvents::Responder::getErr() const { - int err = lastErrno; - lastErrno = 0; - return err; + return lastErrno; } +void NetworkEvents::Responder::reportErr(const String& message) const +{ +#ifdef ZEROMQ + String msg = "NetworkEvents: " + message + " (" + zmq_strerror(lastErrno) + ")"; + std::cout << msg << std::endl; + CoreServices::sendStatusMessage(msg); +#endif +}; + + uint16 NetworkEvents::Responder::getBoundPort() const { return boundPort; @@ -739,7 +801,7 @@ uint16 NetworkEvents::Responder::getBoundPort() const int NetworkEvents::Responder::receive(void* buf) { #ifdef ZEROMQ - int res = zmq_recv(socket, buf, MAX_MESSAGE_LENGTH, 0); // blocking + int res = zmq_recv(socket, buf, MAX_MESSAGE_LENGTH, 0); if (res == -1) { lastErrno = zmq_errno(); diff --git a/Source/Plugins/NetworkEvents/NetworkEvents.h b/Source/Plugins/NetworkEvents/NetworkEvents.h index a920938f9..9ab90d4be 100644 --- a/Source/Plugins/NetworkEvents/NetworkEvents.h +++ b/Source/Plugins/NetworkEvents/NetworkEvents.h @@ -49,6 +49,7 @@ */ class NetworkEvents : public GenericProcessor , public Thread + , public Value::Listener { public: NetworkEvents(); @@ -88,12 +89,18 @@ class NetworkEvents : public GenericProcessor void simulateStartRecord(); void simulateStopRecord(); void run(); - void opensocket(); - void closesocket(bool shutdown = false); - void setNewListeningPort (uint16 port); + // passing 0 corresponds to wildcard ("*") and picks any available port + // returns true on success, false on failure + bool setNewListeningPort (uint16 port); - uint16 urlport; + // gets a string for the editor's port input to reflect current urlport + String getPortString() const; + + void restartConnection(); + + // to update the port string from the thread + void valueChanged(Value& value) override; private: // combines a string and a timestamp @@ -110,32 +117,34 @@ class NetworkEvents : public GenericProcessor juce::int64 timestamp; }; - void createZmqContext(); + class ZMQContext : public ReferenceCountedObject + { + public: + ZMQContext(const ScopedLock& lock); + ~ZMQContext() override; + void* createSocket(); - void postTimestamppedStringToMidiBuffer(StringTS s); - - String handleSpecialMessages(const String& s); + typedef ReferenceCountedObjectPtr Ptr; - //* Split network message into name/value pairs (name1=val1 name2=val2 etc) */ - StringPairArray parseNetworkMessage(StringRef msg); - - // Set the urlport and reflect it on the editor - void updatePort(uint16 port); + private: + void* context; + }; - // RAII wrapper for responder socket + // RAII wrapper for REP socket class Responder { public: - // creates socket from given context and tries to bind to port, then backupPort (if nonzero). - // if port is 0, chooses an available ephemeral port. - Responder(void* context, uint16 port, uint16 backupPort); + // tries to create a responder and bind to given port; returns nullptr on failure. + // caller must own and destroy the returned responder if it succeeds. + static Responder* makeResponder(uint16 port); ~Responder(); - bool initialize(); + // returns the latest errno value + int getErr() const; + + // output last error on stdout and status bar, including the passed message + void reportErr(const String& message) const; - // returns the latest errno value and resets it to 0. - int getErr(); - // returns the port if the socket was successfully bound to one. // if not, or if the socket is invalid, returns 0. uint16 getBoundPort() const; @@ -148,14 +157,48 @@ class NetworkEvents : public GenericProcessor int send(StringRef response); private: + // creates socket from given context and tries to bind to port. + // if port is 0, chooses an available ephemeral port. + Responder(uint16 port); + + ZMQContext::Ptr context; void* socket; uint16 boundPort; // 0 indicates not bound int lastErrno; - bool initialized; + + static const int RECV_TIMEOUT_MS = 100; }; + + void postTimestamppedStringToMidiBuffer(StringTS s); - void* zmqcontext; - uint16 lastGoodPort; + String handleSpecialMessages(const String& s); + + //* Split network message into name/value pairs (name1=val1 name2=val2 etc) */ + StringPairArray parseNetworkMessage(StringRef msg); + + // updates urlport and the portString Value (controlling the port input on the editor) + // 0 indicates disconnected. should only be called from the thread! + void updatePort(uint16 port); + + // get an endpoint url for the given port (using 0 to represent *) + static String getEndpoint(uint16 port); + + Value portString; // underlying value of the editor's port input + + // share a "dumb" pointer that doesn't take part in reference counting. + // want the context to be terminated by the time the static members are + // destroyed (see: https://github.com/zeromq/libzmq/issues/1708) + static ZMQContext* sharedContext; + static CriticalSection sharedContextLock; + + // To switch ports, a new socket is created and (if successful) assigned to this pointer, + // and then the thread will switch to using this socket at the next opportunity. + ScopedPointer nextResponder; + CriticalSection nextResponderLock; + + uint16 urlport; // 0 indicates not connected + + Atomic restart; float threshold; float bufferZone; @@ -163,8 +206,6 @@ class NetworkEvents : public GenericProcessor bool state; bool firstTime; - Time timer; - std::queue networkMessagesQueue; std::queue simulation; diff --git a/Source/Plugins/NetworkEvents/NetworkEventsEditor.cpp b/Source/Plugins/NetworkEvents/NetworkEventsEditor.cpp index 7638870ab..b790ca2ea 100644 --- a/Source/Plugins/NetworkEvents/NetworkEventsEditor.cpp +++ b/Source/Plugins/NetworkEvents/NetworkEventsEditor.cpp @@ -56,7 +56,7 @@ NetworkEventsEditor::NetworkEventsEditor(GenericProcessor* parentNode, bool useD addAndMakeVisible(startRecord); */ - labelPort = new Label("Port", String(p->urlport)); + labelPort = new Label("Port", p->getPortString()); labelPort->setBounds(70,85,80,18); labelPort->setFont(Font("Default", 15, Font::plain)); labelPort->setColour(Label::textColourId, Colours::white); @@ -76,7 +76,6 @@ NetworkEventsEditor::NetworkEventsEditor(GenericProcessor* parentNode, bool useD addAndMakeVisible(labelPort); setEnabledState(false); - } @@ -87,7 +86,7 @@ void NetworkEventsEditor::buttonEvent(Button* button) if (button == restartConnection) { NetworkEvents *p= (NetworkEvents *)getProcessor(); - p->setNewListeningPort(p->urlport); + p->restartConnection(); } /* if (button == trialSimulation) @@ -115,25 +114,33 @@ void NetworkEventsEditor::setLabelColor(juce::Colour color) labelPort->setColour(Label::backgroundColourId, color); } -void NetworkEventsEditor::setPortString(const String& port) + +void NetworkEventsEditor::setPortText(const String& text) { - labelPort->setText(port, dontSendNotification); + labelPort->setText(text, dontSendNotification); } + void NetworkEventsEditor::labelTextChanged(juce::Label *label) { - if (label == labelPort) - { - int32 portInput = label->getText().getIntValue(); - if (portInput < 0 || portInput > (1 << 16) - 1) + if (label == labelPort) + { + NetworkEvents *p = (NetworkEvents *)getProcessor(); + bool success = true; + + uint16 port; + if (!portFromString(label->getText(), &port)) { - CoreServices::sendStatusMessage("Warning: port out of range; selecting one automatically"); - portInput = 0; + CoreServices::sendStatusMessage("NetworkEvents: Invalid port"); + success = false; } - auto port = static_cast(portInput); - NetworkEvents *p= (NetworkEvents *)getProcessor(); - p->setNewListeningPort(port); + success = success && p->setNewListeningPort(port); + if (!success) + { + // revert to reflect current port + setPortText(p->getPortString()); + } } } @@ -144,3 +151,24 @@ NetworkEventsEditor::~NetworkEventsEditor() } +bool NetworkEventsEditor::portFromString(const String& portString, uint16* port) +{ + if (portString.trim() == "*") // wildcard, special case + { + *port = 0; + return true; + } + + if (portString.indexOfAnyOf("0123456789") == -1) + { + return false; + } + + int32 portInput = portString.getIntValue(); + if (portInput <= 0 || portInput > (1 << 16) - 1) + { + return false; + } + *port = static_cast(portInput); + return true; +} diff --git a/Source/Plugins/NetworkEvents/NetworkEventsEditor.h b/Source/Plugins/NetworkEvents/NetworkEventsEditor.h index 99dd91622..ddb95999d 100644 --- a/Source/Plugins/NetworkEvents/NetworkEventsEditor.h +++ b/Source/Plugins/NetworkEvents/NetworkEventsEditor.h @@ -47,14 +47,18 @@ class NetworkEventsEditor : public GenericEditor,public Label::Listener void buttonEvent(Button* button); void labelTextChanged(juce::Label *); void setLabelColor(juce::Colour color); - void setPortString(const String& port); + void setPortText(const String& text); private: + // interprets input string as a port number (1-65535); returns false if invalid + // or out of range, else sets *port to parsed value. as a special case, if portString + // is "*", sets *port to 0 and returns true. + static bool portFromString(const String& portString, uint16* port); + ScopedPointer restartConnection; ScopedPointer