Skip to content

Commit

Permalink
Merge pull request #265 from tne-lab/event-broadcaster-robust
Browse files Browse the repository at this point in the history
Improve Event Broadcaster robustness and feedback on error
  • Loading branch information
aacuevas committed Oct 10, 2018
2 parents fc01f00 + 77c86f9 commit 11bc708
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 31 deletions.
153 changes: 132 additions & 21 deletions Source/Plugins/EventBroadcaster/EventBroadcaster.cpp
Expand Up @@ -11,17 +11,85 @@
#include "EventBroadcaster.h"
#include "EventBroadcasterEditor.h"

std::shared_ptr<void> EventBroadcaster::getZMQContext() {
// Note: C++11 guarantees that initialization of static local variables occurs exactly once, even
// if multiple threads attempt to initialize the same static local variable concurrently.
EventBroadcaster::ZMQContext* EventBroadcaster::sharedContext = nullptr;
CriticalSection EventBroadcaster::sharedContextLock{};

EventBroadcaster::ZMQContext::ZMQContext(const ScopedLock& lock)
#ifdef ZEROMQ
static const std::shared_ptr<void> ctx(zmq_ctx_new(), zmq_ctx_destroy);
#else
static const std::shared_ptr<void> ctx;
: context(zmq_ctx_new())
#endif
{
sharedContext = this;
}

// ZMQContext is a ReferenceCountedObject with a pointer in each instance's
// socket pointer, so this only happens when the last instance is destroyed.
EventBroadcaster::ZMQContext::~ZMQContext()
{
ScopedLock lock(sharedContextLock);
sharedContext = nullptr;
#ifdef ZEROMQ
zmq_ctx_destroy(context);
#endif
}

void* EventBroadcaster::ZMQContext::createZMQSocket()
{
#ifdef ZEROMQ
jassert(context != nullptr);
return zmq_socket(context, ZMQ_PUB);
#endif
}

EventBroadcaster::ZMQSocketPtr::ZMQSocketPtr()
: std::unique_ptr<void, decltype(&closeZMQSocket)>(nullptr, &closeZMQSocket)
{
ScopedLock lock(sharedContextLock);
if (sharedContext == nullptr)
{
// first one, create the context
context = new ZMQContext(lock);
}
else
{
// use already-created context
context = sharedContext;
}

#ifdef ZEROMQ
reset(context->createZMQSocket());
#endif
}

EventBroadcaster::ZMQSocketPtr::~ZMQSocketPtr()
{
// close the socket before the context might get destroyed.
reset(nullptr);
}

int EventBroadcaster::unbindZMQSocket()
{
#ifdef ZEROMQ
void* socket = zmqSocket.get();
if (socket != nullptr && listeningPort != 0)
{
return zmq_unbind(socket, getEndpoint(listeningPort).toRawUTF8());
}
#endif
return ctx;
return 0;
}

int EventBroadcaster::rebindZMQSocket()
{
#ifdef ZEROMQ
void* socket = zmqSocket.get();
if (socket != nullptr && listeningPort != 0)
{
return zmq_bind(socket, getEndpoint(listeningPort).toRawUTF8());
}
#endif
return 0;
}

void EventBroadcaster::closeZMQSocket(void* socket)
{
Expand All @@ -30,16 +98,33 @@ void EventBroadcaster::closeZMQSocket(void* socket)
#endif
}

String EventBroadcaster::getEndpoint(int port)
{
return String("tcp://*:") + String(port);
}

void EventBroadcaster::reportActualListeningPort(int port)
{
listeningPort = port;
auto editor = static_cast<EventBroadcasterEditor*>(getEditor());
if (editor)
{
editor->setDisplayedPort(port);
}
}

EventBroadcaster::EventBroadcaster()
: GenericProcessor ("Event Broadcaster")
, zmqContext (getZMQContext())
, zmqSocket (nullptr, &closeZMQSocket)
, listeningPort (0)
{
setProcessorType (PROCESSOR_TYPE_SINK);

setListeningPort(5557);
int portToTry = 5557;
while (setListeningPort(portToTry) == EADDRINUSE)
{
// try the next port, looking for one not in use
portToTry++;
}
}


Expand All @@ -56,27 +141,53 @@ int EventBroadcaster::getListeningPort() const
}


