Skip to content

Commit

Permalink
Merge 3d5bbab into 6098762
Browse files Browse the repository at this point in the history
  • Loading branch information
fbacchella committed Feb 16, 2019
2 parents 6098762 + 3d5bbab commit a786913
Showing 1 changed file with 20 additions and 55 deletions.
75 changes: 20 additions & 55 deletions src/main/java/org/zeromq/ZContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

import java.io.Closeable;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Poller;
Expand All @@ -32,12 +32,12 @@ public class ZContext implements Closeable
/**
* List of sockets managed by this ZContext
*/
private final List<Socket> sockets;
private final Set<Socket> sockets;

/**
* List of selectors managed by this ZContext
*/
private final List<Selector> selectors;
private final Set<Selector> selectors;

/**
* Number of io threads allocated to this context, default 1
Expand All @@ -53,7 +53,7 @@ public class ZContext implements Closeable
/**
* Linger timeout, default 0
*/
private int linger;
private volatile int linger;

/**
* Send/receive HWM for pipes
Expand All @@ -63,17 +63,12 @@ public class ZContext implements Closeable
/**
* ZMQ_SNDHWM for normal sockets
*/
private int sndhwm;
private volatile int sndhwm;

/**
* ZMQ_RCVHWM for normal sockets
*/
private int rcvhwm;

/**
* Make ZContext threadsafe
*/
private final Lock mutex;
private volatile int rcvhwm;

/**
* Class Constructor
Expand All @@ -90,9 +85,8 @@ public ZContext(int ioThreads)

private ZContext(Context context, boolean main, int ioThreads)
{
this.sockets = new CopyOnWriteArrayList<>();
this.selectors = new CopyOnWriteArrayList<>();
this.mutex = new ReentrantLock();
this.sockets = ConcurrentHashMap.newKeySet();
this.selectors = ConcurrentHashMap.newKeySet();
this.context = context;
this.ioThreads = ioThreads;
this.main = main;
Expand Down Expand Up @@ -137,13 +131,7 @@ public Socket createSocket(SocketType type)
Socket socket = context.socket(type);
socket.setRcvHWM(this.rcvhwm);
socket.setSndHWM(this.sndhwm);
try {
mutex.lock();
sockets.add(socket);
}
finally {
mutex.unlock();
}
sockets.add(socket);
return socket;
}

Expand Down Expand Up @@ -173,13 +161,7 @@ public void destroySocket(Socket s)
}
s.setLinger(linger);
s.close();
try {
mutex.lock();
this.sockets.remove(s);
}
finally {
mutex.unlock();
}
this.sockets.remove(s);
}

/**
Expand Down Expand Up @@ -266,6 +248,8 @@ public int getIoThreads()
}

/**
* A deprecated function that does nothing.
*
* @param ioThreads the number of ioThreads to set
* @deprecated This value should not be changed after the context is initialized.
*/
Expand All @@ -288,13 +272,7 @@ public int getLinger()
*/
public void setLinger(int linger)
{
try {
mutex.lock();
this.linger = linger;
}
finally {
mutex.unlock();
}
this.linger = linger;
}

/**
Expand All @@ -305,13 +283,7 @@ public void setLinger(int linger)
*/
public void setRcvHWM(int rcvhwm)
{
try {
mutex.lock();
this.rcvhwm = rcvhwm;
}
finally {
mutex.unlock();
}
this.rcvhwm = rcvhwm;
}

/**
Expand All @@ -322,13 +294,7 @@ public void setRcvHWM(int rcvhwm)
*/
public void setSndHWM(int sndhwm)
{
try {
mutex.lock();
this.sndhwm = sndhwm;
}
finally {
mutex.unlock();
}
this.sndhwm = sndhwm;
}

/**
Expand Down Expand Up @@ -368,11 +334,12 @@ public void setContext(Context ctx)
}

/**
* Return a copy of the list of currently open sockets. Order is not meaningful.
* @return the sockets
*/
public List<Socket> getSockets()
{
return sockets;
return new ArrayList<>(sockets);
}

@Override
Expand All @@ -383,8 +350,6 @@ public void close()

public boolean isClosed()
{
synchronized (this) {
return context.isClosed();
}
return context.isClosed();
}
}

0 comments on commit a786913

Please sign in to comment.