diff --git a/Source/Plugins/NetworkEvents/NetworkEvents.cpp b/Source/Plugins/NetworkEvents/NetworkEvents.cpp index d61ddc987..4134a99b7 100644 --- a/Source/Plugins/NetworkEvents/NetworkEvents.cpp +++ b/Source/Plugins/NetworkEvents/NetworkEvents.cpp @@ -35,200 +35,51 @@ const int MAX_MESSAGE_LENGTH = 64000; #include #endif - -StringTS::StringTS() -{ - str = nullptr; - len= 0; - timestamp = 0; -} - - -std::vector StringTS::splitString (char sep) -{ - String S ((const char*)str, len); - String curr; - - std::list ls; - for (int k = 0; k < S.length(); ++k) - { - if (S[k] != sep) - { - curr+=S[k]; - } - else - { - ls.push_back (curr); - while (S[k] == sep && k < S.length()) - ++k; - - curr = ""; - if (S[k] != sep && k < S.length()) - curr += S[k]; - } - } - if (S.length() > 0) - { - if (S[S.length() - 1] != sep) - ls.push_back (curr); - } - - std::vector Svec (ls.begin(), ls.end()); - return Svec; -} - - -StringTS::StringTS (MidiMessage& event) -{ - const uint8* dataptr = event.getRawData(); - const int bufferSize = event.getRawDataSize(); - len = bufferSize - 6 - 8; // -6 for initial event prefix, -8 for timestamp at the end - - memcpy (×tamp, dataptr + 6 + len, 8); // remember to skip first six bytes - str = new uint8[len]; - memcpy (str,dataptr + 6, len); -} - - -StringTS& StringTS::operator= (const StringTS& rhs) -{ - delete (str); - len = rhs.len; - str = new uint8[len]; - memcpy (str,rhs.str,len); - timestamp = rhs.timestamp; - - return *this; -} - - -String StringTS::getString() -{ - return String ((const char*)str,len); -} - - -StringTS::StringTS (String S) -{ - Time t; - str = new uint8[S.length()]; - memcpy (str, S.toRawUTF8(), S.length()); - timestamp = t.getHighResolutionTicks(); - - len = S.length(); -} - - -StringTS::StringTS (String S, int64 ts_software) -{ - str = new uint8[S.length()]; - memcpy (str, S.toRawUTF8(), S.length()); - timestamp = ts_software; - - len = S.length(); -} - - -StringTS::StringTS (const StringTS& s) -{ - str = new uint8[s.len]; - memcpy (str, s.str, s.len); - timestamp = s.timestamp; - len = s.len; -} - - -StringTS::StringTS (unsigned char* buf, int _len, int64 ts_software) - : len (_len) - ,timestamp (ts_software) -{ - str = new juce::uint8[len]; - for (int k = 0; k < len; ++k) - str[k] = buf[k]; -} - - -StringTS::~StringTS() -{ - delete str; -} - - -/*********************************************/ -void* NetworkEvents::zmqcontext = nullptr; +NetworkEvents::ZMQContext* NetworkEvents::sharedContext = nullptr; +CriticalSection NetworkEvents::sharedContextLock{}; NetworkEvents::NetworkEvents() : GenericProcessor ("Network Events") , Thread ("NetworkThread") - , threshold (200.0) - , bufferZone (5.0f) - , state (false) + , makeNewSocket (true) + , requestedPort (5556) + , boundPort (0) { setProcessorType (PROCESSOR_TYPE_SOURCE); - createZmqContext(); - - firstTime = true; - responder = nullptr; - urlport = 5556; - threadRunning = false; - - opensocket(); + startThread(); sendSampleCount = false; // disable updating the continuous buffer sample counts, // since this processor only sends events - shutdown = false; } -void NetworkEvents::setNewListeningPort (int port) +void NetworkEvents::setNewListeningPort(uint16 port) { - // first, close existing thread. - closesocket(); - - // allow some time for thread to quit -#ifdef WIN32 - Sleep (300); -#else - usleep (300 * 1000); -#endif - - urlport = port; - opensocket(); + requestedPort = port; + makeNewSocket = true; } NetworkEvents::~NetworkEvents() { - shutdown = true; - closesocket(); + if (!stopThread(1000)) + { + jassertfalse; // shouldn't block for more than 100 ms, something's wrong + std::cerr << "Network thread timeout. Forcing thread termination, system could be left in an unstable state" << std::endl; + } } -bool NetworkEvents::closesocket() +String NetworkEvents::getCurrPortString() const { - std::cout << "Disabling network node" << std::endl; + return getPortString(boundPort); +} -#ifdef ZEROMQ - if (threadRunning) - { - lock.enter(); - zmq_close (responder); - zmq_ctx_destroy (zmqcontext); // this will cause the thread to exit - zmqcontext = nullptr; - lock.exit(); - - if (!stopThread(500)) - { - std::cerr << "Network thread timeout. Forcing thread termination, system could be lefr in an unstable state" << std::endl; - } - - if (! shutdown) - createZmqContext();// and this will take care that processor graph doesn't attempt to delete the context again - - } -#endif - return true; + +void NetworkEvents::restartConnection() +{ + makeNewSocket = true; } @@ -253,181 +104,19 @@ AudioProcessorEditor* NetworkEvents::createEditor () } -void NetworkEvents::setParameter (int parameterIndex, float newValue) -{ - /* - editor->updateParameterButtons(parameterIndex); - - Parameter& p = parameters.getReference(parameterIndex); - p.setValue(newValue, 0); - - threshold = newValue; - */ - //std::cout << float(p[0]) << std::endl; -} - - -void NetworkEvents::initSimulation() -{ - Time t; - - const int64 secondsToTicks = t.getHighResolutionTicksPerSecond(); - simulationStartTime = 3 * secondsToTicks + t.getHighResolutionTicks(); // start 10 seconds after - - simulation.push (StringTS ("ClearDesign", simulationStartTime)); - simulation.push (StringTS ("NewDesign Test", simulationStartTime + 0.5 * secondsToTicks)); - simulation.push (StringTS ("AddCondition Name GoRight TrialTypes 1 2 3", simulationStartTime + 0.6 * secondsToTicks)); - simulation.push (StringTS ("AddCondition Name GoLeft TrialTypes 4 5 6", simulationStartTime + 0.6 * secondsToTicks)); -} - - -void NetworkEvents::simulateDesignAndTrials () -{ - Time t; - while (simulation.size() > 0) - { - int64 currenttime = t.getHighResolutionTicks(); - StringTS S = simulation.front(); - if (currenttime > S.timestamp) - { - // handle special messages - handleSpecialMessages (S); - - postTimestamppedStringToMidiBuffer (S); - //getUIComponent()->getLogWindow()->addLineToLog(S.getString()); - simulation.pop(); - } - else - { - break; - } - } - -} - -void NetworkEvents::postTimestamppedStringToMidiBuffer (StringTS s) +void NetworkEvents::postTimestamppedStringToMidiBuffer (const StringTS& s) { MetaDataValueArray md; md.add(new MetaDataValue(MetaDataDescriptor::INT64, 1, &s.timestamp)); - TextEventPtr event = TextEvent::createTextEvent(messageChannel, CoreServices::getGlobalTimestamp(), String::fromUTF8(reinterpret_cast(s.str), s.len), md); + TextEventPtr event = TextEvent::createTextEvent(messageChannel, CoreServices::getGlobalTimestamp(), s.str, md); addEvent(messageChannel, event, 0); } -void NetworkEvents::simulateStopRecord() -{ - Time t; - simulation.push (StringTS ("StopRecord", t.getHighResolutionTicks())); -} - - -void NetworkEvents::simulateStartRecord() -{ - Time t; - simulation.push (StringTS ("StartRecord", t.getHighResolutionTicks())); -} - - -void NetworkEvents::simulateSingleTrial() -{ - std::cout << "Simulating trial." << std::endl; - - const int numTrials = 1; - const float ITI = 0.7; - const float TrialLength = 0.4; - - Time t; - - if (firstTime) - { - firstTime = false; - initSimulation(); - } - - int64 secondsToTicks = t.getHighResolutionTicksPerSecond(); - simulationStartTime = 3 * secondsToTicks + t.getHighResolutionTicks(); // start 10 seconds after - - // trial every 5 seconds - for (int k = 0; k < numTrials; ++k) - { - simulation.push (StringTS ("TrialStart", simulationStartTime + ITI * k * secondsToTicks)); - - if (k % 2 == 0) - // 100 ms after trial start - simulation.push (StringTS ("TrialType 2", simulationStartTime + (ITI * k + 0.1) * secondsToTicks)); - else - // 100 ms after trial start - simulation.push (StringTS ("TrialType 4", simulationStartTime + (ITI * k + 0.1) * secondsToTicks)); - - // 100 ms after trial start - simulation.push (StringTS ("TrialAlign", simulationStartTime + (ITI * k + 0.1) * secondsToTicks)); - // 300 ms after trial start - simulation.push (StringTS ("TrialOutcome 1", simulationStartTime + (ITI * k + 0.3) * secondsToTicks)); - // 400 ms after trial start - simulation.push (StringTS ("TrialEnd", simulationStartTime + (ITI * k + TrialLength) * secondsToTicks)); - } -} - - -String NetworkEvents::handleSpecialMessages (StringTS msg) +String NetworkEvents::handleSpecialMessages(const String& s) { - /* - std::vector input = msg.splitString(' '); - if (input[0] == "StartRecord") - { - getUIComponent()->getLogWindow()->addLineToLog("Remote triggered start recording"); - - if (input.size() > 1) - { - getUIComponent()->getLogWindow()->addLineToLog("Remote setting session name to "+input[1]); - // session name was also given. - getProcessorGraph()->getRecordNode()->setDirectoryName(input[1]); - } - const MessageManagerLock mmLock; - getControlPanel()->recordButton->setToggleState(true,true); - return String("OK"); - // getControlPanel()->placeMessageInQueue("StartRecord"); - } if (input[0] == "SetSessionName") - { - getProcessorGraph()->getRecordNode()->setDirectoryName(input[1]); - } else if (input[0] == "StopRecord") - { - const MessageManagerLock mmLock; - //getControlPanel()->placeMessageInQueue("StopRecord"); - getControlPanel()->recordButton->setToggleState(false,true); - return String("OK"); - } else if (input[0] == "ProcessorCommunication") - { - ProcessorGraph *g = getProcessorGraph(); - Array p = g->getListOfProcessors(); - for (int k=0;kgetName().toLowerCase() == input[1].toLowerCase()) - { - String Query=""; - for (int i=2;iinterProcessorCommunication(Query); - } - } - - return String("OK"); - } - - */ - - /** Start/stop data acquisition */ - String s = msg.getString(); - /** Command is first substring */ - StringArray inputs = StringArray::fromTokens (s, " "); - String cmd = String (inputs[0]); + String cmd = s.initialSectionNotContaining(" "); const MessageManagerLock mmLock; if (cmd.compareIgnoreCase ("StartAcquisition") == 0) @@ -448,13 +137,12 @@ String NetworkEvents::handleSpecialMessages (StringTS msg) } else if (String ("StartRecord").compareIgnoreCase (cmd) == 0) { - if (! CoreServices::getRecordingStatus() - && CoreServices::getAcquisitionStatus()) + if (! CoreServices::getRecordingStatus()) { /** First set optional parameters (name/value pairs)*/ if (s.contains ("=")) { - String params = s.substring (cmd.length()); + String params = s.substring (cmd.length()).trim(); StringPairArray dict = parseNetworkMessage (params); StringArray keys = dict.getAllKeys(); @@ -535,89 +223,107 @@ void NetworkEvents::process (AudioSampleBuffer& buffer) { setTimestampAndSamples(CoreServices::getGlobalTimestamp(),0); - lock.enter(); + ScopedLock lock(queueLock); while (! networkMessagesQueue.empty()) { - StringTS msg = networkMessagesQueue.front(); + const StringTS& msg = networkMessagesQueue.front(); postTimestamppedStringToMidiBuffer (msg); - CoreServices::sendStatusMessage ( ("Network event received: " + msg.getString()).toRawUTF8()); networkMessagesQueue.pop(); } - - lock.exit(); -} - - -void NetworkEvents::opensocket() -{ - startThread(); } void NetworkEvents::run() { #ifdef ZEROMQ - responder = zmq_socket (zmqcontext, ZMQ_REP); - String url= String ("tcp://*:") + String (urlport); - int rc = zmq_bind (responder, url.toRawUTF8()); + HeapBlock buffer(MAX_MESSAGE_LENGTH); - if (rc != 0) + // responder should always be valid (bound to a port) if it is non-null + ScopedPointer responder(new Responder(0)); // use any available port as default + if (responder->isValid()) { - // failed to open socket? - std::cout << "Failed to open socket: " << zmq_strerror (zmq_errno()) << std::endl; - return; + boundPort = responder->getBoundPort(); + } + else + { + responder = nullptr; + boundPort = 0; } - threadRunning = true; - unsigned char* buffer = new unsigned char[MAX_MESSAGE_LENGTH]; - int result = -1; + // purposely don't call updatePortString - makeNewSocket will be true on startup, + // so wait to try the requested port (5556) before updating the editor. - while (threadRunning) + while (!threadShouldExit()) { - result = zmq_recv (responder, buffer, MAX_MESSAGE_LENGTH - 1, 0); // blocking + // change socket if necessary + while (makeNewSocket.exchange(false)) + { + uint16 nextPort = requestedPort; // (maybe the newly entered port on the editor text box) + if (nextPort > 0 && nextPort == boundPort) // i.e. this is a restart + { + responder = nullptr; // destroy old one, which frees the port + boundPort = 0; + } - juce::int64 timestamp_software = timer.getHighResolutionTicks(); + if (nextPort == 0) + { + CoreServices::sendStatusMessage("NetworkEvents: Selecting port automatically"); + } + + ScopedPointer newResponder(new Responder(nextPort)); + if (newResponder->isValid()) + { + // replace the current socket with the newly created socket + responder = newResponder; + boundPort = responder->getBoundPort(); + } + else + { + newResponder->reportErr("Failed to connect to port " + String(nextPort)); + } - if (result < 0) // will only happen when responder dies. - break; + updatePortString(boundPort); + } - StringTS Msg (buffer, result, timestamp_software); - if (result > 0) + // if we don't have a vaild (connected) socket, keep looping until we do + if (responder == nullptr) { - lock.enter(); - networkMessagesQueue.push (Msg); - lock.exit(); + wait(100); + continue; + } - //std::cout << "Received message!" << std::endl; - // handle special messages - String response = handleSpecialMessages (Msg); + int result = responder->receive(buffer); // times out after RECV_TIMEOUT_MS ms - zmq_send (responder, response.getCharPointer(), response.length(), 0); - } - else + juce::int64 timestamp_software = Time::getHighResolutionTicks(); + + if (result == -1) { - String zeroMessageError = "Recieved Zero Message?!?!?"; - //std::cout << "Received Zero Message!" << std::endl; + jassert(responder->getErr() == EAGAIN); // if not, figure out why! + continue; + } - zmq_send (responder, zeroMessageError.getCharPointer(), zeroMessageError.length(), 0); + // received message. read string from the buffer. + String msg = String::fromUTF8(buffer, result); + { + ScopedLock lock(queueLock); + networkMessagesQueue.push({ msg, timestamp_software }); } - } - zmq_close (responder); + CoreServices::sendStatusMessage("Network event received: " + msg); + //std::cout << "Received message!" << std::endl; - delete[] buffer; - threadRunning = false; + String response = handleSpecialMessages(msg); + + if (responder->send(response) == -1) + { + jassertfalse; // figure out why this is failing! + } + } - return; #endif } -int NetworkEvents::getDefaultNumOutputs() const -{ - return 0; -} - bool NetworkEvents::isReady() { return true; @@ -644,7 +350,9 @@ void NetworkEvents::setEnabledState (bool newState) void NetworkEvents::saveCustomParametersToXml (XmlElement* parentElement) { XmlElement* mainNode = parentElement->createNewChildElement ("NETWORKEVENTS"); - mainNode->setAttribute ("port", urlport); + uint16 currBoundPort = boundPort; + // save the actual bound port if any, otherwise the last attempted port. + mainNode->setAttribute ("port", currBoundPort ? currBoundPort : requestedPort.load()); } @@ -656,69 +364,247 @@ void NetworkEvents::loadCustomParametersFromXml() { if (mainNode->hasTagName ("NETWORKEVENTS")) { - setNewListeningPort (mainNode->getIntAttribute("port")); + auto port = static_cast(mainNode->getIntAttribute("port")); + if (port != 0) + { + setNewListeningPort(port); + } } } } } -void NetworkEvents::createZmqContext() +StringPairArray NetworkEvents::parseNetworkMessage(StringRef msg) +{ + StringArray args = StringArray::fromTokens(msg, " ", "'\""); + args.removeEmptyStrings(); + + StringPairArray dict; + for (const String& arg : args) + { + int iEq = arg.indexOfChar('='); + if (iEq >= 0) + { + String key = arg.substring(0, iEq); + String val = arg.substring(iEq + 1).unquoted(); + dict.set(key, val); + } + } + + return dict; +} + + +void NetworkEvents::updatePortString(uint16 port) +{ + auto ed = static_cast(getEditor()); + if (ed) + { + const MessageManagerLock mmLock; + ed->setPortText(getPortString(port)); + } +} + + +String NetworkEvents::getEndpoint(uint16 port) +{ + return "tcp://*:" + (port == 0 ? "*" : String(port)); +} + + +String NetworkEvents::getPortString(uint16 port) { #ifdef ZEROMQ - lock.enter(); - if (zmqcontext == nullptr) - zmqcontext = zmq_ctx_new(); //<-- this is only available in version 3+ - lock.exit(); + if (port == 0) + { + return ""; + } + + return String(port); +#else + return ""; #endif } -StringPairArray NetworkEvents::parseNetworkMessage (String msg) +/*** ZMQContext ***/ + +NetworkEvents::ZMQContext::ZMQContext(const ScopedLock& lock) +#ifdef ZEROMQ + : context(zmq_ctx_new()) +#endif { - StringArray splitted; - splitted.addTokens (msg, "=", ""); + // sharedContextLock should already be held here + sharedContext = this; +} - StringPairArray dict = StringPairArray(); - String key = ""; - String value = ""; +// ZMQContext is a ReferenceCountedObject with a pointer in each instance's +// socket pointer, so this only happens when the last instance is destroyed. +NetworkEvents::ZMQContext::~ZMQContext() +{ + ScopedLock lock(sharedContextLock); + sharedContext = nullptr; +#ifdef ZEROMQ + zmq_ctx_destroy(context); +#endif +} - for (int i = 0; i < splitted.size() - 1; ++i) - { - String s1 = splitted[i]; - String s2 = splitted[i + 1]; +void* NetworkEvents::ZMQContext::createSocket() +{ +#ifdef ZEROMQ + jassert(context != nullptr); + return zmq_socket(context, ZMQ_REP); +#else + return nullptr; +#endif +} - /** Get key */ - if (! key.isEmpty()) + +/*** Responder ***/ + +const int NetworkEvents::Responder::RECV_TIMEOUT_MS = 100; + +NetworkEvents::Responder::Responder(uint16 port) + : socket (nullptr) + , valid (false) + , boundPort (0) + , lastErrno (0) +{ + { + ScopedLock lock(sharedContextLock); + if (sharedContext == nullptr) { - if (s1.contains (" ")) - { - int i1 = s1.lastIndexOf (" "); - key = s1.substring (i1 + 1); - } - else - { - key = s1; - } + // first one, create the context + context = new ZMQContext(lock); } else { - key = s1.trim(); + // use already-created context + context = sharedContext; } + } + +#ifdef ZEROMQ + socket = context->createSocket(); + if (!socket) + { + lastErrno = zmq_errno(); + return; + } - /** Get value */ - if (i < splitted.size() - 2) + // set socket to timeout when receiving rather than blocking forever + if (zmq_setsockopt(socket, ZMQ_RCVTIMEO, &RECV_TIMEOUT_MS, sizeof(RECV_TIMEOUT_MS)) == -1) + { + lastErrno = zmq_errno(); + return; + } + + // bind to endpoint + if (zmq_bind(socket, getEndpoint(port).toRawUTF8()) == -1) + { + lastErrno = zmq_errno(); + return; + } + + // if requested port was 0, find out which port was actually used + if (port == 0) + { + const size_t BUF_LEN = 32; + size_t len = BUF_LEN; + char endpoint[BUF_LEN]; + if (zmq_getsockopt(socket, ZMQ_LAST_ENDPOINT, endpoint, &len) == -1) { - int i1 = s2.lastIndexOf (" "); - value = s2.substring (0, i1); + lastErrno = zmq_errno(); + return; } - else + + port = String(endpoint).getTrailingIntValue(); + } + + jassert(port > 0); + valid = true; + boundPort = port; +#endif +} + + +NetworkEvents::Responder::~Responder() +{ +#ifdef ZEROMQ + if (socket) + { + if (boundPort != 0) { - value = s2; + // unbind/disconnect to free the port (critical for restarts) + zmq_unbind(socket, getEndpoint(boundPort).toRawUTF8()); } - dict.set (key, value); + int linger = 0; + zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger)); + zmq_close(socket); } +#endif +} - return dict; + +int NetworkEvents::Responder::getErr() const +{ + return lastErrno; +} + + +void NetworkEvents::Responder::reportErr(const String& message) const +{ +#ifdef ZEROMQ + String msg = "NetworkEvents: " + message + " (" + zmq_strerror(lastErrno) + ")"; + std::cout << msg << std::endl; + CoreServices::sendStatusMessage(msg); +#endif +}; + + +bool NetworkEvents::Responder::isValid() const +{ + return valid; +} + + +uint16 NetworkEvents::Responder::getBoundPort() const +{ + return boundPort; +} + + +int NetworkEvents::Responder::receive(void* buf) +{ +#ifdef ZEROMQ + int res = zmq_recv(socket, buf, MAX_MESSAGE_LENGTH, 0); + if (res == -1) + { + lastErrno = zmq_errno(); + } + else + { + res = jmin(res, MAX_MESSAGE_LENGTH); + } + return res; +#else + return -1; +#endif +} + + +int NetworkEvents::Responder::send(StringRef response) +{ +#ifdef ZEROMQ + int res = zmq_send(socket, response, response.length(), 0); + if (res == -1) + { + lastErrno = zmq_errno(); + } + return res; +#else + return -1; +#endif } diff --git a/Source/Plugins/NetworkEvents/NetworkEvents.h b/Source/Plugins/NetworkEvents/NetworkEvents.h index 1f92a2822..19acb680c 100644 --- a/Source/Plugins/NetworkEvents/NetworkEvents.h +++ b/Source/Plugins/NetworkEvents/NetworkEvents.h @@ -39,28 +39,7 @@ #include #include - -class StringTS -{ -public: - StringTS(); - StringTS(MidiMessage& event); - StringTS(String S); - StringTS(String S, int64 ts_software); - StringTS(const StringTS& s); - StringTS(unsigned char* buf, int _len, int64 ts_software); - ~StringTS(); - - std::vector splitString(char sep); - String getString(); - - StringTS& operator= (const StringTS& rhs); - - juce::uint8* str; - int len; - juce::int64 timestamp; -}; - +#include /** Sends incoming TCP/IP messages from 0MQ to the events buffer @@ -80,8 +59,6 @@ class NetworkEvents : public GenericProcessor void process (AudioSampleBuffer& buffer) override; - void setParameter (int parameterIndex, float newValue) override; - void createEventChannels() override; void setEnabledState (bool newState) override; @@ -96,56 +73,107 @@ class NetworkEvents : public GenericProcessor // ========================================================================= - int getDefaultNumOutputs() const; + void run() override; - //int64 getExtrapolatedHardwareTimestamp (int64 softwareTS) const; + // passing 0 corresponds to wildcard ("*") and picks any available port + void setNewListeningPort (uint16 port); - String handleSpecialMessages (StringTS msg); - std::vector splitString (String S, char sep); - - void initSimulation(); - void simulateDesignAndTrials (); - void simulateSingleTrial(); - void simulateStartRecord(); - void simulateStopRecord(); - void run(); - void opensocket(); - bool closesocket(); - - void postTimestamppedStringToMidiBuffer (StringTS s); - void setNewListeningPort (int port); - - int urlport; - String socketStatus; - std::atomic threadRunning; + // gets a string for the editor's port input to reflect current urlport + String getCurrPortString() const; + void restartConnection(); private: - void createZmqContext(); + struct StringTS + { + String str; + int64 timestamp; + }; + + class ZMQContext : public ReferenceCountedObject + { + public: + ZMQContext(const ScopedLock& lock); + ~ZMQContext() override; + void* createSocket(); + + typedef ReferenceCountedObjectPtr Ptr; + + private: + void* context; + + JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR(ZMQContext); + }; + + // RAII wrapper for REP socket + class Responder + { + public: + // creates socket from given context and tries to bind to port. + // if port is 0, chooses an available ephemeral port. + Responder(uint16 port); + ~Responder(); + + // returns the latest errno value + int getErr() const; + + // output last error on stdout and status bar, including the passed message + void reportErr(const String& message) const; + + bool isValid() const; + + // returns the port if the socket was successfully bound to one, else 0 + // if not, or if the socket is invalid, returns 0. + uint16 getBoundPort() const; + + // receives message into buf (blocking call). + // returns the number of bytes actually received, or -1 if there is an error. + int receive(void* buf); + + // sends a message. returns the same as zmq_send. + int send(StringRef response); + + private: + ZMQContext::Ptr context; + void* socket; + bool valid; + uint16 boundPort; + int lastErrno; + + static const int RECV_TIMEOUT_MS; + + JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR(Responder); + }; + + void postTimestamppedStringToMidiBuffer(const StringTS& s); + + String handleSpecialMessages(const String& s); //* Split network message into name/value pairs (name1=val1 name2=val2 etc) */ - StringPairArray parseNetworkMessage (String msg); + StringPairArray parseNetworkMessage(StringRef msg); - StringTS createStringTS (String S, int64 t); + // updates urlport and the port input on the editor (< 0 indicates not connected) + void updatePortString(uint16 port); - static void* zmqcontext; - void* responder; + // get an endpoint url for the given port (using 0 to represent *) + static String getEndpoint(uint16 port); - float threshold; - float bufferZone; + // get a representation of the given port for use on the editor + static String getPortString(uint16 port); - bool state; - bool shutdown; - bool firstTime; + // share a "dumb" pointer that doesn't take part in reference counting. + // want the context to be terminated by the time the static members are + // destroyed (see: https://github.com/zeromq/libzmq/issues/1708) + static ZMQContext* sharedContext; + static CriticalSection sharedContextLock; - Time timer; + std::atomic makeNewSocket; // port change or restart needed (depending on requestedPort) + std::atomic requestedPort; // never set by the thread; 0 means any free port + std::atomic boundPort; // only set by the thread; 0 means no connection std::queue networkMessagesQueue; - std::queue simulation; - - CriticalSection lock; - int64 simulationStartTime; - + CriticalSection queueLock; + const EventChannel* messageChannel{ nullptr }; JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (NetworkEvents); diff --git a/Source/Plugins/NetworkEvents/NetworkEventsEditor.cpp b/Source/Plugins/NetworkEvents/NetworkEventsEditor.cpp index c096881c2..cb62caa18 100644 --- a/Source/Plugins/NetworkEvents/NetworkEventsEditor.cpp +++ b/Source/Plugins/NetworkEvents/NetworkEventsEditor.cpp @@ -42,72 +42,27 @@ NetworkEventsEditor::NetworkEventsEditor(GenericProcessor* parentNode, bool useD restartConnection->addListener(this); addAndMakeVisible(restartConnection); - - /* - trialSimulation = new UtilityButton("Trial",Font("Default", 15, Font::plain)); - trialSimulation->setBounds(20,25,80,18); - trialSimulation->addListener(this); - addAndMakeVisible(trialSimulation); - - - startRecord = new UtilityButton("Start Record",Font("Default", 15, Font::plain)); - startRecord->setBounds(20,55,100,18); - startRecord->addListener(this); - addAndMakeVisible(startRecord); - */ - - labelPort = new Label("Port", String(p->urlport)); + labelPort = new Label("Port", p->getCurrPortString()); labelPort->setBounds(70,85,80,18); labelPort->setFont(Font("Default", 15, Font::plain)); labelPort->setColour(Label::textColourId, Colours::white); - - - -// NetworkEvents *processor = (NetworkEvents*) getProcessor(); - - //if (processor->threadRunning) - labelPort->setColour(Label::backgroundColourId, Colours::grey); -// else -// labelPort->setColour(Label::backgroundColourId, Colours::red); - - + labelPort->setColour(Label::backgroundColourId, Colours::grey); labelPort->setEditable(true); labelPort->addListener(this); addAndMakeVisible(labelPort); setEnabledState(false); - } void NetworkEventsEditor::buttonEvent(Button* button) { - //NetworkEvents *processor = (NetworkEvents*) getProcessor(); if (button == restartConnection) { NetworkEvents *p= (NetworkEvents *)getProcessor(); - p->setNewListeningPort(p->urlport); - } - /* - if (button == trialSimulation) - { - processor->simulateSingleTrial(); - - } else if (button == startRecord) - { - if (startRecord->getLabel() == "Start Record") - { - processor->simulateStartRecord(); - startRecord->setLabel("Stop Record"); - } else if (startRecord->getLabel() == "Stop Record") - { - processor->simulateStopRecord(); - startRecord->setLabel("Start Record"); - } + p->restartConnection(); } - */ - } void NetworkEventsEditor::setLabelColor(juce::Colour color) @@ -116,14 +71,27 @@ void NetworkEventsEditor::setLabelColor(juce::Colour color) } -void NetworkEventsEditor::labelTextChanged(juce::Label *label) +void NetworkEventsEditor::setPortText(const String& text) { - if (label == labelPort) - { - Value val = label->getTextValue(); + labelPort->setText(text, dontSendNotification); +} - NetworkEvents *p= (NetworkEvents *)getProcessor(); - p->setNewListeningPort(val.getValue()); + +void NetworkEventsEditor::labelTextChanged(juce::Label *label) +{ + if (label == labelPort) + { + NetworkEvents *p = (NetworkEvents *)getProcessor(); + + uint16 port; + if (!portFromString(label->getText(), &port)) + { + CoreServices::sendStatusMessage("NetworkEvents: Invalid port"); + setPortText(p->getCurrPortString()); + return; + } + + p->setNewListeningPort(port); } } @@ -134,3 +102,24 @@ NetworkEventsEditor::~NetworkEventsEditor() } +bool NetworkEventsEditor::portFromString(const String& portString, uint16* port) +{ + if (portString.trim() == "*") // wildcard, special case + { + *port = 0; + return true; + } + + if (portString.indexOfAnyOf("0123456789") == -1) + { + return false; + } + + int32 portInput = portString.getIntValue(); + if (portInput <= 0 || portInput > (1 << 16) - 1) + { + return false; + } + *port = static_cast(portInput); + return true; +} diff --git a/Source/Plugins/NetworkEvents/NetworkEventsEditor.h b/Source/Plugins/NetworkEvents/NetworkEventsEditor.h index f96f16796..ddb95999d 100644 --- a/Source/Plugins/NetworkEvents/NetworkEventsEditor.h +++ b/Source/Plugins/NetworkEvents/NetworkEventsEditor.h @@ -47,13 +47,18 @@ class NetworkEventsEditor : public GenericEditor,public Label::Listener void buttonEvent(Button* button); void labelTextChanged(juce::Label *); void setLabelColor(juce::Colour color); + void setPortText(const String& text); private: + // interprets input string as a port number (1-65535); returns false if invalid + // or out of range, else sets *port to parsed value. as a special case, if portString + // is "*", sets *port to 0 and returns true. + static bool portFromString(const String& portString, uint16* port); + ScopedPointer restartConnection; ScopedPointer