Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

problem: jeromq is not thread-safe #783

Merged
merged 1 commit into from
May 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/main/java/zmq/Ctx.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private enum Side

// Array of pointers to mailboxes for both application and I/O threads.
private int slotCount;
private Mailbox[] slots;
private IMailbox[] slots;

// Mailbox for zmq_term thread.
private final Mailbox termMailbox;
Expand Down Expand Up @@ -435,7 +435,7 @@ private void initSlots()
finally {
optSync.unlock();
}
slots = new Mailbox[slotCount];
slots = new IMailbox[slotCount];

// Initialize the infrastructure for zmq_term thread.
slots[TERM_TID] = termMailbox;
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/zmq/IMailbox.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package zmq;

import java.io.Closeable;

public interface IMailbox extends Closeable
{
void send(final Command cmd);

Command recv(long timeout);
}
7 changes: 4 additions & 3 deletions src/main/java/zmq/Mailbox.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package zmq;

import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.util.concurrent.locks.Lock;
Expand All @@ -9,7 +8,7 @@
import zmq.pipe.YPipe;
import zmq.util.Errno;

public final class Mailbox implements Closeable
public final class Mailbox implements IMailbox
{
// The pipe to store actual commands.
private final YPipe<Command> cpipe;
Expand Down Expand Up @@ -55,7 +54,8 @@ public SelectableChannel getFd()
return signaler.getFd();
}

void send(final Command cmd)
@Override
public void send(final Command cmd)
{
boolean ok = false;
sync.lock();
Expand All @@ -72,6 +72,7 @@ void send(final Command cmd)
}
}

@Override
public Command recv(long timeout)
{
Command cmd;
Expand Down
134 changes: 134 additions & 0 deletions src/main/java/zmq/MailboxSafe.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package zmq;

import zmq.pipe.YPipe;
import zmq.util.Errno;

import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;

public class MailboxSafe implements IMailbox
{
// The pipe to store actual commands.
private final YPipe<Command> cpipe;

// Synchronize access to the mailbox from receivers and senders
private final ReentrantLock sync;

// Condition variable to pass signals from writer thread to reader thread.
private final Condition condition;

private final ArrayList<Signaler> signalers;

// mailbox name, for better debugging
private final String name;

private final Errno errno;

public MailboxSafe(Ctx ctx, ReentrantLock sync, String name)
{
this.errno = ctx.errno();
this.cpipe = new YPipe<>(Config.COMMAND_PIPE_GRANULARITY.getValue());
this.sync = sync;
this.condition = this.sync.newCondition();
this.signalers = new ArrayList<Signaler>(10);
this.name = name;

// Get the pipe into passive state. That way, if the users starts by
// polling on the associated file descriptor it will get woken up when
// new command is posted.
Command cmd = cpipe.read();
assert (cmd == null);
}

public void addSignaler(Signaler signaler)
{
this.signalers.add(signaler);
}

public void removeSignaler(Signaler signaler)
{
this.signalers.remove(signaler);
}

public void clearSignalers()
{
this.signalers.clear();
}

@Override
public void send(Command cmd)
{
sync.lock();
try {
cpipe.write(cmd, false);
boolean ok = cpipe.flush();

if (!ok) {
condition.signalAll();

for (int i = 0; i < signalers.size(); i++) {
signalers.get(i).send();
}
}
}
finally {
sync.unlock();
}
}

@Override
public Command recv(long timeout)
{
Command cmd;

// Try to get the command straight away.
cmd = cpipe.read();
if (cmd != null) {
return cmd;
}

// If the timeout is zero, it will be quicker to release the lock, giving other a chance to send a command
// and immediately relock it.
if (timeout == 0) {
sync.unlock();
sync.lock();
}
else {
try {
// Wait for signal from the command sender.
condition.await(timeout, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
errno.set(ZError.EINTR);
return null;
}
}

// Another thread may already fetch the command
cmd = cpipe.read();
if (cmd == null) {
errno.set(ZError.EAGAIN);
return null;
}

return cmd;
}

@Override
public void close() throws IOException
{
// Work around problem that other threads might still be in our
// send() method, by waiting on the mutex before disappearing.
sync.lock();
sync.unlock();
}

@Override
public String toString()
{
return super.toString() + "[" + name + "]";
}
}
33 changes: 33 additions & 0 deletions src/main/java/zmq/Msg.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ enum Type
// keep track of relative read position
private int readIndex = 0;

private int routingId;

public Msg()
{
this(0);
Expand Down Expand Up @@ -435,4 +437,35 @@ public Msg putShortString(String data)
writeIndex += Wire.putShortString(dup, data);
return this;
}

/**
* Return the routing id of a message. The routing id represent the CLIENT socket that sent the message to the
* SERVER socket.
* @return the routing id
* */
public int getRoutingId()
{
return routingId;
}

/**
* Set the routing id on a message. The routing id represent the CLIENT socket which the message should be sent to.
* Only SERVER socket is currently using the routing id.
* @param routingId the routing id
* @return true if successfully set the routing id.
*/
public boolean setRoutingId(int routingId)
{
if (routingId != 0) {
this.routingId = routingId;
return true;
}

return false;
}

public void resetRoutingId()
{
routingId = 0;
}
}
Loading