void EventBroadcaster::setListeningPort(int port, bool forceRestart)
int EventBroadcaster::setListeningPort(int port, bool forceRestart)
{
if ((listeningPort != port) || forceRestart)
{
#ifdef ZEROMQ
zmqSocket.reset(zmq_socket(zmqContext.get(), ZMQ_PUB));
if (!zmqSocket)
// unbind current socket (if any) to free up port
unbindZMQSocket();
ZMQSocketPtr newSocket;
auto editor = static_cast<EventBroadcasterEditor*>(getEditor());
int status = 0;

if (!newSocket.get())
{
std::cout << "Failed to create socket: " << zmq_strerror(zmq_errno()) << std::endl;
return;
status = zmq_errno();
std::cout << "Failed to create socket: " << zmq_strerror(status) << std::endl;
}
else
{
if (0 != zmq_bind(newSocket.get(), getEndpoint(port).toRawUTF8()))
{
status = zmq_errno();
std::cout << "Failed to open socket: " << zmq_strerror(status) << std::endl;
}
else
{
// success
zmqSocket.swap(newSocket);
reportActualListeningPort(port);
return status;
}
}

String url = String("tcp://*:") + String(port);
if (0 != zmq_bind(zmqSocket.get(), url.toRawUTF8()))
// failure, try to rebind current socket to previous port
if (0 == rebindZMQSocket())
{
std::cout << "Failed to open socket: " << zmq_strerror(zmq_errno()) << std::endl;
return;
reportActualListeningPort(listeningPort);
}
#endif
else
{
reportActualListeningPort(0);
}
return status;

listeningPort = port;
#else
reportActualListeningPort(port);
return 0;
#endif
}
}

Expand Down
44 changes: 36 additions & 8 deletions Source/Plugins/EventBroadcaster/EventBroadcaster.h
Expand Up @@ -24,7 +24,6 @@

#include <memory>


class EventBroadcaster : public GenericProcessor
{
public:
Expand All @@ -33,7 +32,8 @@ class EventBroadcaster : public GenericProcessor
AudioProcessorEditor* createEditor() override;

int getListeningPort() const;
void setListeningPort (int port, bool forceRestart = false);
// returns 0 on success, else the errno value for the error that occurred.
int setListeningPort (int port, bool forceRestart = false);

void process (AudioSampleBuffer& continuousBuffer) override;
void handleEvent (const EventChannel* channelInfo, const MidiMessage& event, int samplePosition = 0) override;
Expand All @@ -44,14 +44,42 @@ class EventBroadcaster : public GenericProcessor


private:
void sendEvent(const MidiMessage& event, float eventSampleRate) const;
static std::shared_ptr<void> getZMQContext();
static void closeZMQSocket (void* socket);
class ZMQContext : public ReferenceCountedObject
{
public:
ZMQContext(const ScopedLock& lock);
~ZMQContext() override;
void* createZMQSocket();
private:
void* context;
};

static void closeZMQSocket(void* socket);

class ZMQSocketPtr : public std::unique_ptr<void, decltype(&closeZMQSocket)>
{
public:
ZMQSocketPtr();
~ZMQSocketPtr();
private:
ReferenceCountedObjectPtr<ZMQContext> context;
};

int unbindZMQSocket();
int rebindZMQSocket();

const std::shared_ptr<void> zmqContext;
std::unique_ptr<void, decltype (&closeZMQSocket)> zmqSocket;
void sendEvent(const MidiMessage& event, float eventSampleRate) const;
static String getEndpoint(int port);
// called from getListeningPort() depending on success/failure of ZMQ operations
void reportActualListeningPort(int port);

// share a "dumb" pointer that doesn't take part in reference counting.
// want the context to be terminated by the time the static members are
// destroyed (see: https://github.com/zeromq/libzmq/issues/1708)
static ZMQContext* sharedContext;
static CriticalSection sharedContextLock;
ZMQSocketPtr zmqSocket;
int listeningPort;

};


Expand Down
23 changes: 21 additions & 2 deletions Source/Plugins/EventBroadcaster/EventBroadcasterEditor.cpp
Expand Up @@ -46,7 +46,14 @@ void EventBroadcasterEditor::buttonEvent(Button* button)
if (button == restartConnection)
{
EventBroadcaster* p = (EventBroadcaster*)getProcessor();
p->setListeningPort(p->getListeningPort(), true);
int status = p->setListeningPort(p->getListeningPort(), true);

#ifdef ZEROMQ
if (status != 0)
{
CoreServices::sendStatusMessage(String("Restart failed: ") + zmq_strerror(status));
}
#endif
}
}

Expand All @@ -58,6 +65,18 @@ void EventBroadcasterEditor::labelTextChanged(juce::Label* label)
Value val = label->getTextValue();

EventBroadcaster* p = (EventBroadcaster*)getProcessor();
p->setListeningPort(val.getValue());
int status = p->setListeningPort(val.getValue());

#ifdef ZEROMQ
if (status != 0)
{
CoreServices::sendStatusMessage(String("Port change failed: ") + zmq_strerror(status));
}
#endif
}
}

void EventBroadcasterEditor::setDisplayedPort(int port)
{
portLabel->setText(String(port), dontSendNotification);
}
2 changes: 2 additions & 0 deletions Source/Plugins/EventBroadcaster/EventBroadcasterEditor.h
Expand Up @@ -30,6 +30,8 @@ class EventBroadcasterEditor : public GenericEditor, public Label::Listener
void buttonEvent(Button* button) override;
void labelTextChanged(juce::Label* label) override;

void setDisplayedPort(int port);

private:
ScopedPointer<UtilityButton> restartConnection;
ScopedPointer<Label> urlLabel;
Expand Down

0 comments on commit 11bc708

Please sign in to comment.