Skip to content

Commit

Permalink
problem: jeromq is not thread-safe
Browse files Browse the repository at this point in the history
Solution: port CLIENT and SERVER sockets from libzmq, which are thread-safe sockets.

The thread-safe sockets family includes other sockets, which are not part of this commit.

Also, CLIENT and SERVER cannot be used with a poller at the moment. For poller support, zmq_poller from libzmq has to be ported as well.
  • Loading branch information
somdoron committed May 8, 2020
1 parent f2826bb commit 5c4da28
Show file tree
Hide file tree
Showing 14 changed files with 1,399 additions and 641 deletions.
4 changes: 2 additions & 2 deletions src/main/java/zmq/Ctx.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private enum Side

// Array of pointers to mailboxes for both application and I/O threads.
private int slotCount;
private Mailbox[] slots;
private IMailbox[] slots;

// Mailbox for zmq_term thread.
private final Mailbox termMailbox;
Expand Down Expand Up @@ -435,7 +435,7 @@ private void initSlots()
finally {
optSync.unlock();
}
slots = new Mailbox[slotCount];
slots = new IMailbox[slotCount];

// Initialize the infrastructure for zmq_term thread.
slots[TERM_TID] = termMailbox;
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/zmq/IMailbox.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package zmq;

import java.io.Closeable;

public interface IMailbox extends Closeable {
void send(final Command cmd);
Command recv(long timeout);
}
7 changes: 4 additions & 3 deletions src/main/java/zmq/Mailbox.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package zmq;

import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.util.concurrent.locks.Lock;
Expand All @@ -9,7 +8,7 @@
import zmq.pipe.YPipe;
import zmq.util.Errno;

public final class Mailbox implements Closeable
public final class Mailbox implements IMailbox
{
// The pipe to store actual commands.
private final YPipe<Command> cpipe;
Expand Down Expand Up @@ -55,7 +54,8 @@ public SelectableChannel getFd()
return signaler.getFd();
}

void send(final Command cmd)
@Override
public void send(final Command cmd)
{
boolean ok = false;
sync.lock();
Expand All @@ -72,6 +72,7 @@ void send(final Command cmd)
}
}

@Override
public Command recv(long timeout)
{
Command cmd;
Expand Down
129 changes: 129 additions & 0 deletions src/main/java/zmq/MailboxSafe.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package zmq;

import zmq.pipe.YPipe;
import zmq.util.Errno;

import java.io.IOException;
import java.util.Vector;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;

public class MailboxSafe implements IMailbox
{
// The pipe to store actual commands.
private final YPipe<Command> cpipe;

// Synchronize access to the mailbox from receivers and senders
private final ReentrantLock sync;

// Condition variable to pass signals from writer thread to reader thread.
private final Condition condition;

private final Vector<Signaler> signalers;

// mailbox name, for better debugging
private final String name;

private final Errno errno;

public MailboxSafe(Ctx ctx, ReentrantLock sync, String name)
{
this.errno = ctx.errno();
this.cpipe = new YPipe<>(Config.COMMAND_PIPE_GRANULARITY.getValue());
this.sync = sync;
this.condition = this.sync.newCondition();
this.signalers = new Vector<Signaler>(10);
this.name = name;

// Get the pipe into passive state. That way, if the users starts by
// polling on the associated file descriptor it will get woken up when
// new command is posted.
Command cmd = cpipe.read();
assert (cmd == null);
}

public void addSignaler(Signaler signaler)
{
this.signalers.add(signaler);
}

public void removeSignaler(Signaler signaler)
{
this.signalers.remove(signaler);
}

public void clearSignalers()
{
this.signalers.clear();
}

@Override
public void send(Command cmd)
{
sync.lock();
try {
cpipe.write(cmd, false);
boolean ok = cpipe.flush();

if (!ok) {
condition.signalAll();
signalers.forEach(Signaler::send);
}
}
finally {
sync.unlock();
}
}

@Override
public Command recv(long timeout)
{
Command cmd;

// Try to get the command straight away.
cmd = cpipe.read();
if (cmd != null) {
return cmd;
}

// If the timeout is zero, it will be quicker to release the lock, giving other a chance to send a command
// and immediately relock it.
if (timeout == 0) {
sync.unlock();
sync.lock();
} else {
try {
// Wait for signal from the command sender.
condition.await(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
errno.set(ZError.EINTR);
return null;
}
}

// Another thread may already fetch the command
cmd = cpipe.read();
if (cmd == null) {
errno.set(ZError.EAGAIN);
return null;
}

return cmd;
}

@Override
public void close() throws IOException
{
// Work around problem that other threads might still be in our
// send() method, by waiting on the mutex before disappearing.
sync.lock();
sync.unlock();
}

@Override
public String toString()
{
return super.toString() + "[" + name + "]";
}
}
33 changes: 33 additions & 0 deletions src/main/java/zmq/Msg.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ enum Type
// keep track of relative read position
private int readIndex = 0;

private int routingId;

public Msg()
{
this(0);
Expand Down Expand Up @@ -435,4 +437,35 @@ public Msg putShortString(String data)
writeIndex += Wire.putShortString(dup, data);
return this;
}

/**
* Return the routing id of a message. The routing id represent the CLIENT socket that sent the message to the
* SERVER socket.
* @return the routing id
* */
public int getRoutingId()
{
return routingId;
}

/**
* Set the routing id on a message. The routing id represent the CLIENT socket which the message should be sent to.
* Only SERVER socket is currently using the routing id.
* @param routingId the routing id
* @return true if successfully set the routing id.
*/
public boolean setRoutingId(int routingId)
{
if (routingId != 0) {
this.routingId = routingId;
return true;
}

return false;
}

public void resetRoutingId()
{
routingId = 0;
}
}
Loading

0 comments on commit 5c4da28

Please sign in to comment.