Skip to content

Commit

Permalink
Merge e81fd7d into 752c422
Browse files Browse the repository at this point in the history
  • Loading branch information
fredoboulo committed Mar 29, 2019
2 parents 752c422 + e81fd7d commit 48bb1e5
Show file tree
Hide file tree
Showing 26 changed files with 403 additions and 182 deletions.
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,13 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>3.0.0</version>
<dependencies>
<dependency>
<groupId>com.puppycrawl.tools</groupId>
<artifactId>checkstyle</artifactId>
<version>8.18</version>
</dependency>
</dependencies>
<executions>
<execution>
<phase>validate</phase>
Expand Down
6 changes: 6 additions & 0 deletions src/checkstyle/checks.xml
Original file line number Diff line number Diff line change
Expand Up @@ -115,5 +115,11 @@
PLUS, PLUS_ASSIGN, QUESTION, SL, SLIST, SL_ASSIGN, SR, SR_ASSIGN,
STAR, STAR_ASSIGN, TYPE_EXTENSION_AND"/>
</module>
<module name="IllegalImport">
<property name="illegalPkgs"
value="java.util.function, java.util.stream" />
<property name="illegalClasses"
value="java.util.Objects, java.util.Optional" />
</module>
</module>
</module>
2 changes: 2 additions & 0 deletions src/main/java/org/zeromq/UncheckedZMQException.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

public abstract class UncheckedZMQException extends RuntimeException
{
private static final long serialVersionUID = 1L;

public UncheckedZMQException()
{
super();
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/org/zeromq/ZActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

import org.zeromq.ZMQ.Socket;
import org.zeromq.ZPoller.EventsHandler;

import zmq.util.Objects;

/**
* <p>First implementation of a background actor remotely controlled for ØMQ.</p>
*
Expand Down Expand Up @@ -451,7 +452,7 @@ public ZActor(final SelectorCreator selector, final Actor actor, final String mo
*/
@Deprecated
public ZActor(final ZContext context, final SelectorCreator selector, final Actor actor, final String motdelafin,
final Object... args)
final Object... args)
{
this(context, actor, motdelafin, args);
}
Expand Down
7 changes: 4 additions & 3 deletions src/main/java/org/zeromq/ZAuth.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;

import org.zeromq.ZMQ.Socket;
import org.zeromq.ZMQ.Socket.Mechanism;
import org.zeromq.util.ZMetadata;

import zmq.util.Objects;

/**
* A ZAuth actor takes over authentication for all incoming connections in
* its context. You can whitelist or blacklist peers based on IP address,
Expand Down Expand Up @@ -240,13 +241,13 @@ public static class ZapReply
public final String identity; // not part of the ZAP protocol, but handy information for user

private ZapReply(String version, String sequence, int statusCode, String statusText, String userId,
ZMetadata metadata)
ZMetadata metadata)
{
this(version, sequence, statusCode, statusText, userId, metadata, null, null);
}

private ZapReply(String version, String sequence, int statusCode, String statusText, String userId,
ZMetadata metadata, String address, String identity)
ZMetadata metadata, String address, String identity)
{
assert (ZAP_VERSION.equals(version));
this.version = version;
Expand Down
7 changes: 4 additions & 3 deletions src/main/java/org/zeromq/ZBeacon.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@
import java.nio.channels.ClosedChannelException;
import java.nio.channels.DatagramChannel;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import zmq.util.Objects;

public class ZBeacon
{
public static final long DEFAULT_BROADCAST_INTERVAL = 1000L;
Expand All @@ -25,7 +26,7 @@ public class ZBeacon
private final BroadcastServer broadcastServer;
private final AtomicReference<byte[]> prefix = new AtomicReference<>(new byte[0]);
private final AtomicLong broadcastInterval = new AtomicLong(DEFAULT_BROADCAST_INTERVAL);
private final AtomicReference<Listener> listener = new AtomicReference<Listener>();
private final AtomicReference<Listener> listener = new AtomicReference<>();

public ZBeacon(int port, byte[] beacon)
{
Expand All @@ -48,7 +49,7 @@ public ZBeacon(String host, int port, byte[] beacon, boolean ignoreLocalAddress,
}

private ZBeacon(String host, byte[] serverAddress, int port, byte[] beacon, long broadcastInterval,
boolean ignoreLocalAddress, boolean blocking)
boolean ignoreLocalAddress, boolean blocking)
{
Objects.requireNonNull(host, "Host cannot be null");
Objects.requireNonNull(serverAddress, "Server address cannot be null");
Expand Down
19 changes: 10 additions & 9 deletions src/main/java/org/zeromq/ZLoop.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.PollItem;
import org.zeromq.ZMQ.Poller;

import zmq.util.Objects;

/**
* The ZLoop class provides an event-driven reactor pattern. The reactor
* handles zmq.PollItem items (pollers or writers, sockets or fds), and
Expand Down Expand Up @@ -162,8 +163,8 @@ public int addPoller(PollItem pollItem, IZLoopHandler handler, Object arg)
System.out.printf(
"I: zloop: register %s poller (%s, %s)\n",
pollItem.getSocket() != null ? pollItem.getSocket().getType() : "RAW",
pollItem.getSocket(),
pollItem.getRawSocket());
pollItem.getSocket(),
pollItem.getRawSocket());
}
return 0;
}
Expand All @@ -187,8 +188,8 @@ public void removePoller(PollItem pollItem)
System.out.printf(
"I: zloop: cancel %s poller (%s, %s)",
pollItem.getSocket() != null ? pollItem.getSocket().getType() : "RAW",
pollItem.getSocket(),
pollItem.getRawSocket());
pollItem.getSocket(),
pollItem.getRawSocket());
}

}
Expand Down Expand Up @@ -309,8 +310,8 @@ public int start()
System.out.printf(
"I: zloop: can't poll %s socket (%s, %s)\n",
poller.item.getSocket() != null ? poller.item.getSocket().getType() : "RAW",
poller.item.getSocket(),
poller.item.getRawSocket());
poller.item.getSocket(),
poller.item.getRawSocket());
}
// Give handler one chance to handle error, then kill
// poller because it'll disrupt the reactor otherwise.
Expand All @@ -327,8 +328,8 @@ public int start()
System.out.printf(
"I: zloop: call %s socket handler (%s, %s)\n",
poller.item.getSocket() != null ? poller.item.getSocket().getType() : "RAW",
poller.item.getSocket(),
poller.item.getRawSocket());
poller.item.getSocket(),
poller.item.getRawSocket());
}
rc = poller.handler.handle(this, poller.item, poller.arg);
if (rc == -1) {
Expand Down
27 changes: 1 addition & 26 deletions src/main/java/org/zeromq/ZMQ.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

import org.zeromq.proto.ZPicture;

Expand Down Expand Up @@ -3340,19 +3339,6 @@ public byte[] recv()
return recv(0);
}

/**
* Stream of incoming messages
* <p>
* This API is in DRAFT state and is subject to change at ANY time until declared stable
*
* @return infinite stream of the incoming messages
*/
@Draft
public Stream<byte[]> recvStream()
{
return Stream.generate(this::recv);
}

/**
* Receives a message.
*
Expand Down Expand Up @@ -3445,17 +3431,6 @@ public String recvStr()
return recvStr(0);
}

/**
* This API is in DRAFT state and is subject to change at ANY time until declared stable
*
* @return infinite stream of the incoming messages as a String object
*/
@Draft
public Stream<String> recvStrStream()
{
return Stream.generate(this::recvStr);
}

/**
* Receives a message as a string.
*
Expand Down Expand Up @@ -3636,7 +3611,7 @@ public static class Poller implements Closeable
private long timeout;

// When socket is removed from polling, store free slots here
private LinkedList<Integer> freeSlots;
private final LinkedList<Integer> freeSlots;

/**
* Class constructor.
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/zeromq/ZMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.UUID;

import org.zeromq.ZMQ.Socket;

import zmq.util.Objects;

/**
* The ZMonitor actor provides an API for obtaining socket events such as
* connected, listen, disconnected, etc. Socket events are only available
Expand Down
77 changes: 39 additions & 38 deletions src/main/java/org/zeromq/ZMsg.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
import java.util.Collection;
import java.util.Deque;
import java.util.Iterator;
import java.util.function.Consumer;

import org.zeromq.ZMQ.Socket;

import zmq.util.Draft;
import zmq.util.function.Consumer;

/**
* The ZMsg class provides methods to send and receive multipart messages
Expand Down Expand Up @@ -252,43 +253,43 @@ public static ZMsg recvMsg(Socket socket, int flag)
return msg;
}

/**
* This API is in DRAFT state and is subject to change at ANY time until declared stable
* handle incoming message with a handler
*
* @param socket
* @param flags see ZMQ constants
* @param handler handler to handle incoming message
* @param exceptionHandler handler to handle exceptions
*/
@Draft
public static void recvMsg(ZMQ.Socket socket, int flags,
Consumer<ZMsg> handler,
Consumer<ZMQException> exceptionHandler)
{
try {
handler.accept(ZMsg.recvMsg(socket, flags));
}
catch (ZMQException e) {
exceptionHandler.accept(e);
}
}

/**
* This API is in DRAFT state and is subject to change at ANY time until declared stable
* handle incoming message with a handler
*
* @param socket
* @param flags see ZMQ constants
* @param handler handler to handle incoming message
*/
@Draft
public static void recvMsg(ZMQ.Socket socket, int flags, Consumer<ZMsg> handler)
{
handler.accept(ZMsg.recvMsg(socket, flags));
}

/**
/**
* This API is in DRAFT state and is subject to change at ANY time until declared stable
* handle incoming message with a handler
*
* @param socket
* @param flags see ZMQ constants
* @param handler handler to handle incoming message
* @param exceptionHandler handler to handle exceptions
*/
@Draft
public static void recvMsg(ZMQ.Socket socket, int flags,
Consumer<ZMsg> handler,
Consumer<ZMQException> exceptionHandler)
{
try {
handler.accept(ZMsg.recvMsg(socket, flags));
}
catch (ZMQException e) {
exceptionHandler.accept(e);
}
}

/**
* This API is in DRAFT state and is subject to change at ANY time until declared stable
* handle incoming message with a handler
*
* @param socket
* @param flags see ZMQ constants
* @param handler handler to handle incoming message
*/
@Draft
public static void recvMsg(ZMQ.Socket socket, int flags, Consumer<ZMsg> handler)
{
handler.accept(ZMsg.recvMsg(socket, flags));
}

/**
* Save message to an open data output stream.
*
* Data saved as:
Expand Down

0 comments on commit 48bb1e5

Please sign in to comment.